This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c02eedaef86 [fix](local exchange) Do global shuffle for shuffled join/set operator (#60758)
c02eedaef86 is described below

commit c02eedaef86d3a87abc1d1d7cbe9d28e1fabf791
Author: Gabriel <[email protected]>
AuthorDate: Sat Feb 14 17:20:00 2026 +0800

    [fix](local exchange) Do global shuffle for shuffled join/set operator (#60758)
    
    Make followed_by_shuffled_operator() virtual and override it in hash
    join and set operators to correctly report shuffle requirements.
    For multi-child operators (hash join, set operations), the method now
    returns true if the operator itself is shuffled (and not colocated) or
    if it is followed by a shuffled operator, ensuring proper global
    shuffle behavior in local exchange.
    
    Changes:
    - Make followed_by_shuffled_operator() virtual in base OperatorX class
    - Override in HashJoinBuildSinkOperatorX and HashJoinProbeOperatorX to
      return (is_shuffled_operator() && !is_colocated_operator()) ||
      _followed_by_shuffled_operator
    - Override in PartitionedHashJoinProbeOperatorX and
      PartitionedHashJoinSinkOperatorX to delegate to the inner operator
    - Override in SetProbeSinkOperatorX and SetSinkOperatorX to return
      !_is_colocate || _followed_by_shuffled_operator (these operators
      always report is_shuffled_operator() == true)
    
    This fixes incorrect data distribution when shuffled join/set operators
    are used in pipelines with local exchange.
---
 be/src/pipeline/exec/hashjoin_build_sink.h                  | 4 ++++
 be/src/pipeline/exec/hashjoin_probe_operator.h              | 4 ++++
 be/src/pipeline/exec/operator.h                             | 6 ++++--
 be/src/pipeline/exec/partitioned_hash_join_probe_operator.h | 3 +++
 be/src/pipeline/exec/partitioned_hash_join_sink_operator.h  | 3 +++
 be/src/pipeline/exec/set_probe_sink_operator.h              | 4 ++++
 be/src/pipeline/exec/set_sink_operator.h                    | 4 ++++
 7 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index 289755de645..f0b6f3c80dc 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -148,6 +148,10 @@ public:
         return _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
                _join_distribution == TJoinDistributionType::COLOCATE;
     }
+    bool followed_by_shuffled_operator() const override {
+        return (is_shuffled_operator() && !is_colocated_operator()) ||
+               _followed_by_shuffled_operator;
+    }
     std::vector<bool>& is_null_safe_eq_join() { return _is_null_safe_eq_join; }
 
     bool allow_left_semi_direct_return(RuntimeState* state) const {
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h
index b2b322da0c4..ceabc6cb8b6 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -161,6 +161,10 @@ public:
         return _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
                _join_distribution == TJoinDistributionType::COLOCATE;
     }
+    bool followed_by_shuffled_operator() const override {
+        return (is_shuffled_operator() && !is_colocated_operator()) ||
+               _followed_by_shuffled_operator;
+    }
 
     bool need_finalize_variant_column() const { return 
_need_finalize_variant_column; }
 
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 72fb311ef51..10e780c07f7 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -169,12 +169,14 @@ public:
      */
     [[nodiscard]] virtual bool is_shuffled_operator() const { return false; }
     /**
-     * Return True if this operator is followed by a shuffled operator.
+     * For operators with multiple children (HASH JOIN and SET OPERATION), return true if this operator is itself shuffled and not colocated, or if it is followed by a shuffled operator.
+     *
+     * For operators with a single child, return true if this operator is followed by a shuffled operator.
      * For example, in the plan fragment:
      *   `UNION` -> `SHUFFLED HASH JOIN`
      * The `SHUFFLED HASH JOIN` is a shuffled operator so the UNION operator is followed by a shuffled operator.
      */
-    [[nodiscard]] bool followed_by_shuffled_operator() const {
+    [[nodiscard]] virtual bool followed_by_shuffled_operator() const {
         return _followed_by_shuffled_operator;
     }
     /**
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index b744b8f2f7d..167dc1bd00c 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -167,6 +167,9 @@ public:
     bool is_colocated_operator() const override {
         return _inner_probe_operator->is_colocated_operator();
     }
+    bool followed_by_shuffled_operator() const override {
+        return _inner_probe_operator->followed_by_shuffled_operator();
+    }
 
     void update_operator(const TPlanNode& tnode, bool 
followed_by_shuffled_operator,
                          bool require_bucket_distribution) override {
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index e098d071daa..59eed7aac66 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -150,6 +150,9 @@ public:
     bool is_shuffled_operator() const override {
         return _inner_sink_operator->is_shuffled_operator();
     }
+    bool followed_by_shuffled_operator() const override {
+        return _inner_sink_operator->followed_by_shuffled_operator();
+    }
 
     void update_operator(const TPlanNode& tnode, bool 
followed_by_shuffled_operator,
                          bool require_bucket_distribution) override {
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h
index 18cc29c37ce..141459fac5d 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -118,6 +118,10 @@ public:
 
     bool is_shuffled_operator() const override { return true; }
     bool is_colocated_operator() const override { return _is_colocate; }
+    bool followed_by_shuffled_operator() const override {
+        return (is_shuffled_operator() && !is_colocated_operator()) ||
+               Base::_followed_by_shuffled_operator;
+    }
 
 private:
     void _finalize_probe(SetProbeSinkLocalState<is_intersect>& local_state);
diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h
index c22a09ac07e..6eb18db6bfc 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -122,6 +122,10 @@ public:
 
     bool is_shuffled_operator() const override { return true; }
     bool is_colocated_operator() const override { return _is_colocate; }
+    bool followed_by_shuffled_operator() const override {
+        return (is_shuffled_operator() && !is_colocated_operator()) ||
+               Base::_followed_by_shuffled_operator;
+    }
 
 private:
     template <class HashTableContext, bool is_intersected>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to