This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new da4e859aecb Pick 0110 (#29795)
da4e859aecb is described below
commit da4e859aecb8ec6d51b00815ec2635c55de6c7e1
Author: Gabriel <[email protected]>
AuthorDate: Wed Jan 10 17:07:15 2024 +0800
Pick 0110 (#29795)
* [pipelineX](bug) Fix correctness problem using multiple BE (#29765)
* [pipelineX](fix) fix StreamingAggSource crash due to empty data block (#29769)
---
be/src/pipeline/exec/streaming_aggregation_source_operator.cpp | 9 +++++++--
.../src/main/java/org/apache/doris/planner/ScanNode.java | 4 +++-
fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java | 10 +++++-----
3 files changed, 15 insertions(+), 8 deletions(-)
diff --git a/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp
index e68be656fea..44991e2e14f 100644
--- a/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp
@@ -83,16 +83,21 @@ Status StreamingAggSourceOperatorX::get_block(RuntimeState* state, vectorized::B
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
if (!local_state._shared_state->data_queue->data_exhausted()) {
- std::unique_ptr<vectorized::Block> agg_block;
+ std::unique_ptr<vectorized::Block> agg_block = nullptr;
DCHECK(local_state._dependency->is_blocked_by() == nullptr);
RETURN_IF_ERROR(local_state._shared_state->data_queue->get_block_from_queue(&agg_block));
if (local_state._shared_state->data_queue->data_exhausted()) {
RETURN_IF_ERROR(Base::get_block(state, block, source_state));
- } else {
+ } else if (agg_block) {
block->swap(*agg_block);
agg_block->clear_column_data(row_desc().num_materialized_slots());
local_state._shared_state->data_queue->push_free_block(std::move(agg_block));
+ } else if (local_state._shared_state->data_queue->is_all_finish()) {
+ source_state = SourceState::FINISHED;
+ } else {
+ return Status::InternalError("Something wrong in StreamingAggSource: {}",
+ Base::debug_string(0));
}
} else {
RETURN_IF_ERROR(Base::get_block(state, block, source_state));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index d21fab7ac9a..885bb335e49 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -724,7 +724,9 @@ public abstract class ScanNode extends PlanNode {
return !isKeySearch() && context != null
&&
context.getSessionVariable().isIgnoreStorageDataDistribution()
&& context.getSessionVariable().getEnablePipelineXEngine()
- && !fragment.isHasNullAwareLeftAntiJoin();
+ && !fragment.isHasNullAwareLeftAntiJoin()
+ && ((this instanceof OlapScanNode) && ((OlapScanNode)
this).getScanTabletIds().size()
+ <
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum());
}
public boolean haveLimitAndConjunts() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 9367316b201..efee918dfde 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -2044,9 +2044,10 @@ public class Coordinator implements CoordInterface {
boolean sharedScan = true;
int expectedInstanceNum =
Math.min(parallelExecInstanceNum,
leftMostNode.getNumInstances());
+ boolean ignoreStorageDataDistribution =
scanNodes.stream()
+ .allMatch(scanNode ->
scanNode.ignoreStorageDataDistribution(context)) && useNereids;
if (node.isPresent() &&
(!node.get().shouldDisableSharedScan(context)
- ||
(node.get().ignoreStorageDataDistribution(context)
- && expectedInstanceNum >
perNodeScanRanges.size() && useNereids))) {
+ || ignoreStorageDataDistribution)) {
expectedInstanceNum =
Math.max(expectedInstanceNum, 1);
// if have limit and conjunts, only need 1
instance to save cpu and
// mem resource
@@ -2906,9 +2907,8 @@ public class Coordinator implements CoordInterface {
* 2. `parallelExecInstanceNum` is larger than scan ranges.
* 3. Use Nereids planner.
*/
- boolean ignoreStorageDataDistribution =
scanNodes.stream().filter(scanNode -> {
- return scanNodeIds.contains(scanNode.getId().asInt());
- }).allMatch(node -> node.ignoreStorageDataDistribution(context))
+ boolean ignoreStorageDataDistribution = scanNodes.stream()
+ .allMatch(node -> node.ignoreStorageDataDistribution(context))
&&
addressToScanRanges.entrySet().stream().allMatch(addressScanRange -> {
return addressScanRange.getValue().size() <
parallelExecInstanceNum;
}) && useNereids;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]