This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new be164cf6155 branch-3.1: [fix](union) Local shuffle for union operator
#56048 #56449 (#56556)
be164cf6155 is described below
commit be164cf61557cb03de4f22e58e0c0536030c20d6
Author: Gabriel <[email protected]>
AuthorDate: Mon Sep 29 18:05:12 2025 +0800
branch-3.1: [fix](union) Local shuffle for union operator #56048 #56449
(#56556)
picked from #56048 #56449
---
be/src/pipeline/exec/union_sink_operator.cpp | 7 ++++++-
be/src/pipeline/exec/union_sink_operator.h | 13 ++++++++++++-
2 files changed, 18 insertions(+), 2 deletions(-)
diff --git a/be/src/pipeline/exec/union_sink_operator.cpp b/be/src/pipeline/exec/union_sink_operator.cpp
index 8467eeb1d54..cbf5199aac4 100644
--- a/be/src/pipeline/exec/union_sink_operator.cpp
+++ b/be/src/pipeline/exec/union_sink_operator.cpp
@@ -57,7 +57,12 @@ UnionSinkOperatorX::UnionSinkOperatorX(int child_id, int sink_id, ObjectPool* po
_first_materialized_child_idx(tnode.union_node.first_materialized_child_idx),
_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
_cur_child_id(child_id),
- _child_size(tnode.num_children) {}
+ _child_size(tnode.num_children),
+ _distribute_exprs(tnode.__isset.distribute_expr_lists
+ ? tnode.distribute_expr_lists[child_id]
+ : std::vector<TExpr> {}) {
+ DCHECK(!tnode.__isset.distribute_expr_lists || tnode.distribute_expr_lists.size() > child_id);
+}
Status UnionSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
diff --git a/be/src/pipeline/exec/union_sink_operator.h b/be/src/pipeline/exec/union_sink_operator.h
index aa94ed9a730..46459a02f47 100644
--- a/be/src/pipeline/exec/union_sink_operator.h
+++ b/be/src/pipeline/exec/union_sink_operator.h
@@ -94,6 +94,16 @@ public:
return _followed_by_shuffled_operator;
}
+ DataDistribution required_data_distribution() const override {
+ if (_child->is_serial_operator() && _followed_by_shuffled_operator) {
+ return DataDistribution(ExchangeType::HASH_SHUFFLE, _distribute_exprs);
+ }
+ if (_child->is_serial_operator()) {
+ return DataDistribution(ExchangeType::PASSTHROUGH);
+ }
+ return DataDistribution(ExchangeType::NOOP);
+ }
+
bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; }
private:
@@ -113,6 +123,7 @@ private:
const RowDescriptor _row_descriptor;
const int _cur_child_id;
const int _child_size;
+ const std::vector<TExpr> _distribute_exprs;
int children_count() const { return _child_size; }
bool is_child_passthrough(int child_idx) const {
DCHECK_LT(child_idx, _child_size);
@@ -152,4 +163,4 @@ private:
};
} // namespace pipeline
-} // namespace doris
\ No newline at end of file
+} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]