This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c02eedaef86 [fix](local exchange) Do global shuffle for shuffled
join/set operator (#60758)
c02eedaef86 is described below
commit c02eedaef86d3a87abc1d1d7cbe9d28e1fabf791
Author: Gabriel <[email protected]>
AuthorDate: Sat Feb 14 17:20:00 2026 +0800
[fix](local exchange) Do global shuffle for shuffled join/set operator
(#60758)
Make followed_by_shuffled_operator() virtual and override it in hash
join and set operators to correctly report shuffle requirements.
For multi-child operators (hash join, set operations), the method now
returns true if the operator itself is shuffled (and not colocated) or
is followed by a shuffled operator, ensuring proper global shuffle
behavior in local exchange.
Changes:
- Make followed_by_shuffled_operator() virtual in base OperatorX class
- Override in HashJoinBuildSinkOperatorX and HashJoinProbeOperatorX to
return (is_shuffled_operator() && !is_colocated_operator()) ||
_followed_by_shuffled_operator
- Override in PartitionedHashJoinProbeOperatorX and
PartitionedHashJoinSinkOperatorX to delegate to inner operator
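- Override in SetProbeSinkOperatorX and SetSinkOperatorX to return
(is_shuffled_operator() && !is_colocated_operator()) ||
_followed_by_shuffled_operator, which reduces to !_is_colocate ||
_followed_by_shuffled_operator because set operators are always shuffled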
This fixes incorrect data distribution when shuffled join/set operators
are used in pipelines with local exchange.
---
be/src/pipeline/exec/hashjoin_build_sink.h | 4 ++++
be/src/pipeline/exec/hashjoin_probe_operator.h | 4 ++++
be/src/pipeline/exec/operator.h | 6 ++++--
be/src/pipeline/exec/partitioned_hash_join_probe_operator.h | 3 +++
be/src/pipeline/exec/partitioned_hash_join_sink_operator.h | 3 +++
be/src/pipeline/exec/set_probe_sink_operator.h | 4 ++++
be/src/pipeline/exec/set_sink_operator.h | 4 ++++
7 files changed, 26 insertions(+), 2 deletions(-)
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h
b/be/src/pipeline/exec/hashjoin_build_sink.h
index 289755de645..f0b6f3c80dc 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -148,6 +148,10 @@ public:
return _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
_join_distribution == TJoinDistributionType::COLOCATE;
}
+ bool followed_by_shuffled_operator() const override {
+ return (is_shuffled_operator() && !is_colocated_operator()) ||
+ _followed_by_shuffled_operator;
+ }
std::vector<bool>& is_null_safe_eq_join() { return _is_null_safe_eq_join; }
bool allow_left_semi_direct_return(RuntimeState* state) const {
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index b2b322da0c4..ceabc6cb8b6 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -161,6 +161,10 @@ public:
return _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
_join_distribution == TJoinDistributionType::COLOCATE;
}
+ bool followed_by_shuffled_operator() const override {
+ return (is_shuffled_operator() && !is_colocated_operator()) ||
+ _followed_by_shuffled_operator;
+ }
bool need_finalize_variant_column() const { return
_need_finalize_variant_column; }
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 72fb311ef51..10e780c07f7 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -169,12 +169,14 @@ public:
*/
[[nodiscard]] virtual bool is_shuffled_operator() const { return false; }
/**
- * Return True if this operator is followed by a shuffled operator.
+ * For multiple children's operators, return true if this is a shuffled
operator or this is followed by a shuffled operator (HASH JOIN and SET
OPERATION).
+ *
+ * For single child's operators, return true if this operator is followed
by a shuffled operator.
* For example, in the plan fragment:
* `UNION` -> `SHUFFLED HASH JOIN`
* The `SHUFFLED HASH JOIN` is a shuffled operator so the UNION operator
is followed by a shuffled operator.
*/
- [[nodiscard]] bool followed_by_shuffled_operator() const {
+ [[nodiscard]] virtual bool followed_by_shuffled_operator() const {
return _followed_by_shuffled_operator;
}
/**
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index b744b8f2f7d..167dc1bd00c 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -167,6 +167,9 @@ public:
bool is_colocated_operator() const override {
return _inner_probe_operator->is_colocated_operator();
}
+ bool followed_by_shuffled_operator() const override {
+ return _inner_probe_operator->followed_by_shuffled_operator();
+ }
void update_operator(const TPlanNode& tnode, bool
followed_by_shuffled_operator,
bool require_bucket_distribution) override {
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index e098d071daa..59eed7aac66 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -150,6 +150,9 @@ public:
bool is_shuffled_operator() const override {
return _inner_sink_operator->is_shuffled_operator();
}
+ bool followed_by_shuffled_operator() const override {
+ return _inner_sink_operator->followed_by_shuffled_operator();
+ }
void update_operator(const TPlanNode& tnode, bool
followed_by_shuffled_operator,
bool require_bucket_distribution) override {
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h
b/be/src/pipeline/exec/set_probe_sink_operator.h
index 18cc29c37ce..141459fac5d 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -118,6 +118,10 @@ public:
bool is_shuffled_operator() const override { return true; }
bool is_colocated_operator() const override { return _is_colocate; }
+ bool followed_by_shuffled_operator() const override {
+ return (is_shuffled_operator() && !is_colocated_operator()) ||
+ Base::_followed_by_shuffled_operator;
+ }
private:
void _finalize_probe(SetProbeSinkLocalState<is_intersect>& local_state);
diff --git a/be/src/pipeline/exec/set_sink_operator.h
b/be/src/pipeline/exec/set_sink_operator.h
index c22a09ac07e..6eb18db6bfc 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -122,6 +122,10 @@ public:
bool is_shuffled_operator() const override { return true; }
bool is_colocated_operator() const override { return _is_colocate; }
+ bool followed_by_shuffled_operator() const override {
+ return (is_shuffled_operator() && !is_colocated_operator()) ||
+ Base::_followed_by_shuffled_operator;
+ }
private:
template <class HashTableContext, bool is_intersected>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]