This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 801c9f4394d [fix](local exchange) Fix BUCKET_HASH_SHUFFLE partition 
expr (#60765)
801c9f4394d is described below

commit 801c9f4394d22f731b6df81662eeb5306921b4e8
Author: Gabriel <[email protected]>
AuthorDate: Sat Feb 14 22:11:45 2026 +0800

    [fix](local exchange) Fix BUCKET_HASH_SHUFFLE partition expr (#60765)
    
    ### What problem does this PR solve?
    
    pick #60764
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
        - [ ] Previous test can cover this change.
        - [ ] No code files have been changed.
        - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
        - [ ] Yes. <!-- Add document PR link here. eg:
    https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 be/src/pipeline/pipeline_fragment_context.cpp | 49 ++++++++++++++-------------
 be/src/pipeline/pipeline_fragment_context.h   | 11 +++---
 2 files changed, 30 insertions(+), 30 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index eb76dd95ae7..78e961d0111 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -640,7 +640,7 @@ Status 
PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const Descrip
     int node_idx = 0;
 
     RETURN_IF_ERROR(_create_tree_helper(pool, _params.fragment.plan.nodes, 
descs, nullptr,
-                                        &node_idx, root, cur_pipe, 0, false));
+                                        &node_idx, root, cur_pipe, 0, false, 
false));
 
     if (node_idx + 1 != _params.fragment.plan.nodes.size()) {
         return Status::InternalError(
@@ -649,12 +649,10 @@ Status 
PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const Descrip
     return Status::OK();
 }
 
-Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
-                                                    const 
std::vector<TPlanNode>& tnodes,
-                                                    const DescriptorTbl& 
descs, OperatorPtr parent,
-                                                    int* node_idx, 
OperatorPtr* root,
-                                                    PipelinePtr& cur_pipe, int 
child_idx,
-                                                    const bool 
followed_by_shuffled_operator) {
+Status PipelineFragmentContext::_create_tree_helper(
+        ObjectPool* pool, const std::vector<TPlanNode>& tnodes, const 
DescriptorTbl& descs,
+        OperatorPtr parent, int* node_idx, OperatorPtr* root, PipelinePtr& 
cur_pipe, int child_idx,
+        const bool followed_by_shuffled_operator, const bool 
require_bucket_distribution) {
     // propagate error case
     if (*node_idx >= tnodes.size()) {
         return Status::InternalError(
@@ -665,10 +663,12 @@ Status 
PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
 
     int num_children = tnodes[*node_idx].num_children;
     bool current_followed_by_shuffled_operator = followed_by_shuffled_operator;
+    bool current_require_bucket_distribution = require_bucket_distribution;
     OperatorPtr op = nullptr;
     RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], descs, op, 
cur_pipe,
                                      parent == nullptr ? -1 : 
parent->node_id(), child_idx,
-                                     followed_by_shuffled_operator));
+                                     followed_by_shuffled_operator,
+                                     current_require_bucket_distribution));
     // Initialization must be done here. For example, group by expressions in 
agg will be used to
     // decide if a local shuffle should be planed, so it must be initialized 
here.
     RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));
@@ -700,6 +700,12 @@ Status 
PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
                                             : op->is_shuffled_operator())) &&
             
Pipeline::is_hash_exchange(required_data_distribution.distribution_type);
 
+    current_require_bucket_distribution =
+            (require_bucket_distribution ||
+             (cur_pipe->operators().empty() ? 
cur_pipe->sink()->is_colocated_operator()
+                                            : op->is_colocated_operator())) &&
+            
Pipeline::is_hash_exchange(required_data_distribution.distribution_type);
+
     if (num_children == 0) {
         _use_serial_source = op->is_serial_operator();
     }
@@ -707,7 +713,8 @@ Status 
PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
     for (int i = 0; i < num_children; i++) {
         ++*node_idx;
         RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, descs, op, node_idx, 
nullptr, cur_pipe, i,
-                                            
current_followed_by_shuffled_operator));
+                                            
current_followed_by_shuffled_operator,
+                                            
current_require_bucket_distribution));
 
         // we are expecting a child, but have used all nodes
         // this means we have been given a bad tree and must fail
@@ -1192,18 +1199,15 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
                                                  const DescriptorTbl& descs, 
OperatorPtr& op,
                                                  PipelinePtr& cur_pipe, int 
parent_idx,
                                                  int child_idx,
-                                                 const bool 
followed_by_shuffled_operator) {
+                                                 const bool 
followed_by_shuffled_operator,
+                                                 const bool 
require_bucket_distribution) {
     std::vector<DataSinkOperatorPtr> sink_ops;
     Defer defer = Defer([&]() {
         if (op) {
-            op->update_operator(tnode, followed_by_shuffled_operator, 
_require_bucket_distribution);
-            _require_bucket_distribution =
-                    _require_bucket_distribution || 
op->is_colocated_operator();
+            op->update_operator(tnode, followed_by_shuffled_operator, 
require_bucket_distribution);
         }
         for (auto& s : sink_ops) {
-            s->update_operator(tnode, followed_by_shuffled_operator, 
_require_bucket_distribution);
-            _require_bucket_distribution =
-                    _require_bucket_distribution || s->is_colocated_operator();
+            s->update_operator(tnode, followed_by_shuffled_operator, 
require_bucket_distribution);
         }
     });
     // We directly construct the operator from Thrift because the given array 
is in the order of preorder traversal.
@@ -1580,15 +1584,13 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         break;
     }
     case TPlanNodeType::INTERSECT_NODE: {
-        RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(
-                pool, tnode, descs, op, cur_pipe, parent_idx, child_idx,
-                !tnode.intersect_node.is_colocate || 
followed_by_shuffled_operator, sink_ops));
+        RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(pool, 
tnode, descs, op,
+                                                                      
cur_pipe, sink_ops));
         break;
     }
     case TPlanNodeType::EXCEPT_NODE: {
-        RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(
-                pool, tnode, descs, op, cur_pipe, parent_idx, child_idx,
-                !tnode.except_node.is_colocate || 
followed_by_shuffled_operator, sink_ops));
+        RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(pool, 
tnode, descs, op,
+                                                                       
cur_pipe, sink_ops));
         break;
     }
     case TPlanNodeType::REPEAT_NODE: {
@@ -1649,8 +1651,7 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
 template <bool is_intersect>
 Status PipelineFragmentContext::_build_operators_for_set_operation_node(
         ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, 
OperatorPtr& op,
-        PipelinePtr& cur_pipe, int parent_idx, int child_idx, bool 
followed_by_shuffled_operator,
-        std::vector<DataSinkOperatorPtr>& sink_ops) {
+        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
     op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, 
next_operator_id(), descs));
     RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
 
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index 448f7e9b08d..e4cee3c5876 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -131,17 +131,17 @@ private:
     Status _create_tree_helper(ObjectPool* pool, const std::vector<TPlanNode>& 
tnodes,
                                const DescriptorTbl& descs, OperatorPtr parent, 
int* node_idx,
                                OperatorPtr* root, PipelinePtr& cur_pipe, int 
child_idx,
-                               const bool followed_by_shuffled_join);
+                               const bool followed_by_shuffled_join,
+                               const bool require_bucket_distribution);
 
     Status _create_operator(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs,
                             OperatorPtr& op, PipelinePtr& cur_pipe, int 
parent_idx, int child_idx,
-                            const bool followed_by_shuffled_join);
+                            const bool followed_by_shuffled_join,
+                            const bool require_bucket_distribution);
     template <bool is_intersect>
     Status _build_operators_for_set_operation_node(ObjectPool* pool, const 
TPlanNode& tnode,
                                                    const DescriptorTbl& descs, 
OperatorPtr& op,
-                                                   PipelinePtr& cur_pipe, int 
parent_idx,
-                                                   int child_idx,
-                                                   bool 
followed_by_shuffled_operator,
+                                                   PipelinePtr& cur_pipe,
                                                    
std::vector<DataSinkOperatorPtr>& sink_ops);
 
     Status _create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
@@ -321,7 +321,6 @@ private:
 
     // Total instance num running on all BEs
     int _total_instances = -1;
-    bool _require_bucket_distribution = false;
 
     TPipelineFragmentParams _params;
     int32_t _parallel_instances = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to