This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 801c9f4394d [fix](local exchange) Fix BUCKET_HASH_SHUFFLE partition expr (#60765)
801c9f4394d is described below
commit 801c9f4394d22f731b6df81662eeb5306921b4e8
Author: Gabriel <[email protected]>
AuthorDate: Sat Feb 14 22:11:45 2026 +0800
[fix](local exchange) Fix BUCKET_HASH_SHUFFLE partition expr (#60765)
### What problem does this PR solve?
pick #60764
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
---
be/src/pipeline/pipeline_fragment_context.cpp | 49 ++++++++++++++-------------
be/src/pipeline/pipeline_fragment_context.h | 11 +++---
2 files changed, 30 insertions(+), 30 deletions(-)
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index eb76dd95ae7..78e961d0111 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -640,7 +640,7 @@ Status
PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const Descrip
int node_idx = 0;
RETURN_IF_ERROR(_create_tree_helper(pool, _params.fragment.plan.nodes,
descs, nullptr,
- &node_idx, root, cur_pipe, 0, false));
+ &node_idx, root, cur_pipe, 0, false,
false));
if (node_idx + 1 != _params.fragment.plan.nodes.size()) {
return Status::InternalError(
@@ -649,12 +649,10 @@ Status
PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const Descrip
return Status::OK();
}
-Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
- const
std::vector<TPlanNode>& tnodes,
- const DescriptorTbl&
descs, OperatorPtr parent,
- int* node_idx,
OperatorPtr* root,
- PipelinePtr& cur_pipe, int
child_idx,
- const bool
followed_by_shuffled_operator) {
+Status PipelineFragmentContext::_create_tree_helper(
+ ObjectPool* pool, const std::vector<TPlanNode>& tnodes, const
DescriptorTbl& descs,
+ OperatorPtr parent, int* node_idx, OperatorPtr* root, PipelinePtr&
cur_pipe, int child_idx,
+ const bool followed_by_shuffled_operator, const bool
require_bucket_distribution) {
// propagate error case
if (*node_idx >= tnodes.size()) {
return Status::InternalError(
@@ -665,10 +663,12 @@ Status
PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
int num_children = tnodes[*node_idx].num_children;
bool current_followed_by_shuffled_operator = followed_by_shuffled_operator;
+ bool current_require_bucket_distribution = require_bucket_distribution;
OperatorPtr op = nullptr;
RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], descs, op,
cur_pipe,
parent == nullptr ? -1 :
parent->node_id(), child_idx,
- followed_by_shuffled_operator));
+ followed_by_shuffled_operator,
+ current_require_bucket_distribution));
// Initialization must be done here. For example, group by expressions in
agg will be used to
// decide if a local shuffle should be planed, so it must be initialized
here.
RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));
@@ -700,6 +700,12 @@ Status
PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
: op->is_shuffled_operator())) &&
Pipeline::is_hash_exchange(required_data_distribution.distribution_type);
+ current_require_bucket_distribution =
+ (require_bucket_distribution ||
+ (cur_pipe->operators().empty() ?
cur_pipe->sink()->is_colocated_operator()
+ : op->is_colocated_operator())) &&
+
Pipeline::is_hash_exchange(required_data_distribution.distribution_type);
+
if (num_children == 0) {
_use_serial_source = op->is_serial_operator();
}
@@ -707,7 +713,8 @@ Status
PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
for (int i = 0; i < num_children; i++) {
++*node_idx;
RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, descs, op, node_idx,
nullptr, cur_pipe, i,
-
current_followed_by_shuffled_operator));
+
current_followed_by_shuffled_operator,
+
current_require_bucket_distribution));
// we are expecting a child, but have used all nodes
// this means we have been given a bad tree and must fail
@@ -1192,18 +1199,15 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
const DescriptorTbl& descs,
OperatorPtr& op,
PipelinePtr& cur_pipe, int
parent_idx,
int child_idx,
- const bool
followed_by_shuffled_operator) {
+ const bool
followed_by_shuffled_operator,
+ const bool
require_bucket_distribution) {
std::vector<DataSinkOperatorPtr> sink_ops;
Defer defer = Defer([&]() {
if (op) {
- op->update_operator(tnode, followed_by_shuffled_operator,
_require_bucket_distribution);
- _require_bucket_distribution =
- _require_bucket_distribution ||
op->is_colocated_operator();
+ op->update_operator(tnode, followed_by_shuffled_operator,
require_bucket_distribution);
}
for (auto& s : sink_ops) {
- s->update_operator(tnode, followed_by_shuffled_operator,
_require_bucket_distribution);
- _require_bucket_distribution =
- _require_bucket_distribution || s->is_colocated_operator();
+ s->update_operator(tnode, followed_by_shuffled_operator,
require_bucket_distribution);
}
});
// We directly construct the operator from Thrift because the given array
is in the order of preorder traversal.
@@ -1580,15 +1584,13 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
break;
}
case TPlanNodeType::INTERSECT_NODE: {
- RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(
- pool, tnode, descs, op, cur_pipe, parent_idx, child_idx,
- !tnode.intersect_node.is_colocate ||
followed_by_shuffled_operator, sink_ops));
+ RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(pool,
tnode, descs, op,
+
cur_pipe, sink_ops));
break;
}
case TPlanNodeType::EXCEPT_NODE: {
- RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(
- pool, tnode, descs, op, cur_pipe, parent_idx, child_idx,
- !tnode.except_node.is_colocate ||
followed_by_shuffled_operator, sink_ops));
+ RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(pool,
tnode, descs, op,
+
cur_pipe, sink_ops));
break;
}
case TPlanNodeType::REPEAT_NODE: {
@@ -1649,8 +1651,7 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
template <bool is_intersect>
Status PipelineFragmentContext::_build_operators_for_set_operation_node(
ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs,
OperatorPtr& op,
- PipelinePtr& cur_pipe, int parent_idx, int child_idx, bool
followed_by_shuffled_operator,
- std::vector<DataSinkOperatorPtr>& sink_ops) {
+ PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode,
next_operator_id(), descs));
RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 448f7e9b08d..e4cee3c5876 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -131,17 +131,17 @@ private:
Status _create_tree_helper(ObjectPool* pool, const std::vector<TPlanNode>&
tnodes,
const DescriptorTbl& descs, OperatorPtr parent,
int* node_idx,
OperatorPtr* root, PipelinePtr& cur_pipe, int
child_idx,
- const bool followed_by_shuffled_join);
+ const bool followed_by_shuffled_join,
+ const bool require_bucket_distribution);
Status _create_operator(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs,
OperatorPtr& op, PipelinePtr& cur_pipe, int
parent_idx, int child_idx,
- const bool followed_by_shuffled_join);
+ const bool followed_by_shuffled_join,
+ const bool require_bucket_distribution);
template <bool is_intersect>
Status _build_operators_for_set_operation_node(ObjectPool* pool, const
TPlanNode& tnode,
const DescriptorTbl& descs,
OperatorPtr& op,
- PipelinePtr& cur_pipe, int
parent_idx,
- int child_idx,
- bool
followed_by_shuffled_operator,
+ PipelinePtr& cur_pipe,
std::vector<DataSinkOperatorPtr>& sink_ops);
Status _create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
@@ -321,7 +321,6 @@ private:
// Total instance num running on all BEs
int _total_instances = -1;
- bool _require_bucket_distribution = false;
TPipelineFragmentParams _params;
int32_t _parallel_instances = 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]