This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 56fca531f66 [fix](local shuffle) Fix hash shuffle local exchanger
(#40036)
56fca531f66 is described below
commit 56fca531f6664151505ab2e41577c8927dd23c5d
Author: Gabriel <[email protected]>
AuthorDate: Thu Aug 29 16:51:45 2024 +0800
[fix](local shuffle) Fix hash shuffle local exchanger (#40036)
---
be/src/pipeline/exec/aggregation_sink_operator.h | 3 +-
be/src/pipeline/exec/analytic_sink_operator.h | 5 +-
.../exec/distinct_streaming_aggregation_operator.h | 5 +-
be/src/pipeline/exec/hashjoin_build_sink.h | 3 ++
be/src/pipeline/exec/hashjoin_probe_operator.h | 3 ++
be/src/pipeline/exec/operator.h | 6 +++
.../exec/partitioned_aggregation_sink_operator.h | 3 ++
.../exec/partitioned_hash_join_probe_operator.h | 3 ++
.../exec/partitioned_hash_join_sink_operator.h | 3 ++
be/src/pipeline/exec/set_probe_sink_operator.h | 2 +
be/src/pipeline/exec/set_sink_operator.h | 1 +
be/src/pipeline/exec/sort_sink_operator.h | 3 +-
.../local_exchange_sink_operator.cpp | 14 ++---
.../local_exchange/local_exchange_sink_operator.h | 2 +-
be/src/pipeline/local_exchange/local_exchanger.cpp | 22 +-------
be/src/pipeline/pipeline_fragment_context.cpp | 63 ++++++++++++++++++----
be/src/pipeline/pipeline_fragment_context.h | 5 +-
.../nereids_p0/join/test_join_local_shuffle.groovy | 6 ++-
18 files changed, 106 insertions(+), 46 deletions(-)
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h
b/be/src/pipeline/exec/aggregation_sink_operator.h
index 579b9eda1a6..f7b225311a3 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -149,11 +149,12 @@ public:
? DataDistribution(ExchangeType::PASSTHROUGH)
:
DataSinkOperatorX<AggSinkLocalState>::required_data_distribution();
}
- return _is_colocate && _require_bucket_distribution
+ return _is_colocate && _require_bucket_distribution &&
!_followed_by_shuffled_join
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE,
_partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE,
_partition_exprs);
}
bool require_data_distribution() const override { return _is_colocate; }
+ bool require_shuffled_data_distribution() const override { return
!_probe_expr_ctxs.empty(); }
size_t get_revocable_mem_size(RuntimeState* state) const;
AggregatedDataVariants* get_agg_data(RuntimeState* state) {
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h
b/be/src/pipeline/exec/analytic_sink_operator.h
index 47080b82380..6d713996b9c 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -82,7 +82,7 @@ public:
if (_partition_by_eq_expr_ctxs.empty()) {
return {ExchangeType::PASSTHROUGH};
} else if (_order_by_eq_expr_ctxs.empty()) {
- return _is_colocate && _require_bucket_distribution
+ return _is_colocate && _require_bucket_distribution &&
!_followed_by_shuffled_join
?
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE,
_partition_exprs);
}
@@ -90,6 +90,9 @@ public:
}
bool require_data_distribution() const override { return true; }
+ bool require_shuffled_data_distribution() const override {
+ return !_partition_by_eq_expr_ctxs.empty() &&
_order_by_eq_expr_ctxs.empty();
+ }
private:
Status _insert_range_column(vectorized::Block* block, const
vectorized::VExprContextSPtr& expr,
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index d6ff5fde0c5..8ec1d18fd9e 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -106,7 +106,7 @@ public:
DataDistribution required_data_distribution() const override {
if (_needs_finalize || (!_probe_expr_ctxs.empty() &&
!_is_streaming_preagg)) {
- return _is_colocate && _require_bucket_distribution
+ return _is_colocate && _require_bucket_distribution &&
!_followed_by_shuffled_join
?
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE,
_partition_exprs);
}
@@ -114,6 +114,9 @@ public:
}
bool require_data_distribution() const override { return _is_colocate; }
+ bool require_shuffled_data_distribution() const override {
+ return _needs_finalize || (!_probe_expr_ctxs.empty() &&
!_is_streaming_preagg);
+ }
private:
friend class DistinctStreamingAggLocalState;
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h
b/be/src/pipeline/exec/hashjoin_build_sink.h
index f6681548dbe..4dba04559b4 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -143,6 +143,9 @@ public:
: DataDistribution(ExchangeType::HASH_SHUFFLE,
_partition_exprs);
}
+ bool require_shuffled_data_distribution() const override {
+ return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
!_is_broadcast_join;
+ }
bool is_shuffled_hash_join() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index c1a53c0c1f3..69ab0808be4 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -153,6 +153,9 @@ public:
:
DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs));
}
+ bool require_shuffled_data_distribution() const override {
+ return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
!_is_broadcast_join;
+ }
bool is_shuffled_hash_join() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 9d549690461..abed7fb446a 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -114,11 +114,17 @@ public:
virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); }
[[nodiscard]] virtual bool require_data_distribution() const { return
false; }
OperatorXPtr child_x() { return _child_x; }
+ [[nodiscard]] bool followed_by_shuffled_join() const { return
_followed_by_shuffled_join; }
+ void set_followed_by_shuffled_join(bool followed_by_shuffled_join) {
+ _followed_by_shuffled_join = followed_by_shuffled_join;
+ }
+ [[nodiscard]] virtual bool require_shuffled_data_distribution() const {
return false; }
protected:
OperatorXPtr _child_x = nullptr;
bool _is_closed;
+ bool _followed_by_shuffled_join = false;
};
class PipelineXLocalStateBase {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index e5e44498ec0..9282df073cb 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -312,6 +312,9 @@ public:
bool require_data_distribution() const override {
return _agg_sink_operator->require_data_distribution();
}
+ bool require_shuffled_data_distribution() const override {
+ return _agg_sink_operator->require_shuffled_data_distribution();
+ }
Status set_child(OperatorXPtr child) override {
RETURN_IF_ERROR(DataSinkOperatorX<PartitionedAggSinkLocalState>::set_child(child));
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 6ee718a3354..a63ddb3e69d 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -166,6 +166,9 @@ public:
_distribution_partition_exprs));
}
+ bool require_shuffled_data_distribution() const override {
+ return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
+ }
bool is_shuffled_hash_join() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 1592c29cdb0..252c53be12d 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -116,6 +116,9 @@ public:
_distribution_partition_exprs);
}
+ bool require_shuffled_data_distribution() const override {
+ return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
+ }
bool is_shuffled_hash_join() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h
b/be/src/pipeline/exec/set_probe_sink_operator.h
index 93a862fa1cb..f21d5842581 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -98,6 +98,8 @@ public:
: DataDistribution(ExchangeType::HASH_SHUFFLE,
_partition_exprs);
}
+ bool require_shuffled_data_distribution() const override { return true; }
+
std::shared_ptr<BasicSharedState> create_shared_state() const override {
return nullptr; }
private:
diff --git a/be/src/pipeline/exec/set_sink_operator.h
b/be/src/pipeline/exec/set_sink_operator.h
index 09a1fa09e7c..ac0757e4467 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -95,6 +95,7 @@ public:
return _is_colocate ?
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE,
_partition_exprs);
}
+ bool require_shuffled_data_distribution() const override { return true; }
private:
template <class HashTableContext, bool is_intersected>
diff --git a/be/src/pipeline/exec/sort_sink_operator.h
b/be/src/pipeline/exec/sort_sink_operator.h
index b842a56f2ad..3188bfe3990 100644
--- a/be/src/pipeline/exec/sort_sink_operator.h
+++ b/be/src/pipeline/exec/sort_sink_operator.h
@@ -64,7 +64,7 @@ public:
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos)
override;
DataDistribution required_data_distribution() const override {
if (_is_analytic_sort) {
- return _is_colocate && _require_bucket_distribution
+ return _is_colocate && _require_bucket_distribution &&
!_followed_by_shuffled_join
?
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE,
_partition_exprs);
} else if (_merge_by_exchange) {
@@ -73,6 +73,7 @@ public:
}
return
DataSinkOperatorX<SortSinkLocalState>::required_data_distribution();
}
+ bool require_shuffled_data_distribution() const override { return
_is_analytic_sort; }
bool require_data_distribution() const override { return _is_colocate; }
size_t get_revocable_mem_size(RuntimeState* state) const;
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
index f0a51696075..98b1a719a49 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -36,16 +36,16 @@ std::vector<Dependency*>
LocalExchangeSinkLocalState::dependencies() const {
}
Status LocalExchangeSinkOperatorX::init(ExchangeType type, const int
num_buckets,
- const bool is_shuffled_hash_join,
+ const bool
should_disable_bucket_shuffle,
const std::map<int, int>&
shuffle_idx_to_instance_idx) {
_name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + get_exchange_type_name(type) +
")";
_type = type;
- if (_type == ExchangeType::HASH_SHUFFLE || _type ==
ExchangeType::BUCKET_HASH_SHUFFLE) {
+ if (_type == ExchangeType::HASH_SHUFFLE) {
// For shuffle join, if data distribution has been broken by previous
operator, we
// should use a HASH_SHUFFLE local exchanger to shuffle data again. To
be mentioned,
// we should use map shuffle idx to instance idx because all instances
will be
// distributed to all BEs. Otherwise, we should use shuffle idx
directly.
- if (is_shuffled_hash_join) {
+ if (should_disable_bucket_shuffle) {
std::for_each(shuffle_idx_to_instance_idx.begin(),
shuffle_idx_to_instance_idx.end(),
[&](const auto& item) {
DCHECK(item.first != -1);
@@ -58,9 +58,11 @@ Status LocalExchangeSinkOperatorX::init(ExchangeType type,
const int num_buckets
}
}
_partitioner.reset(new
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
- _type == ExchangeType::HASH_SHUFFLE ||
_bucket_seq_to_instance_idx.empty()
- ? _num_partitions
- : num_buckets));
+ _num_partitions));
+ RETURN_IF_ERROR(_partitioner->init(_texprs));
+ } else if (_type == ExchangeType::BUCKET_HASH_SHUFFLE) {
+ _partitioner.reset(
+ new
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(num_buckets));
RETURN_IF_ERROR(_partitioner->init(_texprs));
}
return Status::OK();
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
index faa48d209f4..e0e7688307c 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
@@ -102,7 +102,7 @@ public:
return Status::InternalError("{} should not init with TPlanNode",
Base::_name);
}
- Status init(ExchangeType type, const int num_buckets, const bool
is_shuffled_hash_join,
+ Status init(ExchangeType type, const int num_buckets, const bool
should_disable_bucket_shuffle,
const std::map<int, int>& shuffle_idx_to_instance_idx)
override;
Status prepare(RuntimeState* state) override;
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index 1bcd9f34ba8..f4630f328bb 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -239,28 +239,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state,
const uint32_t* __rest
new_block_wrapper->unref(local_state._shared_state,
local_state._channel_id);
}
}
- } else if (bucket_seq_to_instance_idx.empty()) {
- /**
- * If type is `BUCKET_HASH_SHUFFLE` and `_bucket_seq_to_instance_idx`
is empty, which
- * means no scan operators is included in this fragment so we also
need a `HASH_SHUFFLE` here.
- */
- const auto& map =
local_state._parent->cast<LocalExchangeSinkOperatorX>()
- ._shuffle_idx_to_instance_idx;
- DCHECK(!map.empty());
- new_block_wrapper->ref(map.size());
- for (const auto& it : map) {
- DCHECK(it.second >= 0 && it.second < _num_partitions)
- << it.first << " : " << it.second << " " <<
_num_partitions;
- uint32_t start = local_state._partition_rows_histogram[it.first];
- uint32_t size = local_state._partition_rows_histogram[it.first +
1] - start;
- if (size > 0) {
- _enqueue_data_and_set_ready(it.second, local_state,
- {new_block_wrapper, {row_idx,
start, size}});
- } else {
- new_block_wrapper->unref(local_state._shared_state,
local_state._channel_id);
- }
- }
} else {
+ DCHECK(!bucket_seq_to_instance_idx.empty());
new_block_wrapper->ref(_num_partitions);
for (size_t i = 0; i < _num_partitions; i++) {
uint32_t start = local_state._partition_rows_histogram[i];
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index c3942e8286e..3b94b258d31 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -616,7 +616,7 @@ Status
PipelineFragmentContext::_build_pipelines(ObjectPool* pool,
int node_idx = 0;
RETURN_IF_ERROR(_create_tree_helper(pool, request.fragment.plan.nodes,
request, descs, nullptr,
- &node_idx, root, cur_pipe, 0));
+ &node_idx, root, cur_pipe, 0, false));
if (node_idx + 1 != request.fragment.plan.nodes.size()) {
return Status::InternalError(
@@ -630,7 +630,8 @@ Status
PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
const
doris::TPipelineFragmentParams& request,
const DescriptorTbl&
descs, OperatorXPtr parent,
int* node_idx,
OperatorXPtr* root,
- PipelinePtr& cur_pipe, int
child_idx) {
+ PipelinePtr& cur_pipe, int
child_idx,
+ const bool
followed_by_shuffled_join) {
// propagate error case
if (*node_idx >= tnodes.size()) {
return Status::InternalError(
@@ -640,9 +641,11 @@ Status
PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
const TPlanNode& tnode = tnodes[*node_idx];
int num_children = tnodes[*node_idx].num_children;
+ bool current_followed_by_shuffled_join = followed_by_shuffled_join;
OperatorXPtr op = nullptr;
RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], request, descs,
op, cur_pipe,
- parent == nullptr ? -1 :
parent->node_id(), child_idx));
+ parent == nullptr ? -1 :
parent->node_id(), child_idx,
+ followed_by_shuffled_join));
// assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr));
if (parent != nullptr) {
@@ -651,12 +654,30 @@ Status
PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
} else {
*root = op;
}
+ /**
+ * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed
by a shuffled hash join.
+ *
+ * For plan:
+ * LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2)
+ * Exchange(id=3) -> ShuffledHashJoinBuild(id=2)
+ * We must ensure data distribution of `LocalExchange(id=0)` is same as
Exchange(id=3).
+ *
+ * If an operator's is followed by a local exchange without shuffle (e.g.
passthrough), a
+ * shuffled local exchanger will be used before join so it is not followed
by shuffle join.
+ */
+ auto require_shuffled_data_distribution =
+ cur_pipe->operator_xs().empty()
+ ? cur_pipe->sink_x()->require_shuffled_data_distribution()
+ : op->require_shuffled_data_distribution();
+ current_followed_by_shuffled_join =
+ (followed_by_shuffled_join || op->is_shuffled_hash_join()) &&
+ require_shuffled_data_distribution;
// rely on that tnodes is preorder of the plan
for (int i = 0; i < num_children; i++) {
++*node_idx;
RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, request, descs, op,
node_idx, nullptr,
- cur_pipe, i));
+ cur_pipe, i,
current_followed_by_shuffled_join));
// we are expecting a child, but have used all nodes
// this means we have been given a bad tree and must fail
@@ -692,15 +713,30 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
// 1. Create a new pipeline with local exchange sink.
DataSinkOperatorXPtr sink;
auto sink_id = next_sink_operator_id();
- const bool is_shuffled_hash_join = operator_xs.size() > idx
- ?
operator_xs[idx]->is_shuffled_hash_join()
- :
cur_pipe->sink_x()->is_shuffled_hash_join();
+
+ /**
+ * `bucket_seq_to_instance_idx` is empty if no scan operator is contained
in this fragment.
+ * So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE`
instead of `BUCKET_HASH_SHUFFLE`.
+ */
+ const bool followed_by_shuffled_join =
+ operator_xs.size() > idx ?
operator_xs[idx]->followed_by_shuffled_join()
+ :
cur_pipe->sink_x()->followed_by_shuffled_join();
+ const bool should_disable_bucket_shuffle =
+ bucket_seq_to_instance_idx.empty() &&
+ shuffle_idx_to_instance_idx.find(-1) ==
shuffle_idx_to_instance_idx.end() &&
+ followed_by_shuffled_join;
sink.reset(new LocalExchangeSinkOperatorX(
- sink_id, local_exchange_id, is_shuffled_hash_join ?
_total_instances : _num_instances,
+ sink_id, local_exchange_id,
+ should_disable_bucket_shuffle ? _total_instances : _num_instances,
data_distribution.partition_exprs, bucket_seq_to_instance_idx));
+ if (should_disable_bucket_shuffle &&
+ data_distribution.distribution_type ==
ExchangeType::BUCKET_HASH_SHUFFLE) {
+ data_distribution.distribution_type = ExchangeType::HASH_SHUFFLE;
+ }
RETURN_IF_ERROR(new_pip->set_sink(sink));
RETURN_IF_ERROR(new_pip->sink_x()->init(data_distribution.distribution_type,
num_buckets,
- is_shuffled_hash_join,
shuffle_idx_to_instance_idx));
+ should_disable_bucket_shuffle,
+ shuffle_idx_to_instance_idx));
// 2. Create and initialize LocalExchangeSharedState.
std::shared_ptr<LocalExchangeSharedState> shared_state =
@@ -711,7 +747,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
case ExchangeType::HASH_SHUFFLE:
shared_state->exchanger = ShuffleExchanger::create_unique(
std::max(cur_pipe->num_tasks(), _num_instances),
- is_shuffled_hash_join ? _total_instances : _num_instances,
+ should_disable_bucket_shuffle ? _total_instances :
_num_instances,
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
?
_runtime_state->query_options().local_exchange_free_blocks_limit
: 0);
@@ -1123,7 +1159,8 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
const
doris::TPipelineFragmentParams& request,
const DescriptorTbl& descs,
OperatorXPtr& op,
PipelinePtr& cur_pipe, int
parent_idx,
- int child_idx) {
+ int child_idx,
+ const bool
followed_by_shuffled_join) {
// We directly construct the operator from Thrift because the given array
is in the order of preorder traversal.
// Therefore, here we need to use a stack-like structure.
_pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
@@ -1214,6 +1251,7 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
!tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt) {
op.reset(new DistinctStreamingAggOperatorX(pool,
next_operator_id(), tnode, descs,
_require_bucket_distribution));
+ op->set_followed_by_shuffled_join(followed_by_shuffled_join);
_require_bucket_distribution =
_require_bucket_distribution ||
op->require_data_distribution();
RETURN_IF_ERROR(cur_pipe->add_operator(op));
@@ -1245,6 +1283,7 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(),
tnode, descs,
_require_bucket_distribution));
}
+ sink->set_followed_by_shuffled_join(followed_by_shuffled_join);
_require_bucket_distribution =
_require_bucket_distribution ||
sink->require_data_distribution();
sink->set_dests_id({op->operator_id()});
@@ -1387,6 +1426,7 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(),
tnode, descs,
_require_bucket_distribution));
}
+ sink->set_followed_by_shuffled_join(followed_by_shuffled_join);
_require_bucket_distribution =
_require_bucket_distribution ||
sink->require_data_distribution();
sink->set_dests_id({op->operator_id()});
@@ -1426,6 +1466,7 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
DataSinkOperatorXPtr sink;
sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(),
tnode, descs,
_require_bucket_distribution));
+ sink->set_followed_by_shuffled_join(followed_by_shuffled_join);
_require_bucket_distribution =
_require_bucket_distribution ||
sink->require_data_distribution();
sink->set_dests_id({op->operator_id()});
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index 7597c3ce9b5..06c88267441 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -145,12 +145,13 @@ private:
Status _create_tree_helper(ObjectPool* pool, const std::vector<TPlanNode>&
tnodes,
const doris::TPipelineFragmentParams& request,
const DescriptorTbl& descs, OperatorXPtr
parent, int* node_idx,
- OperatorXPtr* root, PipelinePtr& cur_pipe, int
child_idx);
+ OperatorXPtr* root, PipelinePtr& cur_pipe, int
child_idx,
+ const bool followed_by_shuffled_join);
Status _create_operator(ObjectPool* pool, const TPlanNode& tnode,
const doris::TPipelineFragmentParams& request,
const DescriptorTbl& descs, OperatorXPtr& op,
PipelinePtr& cur_pipe,
- int parent_idx, int child_idx);
+ int parent_idx, int child_idx, const bool
followed_by_shuffled_join);
template <bool is_intersect>
Status _build_operators_for_set_operation_node(ObjectPool* pool, const
TPlanNode& tnode,
const DescriptorTbl& descs,
OperatorXPtr& op,
diff --git
a/regression-test/suites/nereids_p0/join/test_join_local_shuffle.groovy
b/regression-test/suites/nereids_p0/join/test_join_local_shuffle.groovy
index c66131b57dc..29fe192e2b5 100644
--- a/regression-test/suites/nereids_p0/join/test_join_local_shuffle.groovy
+++ b/regression-test/suites/nereids_p0/join/test_join_local_shuffle.groovy
@@ -16,6 +16,10 @@
// under the License.
suite("test_join_local_shuffle", "query,p0") {
+ sql "DROP TABLE IF EXISTS test_join_local_shuffle_1;"
+ sql "DROP TABLE IF EXISTS test_join_local_shuffle_2;"
+ sql "DROP TABLE IF EXISTS test_join_local_shuffle_3;"
+ sql "DROP TABLE IF EXISTS test_join_local_shuffle_4;"
sql "SET enable_nereids_planner=true"
sql "SET enable_fallback_to_original_planner=false"
sql """
@@ -72,7 +76,7 @@ suite("test_join_local_shuffle", "query,p0") {
sql "insert into test_join_local_shuffle_2 values(2, 0);"
sql "insert into test_join_local_shuffle_3 values(2, 0);"
sql "insert into test_join_local_shuffle_4 values(0, 1);"
- qt_sql " select
/*+SET_VAR(disable_join_reorder=true,enable_local_shuffle=true) */ * from
(select c1, max(c2) from (select b.c1 c1, b.c2 c2 from
test_join_local_shuffle_3 a join [shuffle] test_join_local_shuffle_1 b on a.c2
= b.c1 join [broadcast] test_join_local_shuffle_4 c on b.c1 = c.c1) t1 group by
c1) t, test_join_local_shuffle_2 where t.c1 = test_join_local_shuffle_2.c2; "
+ qt_sql " select
/*+SET_VAR(disable_join_reorder=true,enable_local_shuffle=true) */ * from
(select c1, max(c2) from (select b.c1 c1, b.c2 c2 from
test_join_local_shuffle_3 a join [shuffle] test_join_local_shuffle_1 b on a.c2
= b.c1 join [broadcast] test_join_local_shuffle_4 c on b.c1 = c.c1) t1 group by
c1) t join [shuffle] test_join_local_shuffle_2 on t.c1 =
test_join_local_shuffle_2.c2; "
sql "DROP TABLE IF EXISTS test_join_local_shuffle_1;"
sql "DROP TABLE IF EXISTS test_join_local_shuffle_2;"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]