This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new da4e859aecb Pick 0110 (#29795)
da4e859aecb is described below

commit da4e859aecb8ec6d51b00815ec2635c55de6c7e1
Author: Gabriel <[email protected]>
AuthorDate: Wed Jan 10 17:07:15 2024 +0800

    Pick 0110 (#29795)
    
    * [pipelineX](bug) Fix correctness problem using multiple BE (#29765)
    
    * [pipelineX](fix) fix StreamingAggSource crash due to empty data block (#29769)
---
 be/src/pipeline/exec/streaming_aggregation_source_operator.cpp |  9 +++++++--
 .../src/main/java/org/apache/doris/planner/ScanNode.java       |  4 +++-
 fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java  | 10 +++++-----
 3 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp 
b/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp
index e68be656fea..44991e2e14f 100644
--- a/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp
@@ -83,16 +83,21 @@ Status StreamingAggSourceOperatorX::get_block(RuntimeState* 
state, vectorized::B
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     if (!local_state._shared_state->data_queue->data_exhausted()) {
-        std::unique_ptr<vectorized::Block> agg_block;
+        std::unique_ptr<vectorized::Block> agg_block = nullptr;
         DCHECK(local_state._dependency->is_blocked_by() == nullptr);
         
RETURN_IF_ERROR(local_state._shared_state->data_queue->get_block_from_queue(&agg_block));
 
         if (local_state._shared_state->data_queue->data_exhausted()) {
             RETURN_IF_ERROR(Base::get_block(state, block, source_state));
-        } else {
+        } else if (agg_block) {
             block->swap(*agg_block);
             agg_block->clear_column_data(row_desc().num_materialized_slots());
             
local_state._shared_state->data_queue->push_free_block(std::move(agg_block));
+        } else if (local_state._shared_state->data_queue->is_all_finish()) {
+            source_state = SourceState::FINISHED;
+        } else {
+            return Status::InternalError("Something wrong in 
StreamingAggSource: {}",
+                                         Base::debug_string(0));
         }
     } else {
         RETURN_IF_ERROR(Base::get_block(state, block, source_state));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index d21fab7ac9a..885bb335e49 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -724,7 +724,9 @@ public abstract class ScanNode extends PlanNode {
         return !isKeySearch() && context != null
                 && 
context.getSessionVariable().isIgnoreStorageDataDistribution()
                 && context.getSessionVariable().getEnablePipelineXEngine()
-                && !fragment.isHasNullAwareLeftAntiJoin();
+                && !fragment.isHasNullAwareLeftAntiJoin()
+                && ((this instanceof OlapScanNode) && ((OlapScanNode) 
this).getScanTabletIds().size()
+                < 
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum());
     }
 
     public boolean haveLimitAndConjunts() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 9367316b201..efee918dfde 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -2044,9 +2044,10 @@ public class Coordinator implements CoordInterface {
                         boolean sharedScan = true;
                         int expectedInstanceNum = 
Math.min(parallelExecInstanceNum,
                                 leftMostNode.getNumInstances());
+                        boolean ignoreStorageDataDistribution = 
scanNodes.stream()
+                                .allMatch(scanNode -> 
scanNode.ignoreStorageDataDistribution(context)) && useNereids;
                         if (node.isPresent() && 
(!node.get().shouldDisableSharedScan(context)
-                                || 
(node.get().ignoreStorageDataDistribution(context)
-                                && expectedInstanceNum > 
perNodeScanRanges.size() && useNereids))) {
+                                || ignoreStorageDataDistribution)) {
                             expectedInstanceNum = 
Math.max(expectedInstanceNum, 1);
                             // if have limit and conjunts, only need 1 
instance to save cpu and
                             // mem resource
@@ -2906,9 +2907,8 @@ public class Coordinator implements CoordInterface {
          * 2. `parallelExecInstanceNum` is larger than scan ranges.
          * 3. Use Nereids planner.
          */
-        boolean ignoreStorageDataDistribution = 
scanNodes.stream().filter(scanNode -> {
-            return scanNodeIds.contains(scanNode.getId().asInt());
-        }).allMatch(node -> node.ignoreStorageDataDistribution(context))
+        boolean ignoreStorageDataDistribution = scanNodes.stream()
+                .allMatch(node -> node.ignoreStorageDataDistribution(context))
                 && 
addressToScanRanges.entrySet().stream().allMatch(addressScanRange -> {
                     return addressScanRange.getValue().size() < 
parallelExecInstanceNum;
                 }) && useNereids;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to