This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new eb1c43ef751 [fix](union) Fix union operator (#60334)
eb1c43ef751 is described below
commit eb1c43ef751528cbbed28864ac6360331291a639
Author: Gabriel <[email protected]>
AuthorDate: Mon Feb 2 16:08:06 2026 +0800
[fix](union) Fix union operator (#60334)
---
be/src/pipeline/exec/aggregation_sink_operator.cpp | 36 ++---
be/src/pipeline/exec/aggregation_sink_operator.h | 17 ++-
be/src/pipeline/exec/analytic_sink_operator.cpp | 7 +-
be/src/pipeline/exec/analytic_sink_operator.h | 19 ++-
.../distinct_streaming_aggregation_operator.cpp | 9 +-
.../exec/distinct_streaming_aggregation_operator.h | 20 ++-
be/src/pipeline/exec/hashjoin_build_sink.h | 12 +-
be/src/pipeline/exec/hashjoin_probe_operator.h | 12 +-
be/src/pipeline/exec/operator.cpp | 14 +-
be/src/pipeline/exec/operator.h | 46 +++++--
.../exec/partitioned_aggregation_sink_operator.cpp | 7 +-
.../exec/partitioned_aggregation_sink_operator.h | 16 ++-
.../partitioned_aggregation_source_operator.cpp | 20 +++
.../exec/partitioned_aggregation_source_operator.h | 6 +
.../exec/partitioned_hash_join_probe_operator.h | 17 ++-
.../exec/partitioned_hash_join_sink_operator.h | 17 ++-
be/src/pipeline/exec/set_probe_sink_operator.h | 3 +-
be/src/pipeline/exec/set_sink_operator.h | 3 +-
be/src/pipeline/exec/set_source_operator.h | 14 +-
be/src/pipeline/exec/sort_sink_operator.cpp | 4 +-
be/src/pipeline/exec/sort_sink_operator.h | 10 +-
be/src/pipeline/exec/spill_sort_sink_operator.cpp | 7 +-
be/src/pipeline/exec/spill_sort_sink_operator.h | 14 +-
.../exec/streaming_aggregation_operator.cpp | 39 +++---
.../pipeline/exec/streaming_aggregation_operator.h | 6 +-
be/src/pipeline/exec/union_sink_operator.h | 18 +--
be/src/pipeline/exec/union_source_operator.h | 12 +-
be/src/pipeline/pipeline_fragment_context.cpp | 151 +++++++++------------
be/src/pipeline/pipeline_fragment_context.h | 3 +-
.../operator/hashjoin_probe_operator_test.cpp | 4 +-
.../partitioned_aggregation_test_helper.cpp | 4 +-
.../operator/partitioned_aggregation_test_helper.h | 2 +-
.../operator/spill_sort_sink_operator_test.cpp | 4 +-
.../pipeline/operator/spill_sort_test_helper.cpp | 2 +-
34 files changed, 331 insertions(+), 244 deletions(-)
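
Note on the shape of the change below: the operator constructors no longer receive require_bucket_distribution. Instead, every operator/sink exposes update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution), which the fragment builder invokes after construction, and _require_bucket_distribution is accumulated from is_colocated_operator(). A minimal standalone sketch of that flow, with simplified stand-in types (PlanNode, MockOp) rather than the real Doris classes:

// Standalone sketch (not part of this commit): approximates how the new
// update_operator() hook is driven while a fragment is being built.
#include <iostream>
#include <memory>
#include <vector>

struct PlanNode {
    bool is_colocate = false; // stands in for tnode.*_node.is_colocate
};

class MockOp {
public:
    explicit MockOp(bool colocate) : _is_colocate(colocate) {}
    // Mirrors OperatorBase::update_operator: called after construction,
    // before prepare(), instead of passing the flags into the constructor.
    void update_operator(const PlanNode& /*tnode*/, bool followed_by_shuffled_operator,
                         bool require_bucket_distribution) {
        _followed_by_shuffled_operator = followed_by_shuffled_operator;
        _require_bucket_distribution = require_bucket_distribution;
    }
    bool is_colocated_operator() const { return _is_colocate; }

private:
    bool _is_colocate;
    bool _followed_by_shuffled_operator = false;
    bool _require_bucket_distribution = false;
};

int main() {
    // Pretend we visit plan nodes in pre-order, like _create_operator does.
    std::vector<PlanNode> nodes = {{false}, {true}, {false}};
    bool require_bucket_distribution = false;
    for (const auto& tnode : nodes) {
        auto op = std::make_unique<MockOp>(tnode.is_colocate);
        // What the Defer block added to _create_operator now does for op and sink_ops:
        op->update_operator(tnode, /*followed_by_shuffled_operator=*/false,
                            require_bucket_distribution);
        require_bucket_distribution =
                require_bucket_distribution || op->is_colocated_operator();
    }
    std::cout << "require_bucket_distribution=" << require_bucket_distribution << std::endl;
    return 0;
}

In the real code this propagation happens inside the Defer block added to PipelineFragmentContext::_create_operator in the diff below.
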
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index ddb4bbfbe6f..b7b05918d2a 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -724,8 +724,7 @@ size_t AggSinkLocalState::get_reserve_mem_size(RuntimeState* state, bool eos) co
// TODO: Tricky processing if `multi_distinct_` exists which will be re-planed by optimizer.
AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, int dest_id,
-                                  const TPlanNode& tnode, const DescriptorTbl& descs,
-                                  bool require_bucket_distribution)
+                                  const TPlanNode& tnode, const DescriptorTbl& descs)
: DataSinkOperatorX<AggSinkLocalState>(operator_id, tnode, dest_id),
_intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
_output_tuple_id(tnode.agg_node.output_tuple_id),
@@ -736,24 +735,27 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, int dest_i
          _limit(tnode.limit),
          _have_conjuncts((tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()) ||
                          (tnode.__isset.conjuncts && !tnode.conjuncts.empty())),
-          _partition_exprs(
-                  tnode.__isset.distribute_expr_lists &&
-                                  (require_bucket_distribution ||
-                                   std::any_of(
-                                           tnode.agg_node.aggregate_functions.begin(),
-                                           tnode.agg_node.aggregate_functions.end(),
-                                           [](const TExpr& texpr) -> bool {
-                                               return texpr.nodes[0]
-                                                       .fn.name.function_name.starts_with(
-                                                               vectorized::
-                                                                       DISTINCT_FUNCTION_PREFIX);
-                                           }))
-                          ? tnode.distribute_expr_lists[0]
-                          : tnode.agg_node.grouping_exprs),
          _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate),
-          _require_bucket_distribution(require_bucket_distribution),
          _agg_fn_output_row_descriptor(descs, tnode.row_tuples) {}
+void AggSinkOperatorX::update_operator(const TPlanNode& tnode, bool followed_by_shuffled_operator,
+                                       bool require_bucket_distribution) {
+    _followed_by_shuffled_operator = followed_by_shuffled_operator;
+    _require_bucket_distribution = require_bucket_distribution;
+    _partition_exprs =
+            tnode.__isset.distribute_expr_lists &&
+                            (_followed_by_shuffled_operator ||
+                             std::any_of(
+                                     tnode.agg_node.aggregate_functions.begin(),
+                                     tnode.agg_node.aggregate_functions.end(),
+                                     [](const TExpr& texpr) -> bool {
+                                         return texpr.nodes[0].fn.name.function_name.starts_with(
+                                                 vectorized::DISTINCT_FUNCTION_PREFIX);
+                                     }))
+                    ? tnode.distribute_expr_lists[0]
+                    : tnode.agg_node.grouping_exprs;
+}
+
Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<AggSinkLocalState>::init(tnode, state));
// ignore return status for now , so we need to introduce ExecNode::init()
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h
b/be/src/pipeline/exec/aggregation_sink_operator.h
index 6946578109d..d85308a7cb2 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -131,14 +131,11 @@ protected:
class AggSinkOperatorX MOCK_REMOVE(final) : public DataSinkOperatorX<AggSinkLocalState> {
public:
    AggSinkOperatorX(ObjectPool* pool, int operator_id, int dest_id, const TPlanNode& tnode,
-                     const DescriptorTbl& descs, bool require_bucket_distribution);
+                     const DescriptorTbl& descs);
#ifdef BE_TEST
    AggSinkOperatorX()
-            : DataSinkOperatorX<AggSinkLocalState>(1, 0, 2),
-              _is_first_phase(),
-              _is_colocate(),
-              _require_bucket_distribution() {}
+            : DataSinkOperatorX<AggSinkLocalState>(1, 0, 2), _is_first_phase(), _is_colocate() {}
#endif
    ~AggSinkOperatorX() override = default;
@@ -148,6 +145,8 @@ public:
    }
    Status init(const TPlanNode& tnode, RuntimeState* state) override;
+    void update_operator(const TPlanNode& tnode, bool followed_by_shuffled_operator,
+                         bool require_bucket_distribution) override;
    Status prepare(RuntimeState* state) override;
@@ -160,11 +159,12 @@ public:
                       : DataSinkOperatorX<AggSinkLocalState>::required_data_distribution(state);
        }
-        return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
+        return _is_colocate && _require_bucket_distribution
                       ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                       : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
    }
-    bool require_data_distribution() const override { return _is_colocate; }
+    bool is_colocated_operator() const override { return _is_colocate; }
+    bool is_shuffled_operator() const override { return !_partition_exprs.empty(); }
size_t get_revocable_mem_size(RuntimeState* state) const;
AggregatedDataVariants* get_agg_data(RuntimeState* state) {
@@ -221,9 +221,8 @@ protected:
std::vector<int> _null_directions;
bool _have_conjuncts;
- const std::vector<TExpr> _partition_exprs;
+ std::vector<TExpr> _partition_exprs;
const bool _is_colocate;
- const bool _require_bucket_distribution;
RowDescriptor _agg_fn_output_row_descriptor;
};
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp
b/be/src/pipeline/exec/analytic_sink_operator.cpp
index a2f30a30337..33ba9053103 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -632,8 +632,7 @@ int64_t
AnalyticSinkLocalState::find_first_not_equal(vectorized::IColumn* refere
}
AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, int
operator_id, int dest_id,
- const TPlanNode& tnode, const
DescriptorTbl& descs,
- bool require_bucket_distribution)
+ const TPlanNode& tnode, const
DescriptorTbl& descs)
: DataSinkOperatorX(operator_id, tnode, dest_id),
_pool(pool),
_intermediate_tuple_id(tnode.analytic_node.intermediate_tuple_id),
@@ -642,10 +641,6 @@ AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool*
pool, int operator_id,
? tnode.analytic_node.buffered_tuple_id
: 0),
_is_colocate(tnode.analytic_node.__isset.is_colocate &&
tnode.analytic_node.is_colocate),
- _require_bucket_distribution(require_bucket_distribution),
- _partition_exprs(tnode.__isset.distribute_expr_lists &&
require_bucket_distribution
- ? tnode.distribute_expr_lists[0]
- : tnode.analytic_node.partition_exprs),
_window(tnode.analytic_node.window),
_has_window(tnode.analytic_node.__isset.window),
_has_range_window(tnode.analytic_node.window.type ==
TAnalyticWindowType::RANGE),
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h
b/be/src/pipeline/exec/analytic_sink_operator.h
index 4815102f6ff..c4168a33c4a 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -180,7 +180,7 @@ private:
class AnalyticSinkOperatorX final : public
DataSinkOperatorX<AnalyticSinkLocalState> {
public:
AnalyticSinkOperatorX(ObjectPool* pool, int operator_id, int dest_id,
const TPlanNode& tnode,
- const DescriptorTbl& descs, bool
require_bucket_distribution);
+ const DescriptorTbl& descs);
#ifdef BE_TEST
AnalyticSinkOperatorX(ObjectPool* pool)
@@ -189,7 +189,6 @@ public:
_output_tuple_id(0),
_buffered_tuple_id(0),
_is_colocate(false),
- _require_bucket_distribution(false),
_has_window(false),
_has_range_window(false),
_has_window_start(false),
@@ -202,6 +201,14 @@ public:
    }
    Status init(const TPlanNode& tnode, RuntimeState* state) override;
+    void update_operator(const TPlanNode& tnode, bool followed_by_shuffled_operator,
+                         bool require_bucket_distribution) override {
+        _followed_by_shuffled_operator = followed_by_shuffled_operator;
+        _require_bucket_distribution = require_bucket_distribution;
+        _partition_exprs = tnode.__isset.distribute_expr_lists && _followed_by_shuffled_operator
+                                   ? tnode.distribute_expr_lists[0]
+                                   : tnode.analytic_node.partition_exprs;
+    }
    Status prepare(RuntimeState* state) override;
@@ -210,13 +217,14 @@ public:
        if (_partition_by_eq_expr_ctxs.empty()) {
            return {ExchangeType::PASSTHROUGH};
        } else {
-            return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
+            return _is_colocate && _require_bucket_distribution
                           ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                           : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
        }
    }
-    bool require_data_distribution() const override { return true; }
+    bool is_colocated_operator() const override { return _is_colocate; }
+    bool is_shuffled_operator() const override { return !_partition_by_eq_expr_ctxs.empty(); }
size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
@@ -243,8 +251,7 @@ private:
const TTupleId _buffered_tuple_id;
const bool _is_colocate;
- const bool _require_bucket_distribution;
- const std::vector<TExpr> _partition_exprs;
+ std::vector<TExpr> _partition_exprs;
TAnalyticWindow _window;
bool _has_window;
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index 6363355db73..9cafe75ec76 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -321,17 +321,12 @@ void
DistinctStreamingAggLocalState::_emplace_into_hash_table_to_distinct(
DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool,
int operator_id,
const TPlanNode&
tnode,
- const
DescriptorTbl& descs,
- bool
require_bucket_distribution)
+ const
DescriptorTbl& descs)
: StatefulOperatorX<DistinctStreamingAggLocalState>(pool, tnode,
operator_id, descs),
_output_tuple_id(tnode.agg_node.output_tuple_id),
_needs_finalize(tnode.agg_node.need_finalize),
_is_first_phase(tnode.agg_node.__isset.is_first_phase &&
tnode.agg_node.is_first_phase),
- _partition_exprs(tnode.__isset.distribute_expr_lists &&
require_bucket_distribution
- ? tnode.distribute_expr_lists[0]
- : tnode.agg_node.grouping_exprs),
- _is_colocate(tnode.agg_node.__isset.is_colocate &&
tnode.agg_node.is_colocate),
- _require_bucket_distribution(require_bucket_distribution) {
+ _is_colocate(tnode.agg_node.__isset.is_colocate &&
tnode.agg_node.is_colocate) {
if (tnode.agg_node.__isset.use_streaming_preaggregation) {
_is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation;
if (_is_streaming_preagg) {
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index b0baf64ff96..805223aa8d6 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -93,17 +93,24 @@ class DistinctStreamingAggOperatorX final
: public StatefulOperatorX<DistinctStreamingAggLocalState> {
public:
DistinctStreamingAggOperatorX(ObjectPool* pool, int operator_id, const
TPlanNode& tnode,
- const DescriptorTbl& descs, bool
require_bucket_distribution);
+ const DescriptorTbl& descs);
#ifdef BE_TEST
DistinctStreamingAggOperatorX()
: _needs_finalize(false),
_is_first_phase(true),
_partition_exprs({}),
- _is_colocate(false),
- _require_bucket_distribution {false} {}
+ _is_colocate(false) {}
#endif
Status init(const TPlanNode& tnode, RuntimeState* state) override;
+ void update_operator(const TPlanNode& tnode, bool
followed_by_shuffled_operator,
+ bool require_bucket_distribution) override {
+ _followed_by_shuffled_operator = followed_by_shuffled_operator;
+ _require_bucket_distribution = require_bucket_distribution;
+ _partition_exprs = tnode.__isset.distribute_expr_lists &&
_followed_by_shuffled_operator
+ ? tnode.distribute_expr_lists[0]
+ : tnode.agg_node.grouping_exprs;
+ }
Status prepare(RuntimeState* state) override;
Status pull(RuntimeState* state, vectorized::Block* block, bool* eos)
const override;
Status push(RuntimeState* state, vectorized::Block* input_block, bool eos)
const override;
@@ -114,14 +121,14 @@ public:
return {ExchangeType::NOOP};
}
if (_needs_finalize || (!_probe_expr_ctxs.empty() &&
!_is_streaming_preagg)) {
- return _is_colocate && _require_bucket_distribution &&
!_followed_by_shuffled_operator
+ return _is_colocate && _require_bucket_distribution
?
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE,
_partition_exprs);
}
return {ExchangeType::PASSTHROUGH};
}
- bool require_data_distribution() const override { return _is_colocate; }
+ bool is_colocated_operator() const override { return _is_colocate; }
private:
friend class DistinctStreamingAggLocalState;
@@ -130,9 +137,8 @@ private:
TupleDescriptor* _output_tuple_desc = nullptr;
const bool _needs_finalize;
const bool _is_first_phase;
- const std::vector<TExpr> _partition_exprs;
+ std::vector<TExpr> _partition_exprs;
const bool _is_colocate;
- const bool _require_bucket_distribution;
// group by k1,k2
vectorized::VExprContextSPtrs _probe_expr_ctxs;
std::vector<size_t> _make_nullable_keys;
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h
b/be/src/pipeline/exec/hashjoin_build_sink.h
index 45ab687a631..a08f80063e7 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -140,11 +140,13 @@ public:
}
bool is_shuffled_operator() const override {
- return _join_distribution == TJoinDistributionType::PARTITIONED;
+ return _join_distribution == TJoinDistributionType::PARTITIONED ||
+ _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
+ _join_distribution == TJoinDistributionType::COLOCATE;
}
- bool require_data_distribution() const override {
- return _join_distribution != TJoinDistributionType::BROADCAST &&
- _join_distribution != TJoinDistributionType::NONE;
+ bool is_colocated_operator() const override {
+ return _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
+ _join_distribution == TJoinDistributionType::COLOCATE;
}
std::vector<bool>& is_null_safe_eq_join() { return _is_null_safe_eq_join; }
@@ -161,7 +163,7 @@ private:
std::vector<bool> _is_null_safe_eq_join;
bool _is_broadcast_join = false;
- const std::vector<TExpr> _partition_exprs;
+ std::vector<TExpr> _partition_exprs;
std::vector<SlotId> _hash_output_slot_ids;
std::vector<bool> _should_keep_column_flags;
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index a0ad3e56c8b..4b455fadcab 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -147,11 +147,13 @@ public:
bool is_broadcast_join() const { return _is_broadcast_join; }
bool is_shuffled_operator() const override {
- return _join_distribution == TJoinDistributionType::PARTITIONED;
+ return _join_distribution == TJoinDistributionType::PARTITIONED ||
+ _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
+ _join_distribution == TJoinDistributionType::COLOCATE;
}
- bool require_data_distribution() const override {
- return _join_distribution != TJoinDistributionType::BROADCAST &&
- _join_distribution != TJoinDistributionType::NONE;
+ bool is_colocated_operator() const override {
+ return _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
+ _join_distribution == TJoinDistributionType::COLOCATE;
}
bool need_finalize_variant_column() const { return
_need_finalize_variant_column; }
@@ -192,7 +194,7 @@ private:
bool _need_finalize_variant_column = false;
std::set<int> _should_not_lazy_materialized_column_ids;
std::vector<std::string> _right_table_column_names;
- const std::vector<TExpr> _partition_exprs;
+ std::vector<TExpr> _partition_exprs;
// Index of column(slot) from right table in the `_intermediate_row_desc`.
size_t _right_col_idx;
diff --git a/be/src/pipeline/exec/operator.cpp
b/be/src/pipeline/exec/operator.cpp
index cf6327bcf0c..7258b4ca028 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -145,10 +145,6 @@ DataDistribution OperatorBase::required_data_distribution(RuntimeState* /*state*
                             : DataDistribution(ExchangeType::NOOP);
}
-bool OperatorBase::require_shuffled_data_distribution(RuntimeState* state) const {
-    return Pipeline::is_hash_exchange(required_data_distribution(state).distribution_type);
-}
-
const RowDescriptor& OperatorBase::row_desc() const {
    return _child->row_desc();
}
@@ -566,6 +562,11 @@ Status PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
    _exec_timer = ADD_TIMER_WITH_LEVEL(_common_profile, "ExecTime", 1);
    _memory_used_counter =
            _common_profile->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 1);
+    _common_profile->add_info_string("IsColocate",
+                                     std::to_string(_parent->is_colocated_operator()));
+    _common_profile->add_info_string("IsShuffled", std::to_string(_parent->is_shuffled_operator()));
+    _common_profile->add_info_string("FollowedByShuffledOperator",
+                                     std::to_string(_parent->followed_by_shuffled_operator()));
    return Status::OK();
}
@@ -664,6 +665,11 @@ Status PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSink
    _exec_timer = ADD_TIMER_WITH_LEVEL(_common_profile, "ExecTime", 1);
    _memory_used_counter =
            _common_profile->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 1);
+    _common_profile->add_info_string("IsColocate",
+                                     std::to_string(_parent->is_colocated_operator()));
+    _common_profile->add_info_string("IsShuffled", std::to_string(_parent->is_shuffled_operator()));
+    _common_profile->add_info_string("FollowedByShuffledOperator",
+                                     std::to_string(_parent->followed_by_shuffled_operator()));
    return Status::OK();
}
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 1b6afecd0e6..cc56f1d11ca 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -143,31 +143,59 @@ public:
     * pipeline task into the blocking task scheduler.
     */
    virtual bool is_blockable(RuntimeState* state) const = 0;
-
    virtual void set_low_memory_mode(RuntimeState* state) {}
-    [[nodiscard]] virtual bool require_data_distribution() const { return false; }
    OperatorPtr child() { return _child; }
+    virtual Status reset(RuntimeState* state) {
+        return Status::InternalError("Reset is not implemented in operator: {}", get_name());
+    }
+
+    /* -------------- Interfaces to determine the input data properties -------------- */
+    /**
+     * Returns true if this operator relies on the bucket distribution (e.g. COLOCATE join, 1-phase AGG).
+     * Data input to this kind of operator must have the same distribution as the table buckets.
+     * This also means `required_data_distribution` should be `BUCKET_HASH_SHUFFLE`.
+     */
+    [[nodiscard]] virtual bool is_colocated_operator() const { return false; }
+    /**
+     * Returns true if this operator relies on the bucket distribution or a specific hash data distribution (e.g. SHUFFLED HASH join).
+     * Data input to this kind of operator must be HASH distributed according to some rules.
+     * All colocated operators are also shuffled operators.
+     * This also means `required_data_distribution` should be `BUCKET_HASH_SHUFFLE` or `HASH_SHUFFLE`.
+     */
+    [[nodiscard]] virtual bool is_shuffled_operator() const { return false; }
+    /**
+     * Returns true if this operator is followed by a shuffled operator.
+     * For example, in the plan fragment:
+     *     `UNION` -> `SHUFFLED HASH JOIN`
+     * The `SHUFFLED HASH JOIN` is a shuffled operator, so the UNION operator is followed by a shuffled operator.
+     */
    [[nodiscard]] bool followed_by_shuffled_operator() const {
        return _followed_by_shuffled_operator;
    }
-    void set_followed_by_shuffled_operator(bool followed_by_shuffled_operator) {
+    /**
+     * Update the operator properties according to the plan node.
+     * This is called before `prepare`.
+     */
+    virtual void update_operator(const TPlanNode& tnode, bool followed_by_shuffled_operator,
+                                 bool require_bucket_distribution) {
        _followed_by_shuffled_operator = followed_by_shuffled_operator;
+        _require_bucket_distribution = require_bucket_distribution;
    }
-    [[nodiscard]] virtual bool is_shuffled_operator() const { return false; }
+    /**
+     * Return the required data distribution of this operator.
+     */
    [[nodiscard]] virtual DataDistribution required_data_distribution(
            RuntimeState* /*state*/) const;
-    [[nodiscard]] virtual bool require_shuffled_data_distribution(RuntimeState* /*state*/) const;
-
-    virtual Status reset(RuntimeState* state) {
-        return Status::InternalError("Reset is not implemented in operator: {}", get_name());
-    }
protected:
    OperatorPtr _child = nullptr;
    bool _is_closed;
    bool _followed_by_shuffled_operator = false;
+    bool _require_bucket_distribution = false;
    bool _is_serial_operator = false;
};
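
To make the comments above concrete, here is a minimal standalone sketch of how the two flags typically combine into a required distribution in the operators touched by this commit; ExchangeType and choose_distribution are simplified stand-ins, not the real Doris types:

// Standalone sketch (not part of this commit), mirroring the pattern used by
// the aggregation/analytic/sort sinks in this diff.
#include <iostream>

enum class ExchangeType { BUCKET_HASH_SHUFFLE, HASH_SHUFFLE };

ExchangeType choose_distribution(bool is_colocated_operator, bool require_bucket_distribution) {
    // A colocated operator in a fragment that must keep the table bucket layout
    // shuffles by bucket; otherwise fall back to a plain hash shuffle.
    return is_colocated_operator && require_bucket_distribution
                   ? ExchangeType::BUCKET_HASH_SHUFFLE
                   : ExchangeType::HASH_SHUFFLE;
}

int main() {
    std::cout << (choose_distribution(true, true) == ExchangeType::BUCKET_HASH_SHUFFLE) << "\n";
    std::cout << (choose_distribution(true, false) == ExchangeType::HASH_SHUFFLE) << "\n";
    return 0;
}
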
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 391f30d82d1..74c12352a88 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -114,11 +114,10 @@ void
PartitionedAggSinkLocalState::update_profile(RuntimeProfile* child_profile)
PartitionedAggSinkOperatorX::PartitionedAggSinkOperatorX(ObjectPool* pool, int
operator_id,
int dest_id, const
TPlanNode& tnode,
- const DescriptorTbl&
descs,
- bool
require_bucket_distribution)
+ const DescriptorTbl&
descs)
: DataSinkOperatorX<PartitionedAggSinkLocalState>(operator_id,
tnode.node_id, dest_id) {
- _agg_sink_operator = std::make_unique<AggSinkOperatorX>(pool, operator_id,
dest_id, tnode,
- descs,
require_bucket_distribution);
+ _agg_sink_operator =
+ std::make_unique<AggSinkOperatorX>(pool, operator_id, dest_id,
tnode, descs);
_spillable = true;
}
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 9d5cb146c4d..827655e4c3d 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -104,8 +104,7 @@ public:
class PartitionedAggSinkOperatorX : public
DataSinkOperatorX<PartitionedAggSinkLocalState> {
public:
PartitionedAggSinkOperatorX(ObjectPool* pool, int operator_id, int dest_id,
- const TPlanNode& tnode, const DescriptorTbl&
descs,
- bool require_bucket_distribution);
+ const TPlanNode& tnode, const DescriptorTbl&
descs);
~PartitionedAggSinkOperatorX() override = default;
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TPlanNode",
@@ -118,12 +117,21 @@ public:
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos)
override;
+ void update_operator(const TPlanNode& tnode, bool
followed_by_shuffled_operator,
+ bool require_bucket_distribution) override {
+ _agg_sink_operator->update_operator(tnode,
followed_by_shuffled_operator,
+ require_bucket_distribution);
+ }
+
DataDistribution required_data_distribution(RuntimeState* state) const
override {
return _agg_sink_operator->required_data_distribution(state);
}
- bool require_data_distribution() const override {
- return _agg_sink_operator->require_data_distribution();
+ bool is_colocated_operator() const override {
+ return _agg_sink_operator->is_colocated_operator();
+ }
+ bool is_shuffled_operator() const override {
+ return _agg_sink_operator->is_shuffled_operator();
}
Status set_child(OperatorPtr child) override {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index f9b900cba2f..a3446a28881 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -25,6 +25,7 @@
#include "common/exception.h"
#include "common/logging.h"
#include "common/status.h"
+#include "pipeline/exec/aggregation_source_operator.h"
#include "pipeline/exec/operator.h"
#include "pipeline/exec/spill_utils.h"
#include "pipeline/pipeline_task.h"
@@ -116,6 +117,25 @@ bool PartitionedAggSourceOperatorX::is_serial_operator()
const {
return _agg_source_operator->is_serial_operator();
}
+void PartitionedAggSourceOperatorX::update_operator(const TPlanNode& tnode,
+ bool
followed_by_shuffled_operator,
+ bool
require_bucket_distribution) {
+ _agg_source_operator->update_operator(tnode, followed_by_shuffled_operator,
+ require_bucket_distribution);
+}
+
+DataDistribution PartitionedAggSourceOperatorX::required_data_distribution(
+ RuntimeState* state) const {
+ return _agg_source_operator->required_data_distribution(state);
+}
+
+bool PartitionedAggSourceOperatorX::is_colocated_operator() const {
+ return _agg_source_operator->is_colocated_operator();
+}
+bool PartitionedAggSourceOperatorX::is_shuffled_operator() const {
+ return _agg_source_operator->is_shuffled_operator();
+}
+
Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state,
vectorized::Block* block,
bool* eos) {
auto& local_state = get_local_state(state);
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
index c388d5fe047..d22d23f20f1 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
@@ -88,6 +88,12 @@ public:
bool is_source() const override { return true; }
bool is_serial_operator() const override;
+ void update_operator(const TPlanNode& tnode, bool
followed_by_shuffled_operator,
+ bool require_bucket_distribution) override;
+
+ DataDistribution required_data_distribution(RuntimeState* state) const
override;
+ bool is_colocated_operator() const override;
+ bool is_shuffled_operator() const override;
private:
friend class PartitionedAggLocalState;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 22f88d6859a..b744b8f2f7d 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -152,10 +152,6 @@ public:
_distribution_partition_exprs));
}
- bool is_shuffled_operator() const override {
- return _join_distribution == TJoinDistributionType::PARTITIONED;
- }
-
size_t revocable_mem_size(RuntimeState* state) const override;
size_t get_reserve_mem_size(RuntimeState* state) override;
@@ -165,8 +161,17 @@ public:
_inner_sink_operator = sink_operator;
_inner_probe_operator = probe_operator;
}
- bool require_data_distribution() const override {
- return _inner_probe_operator->require_data_distribution();
+ bool is_shuffled_operator() const override {
+ return _inner_probe_operator->is_shuffled_operator();
+ }
+ bool is_colocated_operator() const override {
+ return _inner_probe_operator->is_colocated_operator();
+ }
+
+ void update_operator(const TPlanNode& tnode, bool
followed_by_shuffled_operator,
+ bool require_bucket_distribution) override {
+ _inner_probe_operator->update_operator(tnode,
followed_by_shuffled_operator,
+ require_bucket_distribution);
}
private:
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index f464af25af2..e098d071daa 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -138,18 +138,23 @@ public:
_distribution_partition_exprs);
}
- bool is_shuffled_operator() const override {
- return _join_distribution == TJoinDistributionType::PARTITIONED;
- }
-
void set_inner_operators(const
std::shared_ptr<HashJoinBuildSinkOperatorX>& sink_operator,
const std::shared_ptr<HashJoinProbeOperatorX>&
probe_operator) {
_inner_sink_operator = sink_operator;
_inner_probe_operator = probe_operator;
}
- bool require_data_distribution() const override {
- return _inner_probe_operator->require_data_distribution();
+ bool is_colocated_operator() const override {
+ return _inner_sink_operator->is_colocated_operator();
+ }
+ bool is_shuffled_operator() const override {
+ return _inner_sink_operator->is_shuffled_operator();
+ }
+
+ void update_operator(const TPlanNode& tnode, bool
followed_by_shuffled_operator,
+ bool require_bucket_distribution) override {
+ _inner_sink_operator->update_operator(tnode,
followed_by_shuffled_operator,
+ require_bucket_distribution);
}
private:
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h
b/be/src/pipeline/exec/set_probe_sink_operator.h
index 09b5ee0249b..18cc29c37ce 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -117,6 +117,7 @@ public:
size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
bool is_shuffled_operator() const override { return true; }
+ bool is_colocated_operator() const override { return _is_colocate; }
private:
void _finalize_probe(SetProbeSinkLocalState<is_intersect>& local_state);
@@ -128,7 +129,7 @@ private:
// every child has its result expr list
vectorized::VExprContextSPtrs _child_exprs;
const bool _is_colocate;
- const std::vector<TExpr> _partition_exprs;
+ std::vector<TExpr> _partition_exprs;
using OperatorBase::_child;
};
diff --git a/be/src/pipeline/exec/set_sink_operator.h
b/be/src/pipeline/exec/set_sink_operator.h
index 1d70c8681a3..c22a09ac07e 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -121,6 +121,7 @@ public:
size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
bool is_shuffled_operator() const override { return true; }
+ bool is_colocated_operator() const override { return _is_colocate; }
private:
template <class HashTableContext, bool is_intersected>
@@ -137,7 +138,7 @@ private:
// every child has its result expr list
vectorized::VExprContextSPtrs _child_exprs;
const bool _is_colocate;
- const std::vector<TExpr> _partition_exprs;
+ std::vector<TExpr> _partition_exprs;
using OperatorBase::_child;
const std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
diff --git a/be/src/pipeline/exec/set_source_operator.h
b/be/src/pipeline/exec/set_source_operator.h
index d06987c6613..db56da26a24 100644
--- a/be/src/pipeline/exec/set_source_operator.h
+++ b/be/src/pipeline/exec/set_source_operator.h
@@ -68,14 +68,23 @@ public:
: Base(pool, tnode, operator_id, descs),
_child_quantity(tnode.node_type ==
TPlanNodeType::type::INTERSECT_NODE
?
tnode.intersect_node.result_expr_lists.size()
- :
tnode.except_node.result_expr_lists.size()) {};
+ :
tnode.except_node.result_expr_lists.size()),
+ _is_colocate(is_intersect ? tnode.intersect_node.is_colocate
+ : tnode.except_node.is_colocate) {}
#ifdef BE_TEST
- SetSourceOperatorX(size_t child_quantity) :
_child_quantity(child_quantity) {}
+ SetSourceOperatorX(size_t child_quantity)
+ : _child_quantity(child_quantity), _is_colocate(false) {}
#endif
~SetSourceOperatorX() override = default;
[[nodiscard]] bool is_source() const override { return true; }
+ bool is_shuffled_operator() const override { return true; }
+ bool is_colocated_operator() const override { return _is_colocate; }
+ DataDistribution required_data_distribution(RuntimeState* /*state*/) const
override {
+ return _is_colocate ?
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE)
+ : DataDistribution(ExchangeType::HASH_SHUFFLE);
+ }
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos)
override;
Status set_child(OperatorPtr child) override {
@@ -94,6 +103,7 @@ private:
HashTableContext& hash_table_ctx,
vectorized::Block* output_block,
const int batch_size, bool* eos);
const size_t _child_quantity;
+ const bool _is_colocate;
};
#include "common/compile_check_end.h"
} // namespace pipeline
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp
b/be/src/pipeline/exec/sort_sink_operator.cpp
index b85373f7663..6931601bf3f 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -86,8 +86,7 @@ Status SortSinkLocalState::open(RuntimeState* state) {
}
SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, int
dest_id,
- const TPlanNode& tnode, const
DescriptorTbl& descs,
- const bool require_bucket_distribution)
+ const TPlanNode& tnode, const
DescriptorTbl& descs)
: DataSinkOperatorX(operator_id, tnode, dest_id),
_offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0),
_pool(pool),
@@ -95,7 +94,6 @@ SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int
operator_id, int dest
_row_descriptor(descs, tnode.row_tuples),
_merge_by_exchange(tnode.sort_node.merge_by_exchange),
_is_colocate(tnode.sort_node.__isset.is_colocate &&
tnode.sort_node.is_colocate),
- _require_bucket_distribution(require_bucket_distribution),
_is_analytic_sort(tnode.sort_node.__isset.is_analytic_sort
? tnode.sort_node.is_analytic_sort
: false),
diff --git a/be/src/pipeline/exec/sort_sink_operator.h
b/be/src/pipeline/exec/sort_sink_operator.h
index b520dbb3111..b7aaa99b0fb 100644
--- a/be/src/pipeline/exec/sort_sink_operator.h
+++ b/be/src/pipeline/exec/sort_sink_operator.h
@@ -56,7 +56,7 @@ private:
class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {
public:
SortSinkOperatorX(ObjectPool* pool, int operator_id, int dest_id, const
TPlanNode& tnode,
- const DescriptorTbl& descs, const bool
require_bucket_distribution);
+ const DescriptorTbl& descs);
#ifdef BE_TEST
SortSinkOperatorX(ObjectPool* pool, TSortAlgorithm::type type, int64_t
limit, int64_t offset)
: _offset(offset),
@@ -79,7 +79,7 @@ public:
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos)
override;
DataDistribution required_data_distribution(RuntimeState* /*state*/) const
override {
if (_is_analytic_sort) {
- return _is_colocate && _require_bucket_distribution &&
!_followed_by_shuffled_operator
+ return _is_colocate && _require_bucket_distribution
?
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE,
_partition_exprs);
} else if (_merge_by_exchange) {
@@ -89,7 +89,8 @@ public:
return {ExchangeType::NOOP};
}
}
- bool require_data_distribution() const override { return _is_colocate; }
+ bool is_colocated_operator() const override { return _is_colocate; }
+ bool is_shuffled_operator() const override { return _is_analytic_sort; }
size_t get_revocable_mem_size(RuntimeState* state) const;
@@ -121,9 +122,8 @@ private:
const RowDescriptor _row_descriptor;
const bool _merge_by_exchange;
const bool _is_colocate = false;
- const bool _require_bucket_distribution = false;
const bool _is_analytic_sort = false;
- const std::vector<TExpr> _partition_exprs;
+ std::vector<TExpr> _partition_exprs;
const TSortAlgorithm::type _algorithm;
const bool _reuse_mem;
const int64_t _max_buffered_bytes;
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 9669f508d7a..52e820ca62b 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -106,12 +106,11 @@ bool SpillSortSinkLocalState::is_blockable() const {
}
SpillSortSinkOperatorX::SpillSortSinkOperatorX(ObjectPool* pool, int
operator_id, int dest_id,
- const TPlanNode& tnode, const
DescriptorTbl& descs,
- bool
require_bucket_distribution)
+ const TPlanNode& tnode, const
DescriptorTbl& descs)
: DataSinkOperatorX(operator_id, tnode.node_id, dest_id) {
_spillable = true;
- _sort_sink_operator = std::make_unique<SortSinkOperatorX>(pool,
operator_id, dest_id, tnode,
- descs,
require_bucket_distribution);
+ _sort_sink_operator =
+ std::make_unique<SortSinkOperatorX>(pool, operator_id, dest_id,
tnode, descs);
}
Status SpillSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState*
state) {
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h
b/be/src/pipeline/exec/spill_sort_sink_operator.h
index d2efcc90512..a3ddb8a24e9 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.h
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -68,7 +68,7 @@ class SpillSortSinkOperatorX final : public
DataSinkOperatorX<SpillSortSinkLocal
public:
using LocalStateType = SpillSortSinkLocalState;
SpillSortSinkOperatorX(ObjectPool* pool, int operator_id, int dest_id,
const TPlanNode& tnode,
- const DescriptorTbl& descs, bool
require_bucket_distribution);
+ const DescriptorTbl& descs);
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TPlanNode",
DataSinkOperatorX<SpillSortSinkLocalState>::_name);
@@ -81,8 +81,16 @@ public:
    DataDistribution required_data_distribution(RuntimeState* state) const override {
        return _sort_sink_operator->required_data_distribution(state);
    }
-    bool require_data_distribution() const override {
-        return _sort_sink_operator->require_data_distribution();
+    void update_operator(const TPlanNode& tnode, bool followed_by_shuffled_operator,
+                         bool require_bucket_distribution) override {
+        _sort_sink_operator->update_operator(tnode, followed_by_shuffled_operator,
+                                             require_bucket_distribution);
+    }
+    bool is_colocated_operator() const override {
+        return _sort_sink_operator->is_colocated_operator();
+    }
+    bool is_shuffled_operator() const override {
+        return _sort_sink_operator->is_shuffled_operator();
    }
Status set_child(OperatorPtr child) override {
RETURN_IF_ERROR(DataSinkOperatorX<SpillSortSinkLocalState>::set_child(child));
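
The spillable wrappers in this commit (PartitionedAggSinkOperatorX, SpillSortSinkOperatorX, the partitioned hash join operators) all follow the same forwarding pattern as above. A minimal standalone sketch of that pattern, with simplified stand-in classes (InnerSink, OuterSpillSink) and placeholder rules rather than the real operators:

// Standalone sketch (not part of this commit): the outer spillable operator
// delegates update_operator() and the is_*_operator() queries to the inner
// non-spilling operator, so there is a single source of truth for the flags.
#include <iostream>
#include <memory>

struct PlanNode {};

class InnerSink {
public:
    explicit InnerSink(bool is_colocate) : _is_colocate(is_colocate) {}
    void update_operator(const PlanNode& /*tnode*/, bool followed, bool require_bucket) {
        _followed_by_shuffled_operator = followed;
        _require_bucket_distribution = require_bucket;
    }
    bool is_colocated_operator() const { return _is_colocate; }
    bool is_shuffled_operator() const { return true; } // placeholder rule for the sketch

private:
    const bool _is_colocate;
    bool _followed_by_shuffled_operator = false;
    bool _require_bucket_distribution = false;
};

class OuterSpillSink {
public:
    OuterSpillSink() : _inner(std::make_unique<InnerSink>(/*is_colocate=*/true)) {}
    void update_operator(const PlanNode& tnode, bool followed, bool require_bucket) {
        _inner->update_operator(tnode, followed, require_bucket); // forward to the inner operator
    }
    bool is_colocated_operator() const { return _inner->is_colocated_operator(); }
    bool is_shuffled_operator() const { return _inner->is_shuffled_operator(); }

private:
    std::unique_ptr<InnerSink> _inner;
};

int main() {
    OuterSpillSink sink;
    sink.update_operator(PlanNode{}, /*followed=*/true, /*require_bucket=*/true);
    std::cout << sink.is_colocated_operator() << " " << sink.is_shuffled_operator() << "\n";
    return 0;
}
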
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index 5959ef2cf03..fc7e8fe147c 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -818,29 +818,34 @@ void StreamingAggLocalState::_emplace_into_hash_table(vectorized::AggregateDataP
}
StreamingAggOperatorX::StreamingAggOperatorX(ObjectPool* pool, int operator_id,
-                                             const TPlanNode& tnode, const DescriptorTbl& descs,
-                                             bool require_bucket_distribution)
+                                             const TPlanNode& tnode, const DescriptorTbl& descs)
        : StatefulOperatorX<StreamingAggLocalState>(pool, tnode, operator_id, descs),
          _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
          _output_tuple_id(tnode.agg_node.output_tuple_id),
          _needs_finalize(tnode.agg_node.need_finalize),
          _is_first_phase(tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase),
          _have_conjuncts(tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()),
-          _agg_fn_output_row_descriptor(descs, tnode.row_tuples),
-          _partition_exprs(
-                  tnode.__isset.distribute_expr_lists &&
-                                  (require_bucket_distribution ||
-                                   std::any_of(
-                                           tnode.agg_node.aggregate_functions.begin(),
-                                           tnode.agg_node.aggregate_functions.end(),
-                                           [](const TExpr& texpr) -> bool {
-                                               return texpr.nodes[0]
-                                                       .fn.name.function_name.starts_with(
-                                                               vectorized::
-                                                                       DISTINCT_FUNCTION_PREFIX);
-                                           }))
-                          ? tnode.distribute_expr_lists[0]
-                          : tnode.agg_node.grouping_exprs) {}
+          _agg_fn_output_row_descriptor(descs, tnode.row_tuples) {}
+
+void StreamingAggOperatorX::update_operator(const TPlanNode& tnode,
+                                            bool followed_by_shuffled_operator,
+                                            bool require_bucket_distribution) {
+    _followed_by_shuffled_operator = followed_by_shuffled_operator;
+    _require_bucket_distribution = require_bucket_distribution;
+    _partition_exprs =
+            tnode.__isset.distribute_expr_lists &&
+                            (StatefulOperatorX<
+                                     StreamingAggLocalState>::_followed_by_shuffled_operator ||
+                             std::any_of(
+                                     tnode.agg_node.aggregate_functions.begin(),
+                                     tnode.agg_node.aggregate_functions.end(),
+                                     [](const TExpr& texpr) -> bool {
+                                         return texpr.nodes[0].fn.name.function_name.starts_with(
+                                                 vectorized::DISTINCT_FUNCTION_PREFIX);
+                                     }))
+                    ? tnode.distribute_expr_lists[0]
+                    : tnode.agg_node.grouping_exprs;
+}
Status StreamingAggOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
    RETURN_IF_ERROR(StatefulOperatorX<StreamingAggLocalState>::init(tnode, state));
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h
b/be/src/pipeline/exec/streaming_aggregation_operator.h
index 71396f351e3..a6d82f569f0 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.h
@@ -204,13 +204,15 @@ private:
class StreamingAggOperatorX MOCK_REMOVE(final) : public
StatefulOperatorX<StreamingAggLocalState> {
public:
StreamingAggOperatorX(ObjectPool* pool, int operator_id, const TPlanNode&
tnode,
- const DescriptorTbl& descs, bool
require_bucket_distribution);
+ const DescriptorTbl& descs);
#ifdef BE_TEST
StreamingAggOperatorX() : _is_first_phase {false} {}
#endif
~StreamingAggOperatorX() override = default;
Status init(const TPlanNode& tnode, RuntimeState* state) override;
+ void update_operator(const TPlanNode& tnode, bool
followed_by_shuffled_operator,
+ bool require_bucket_distribution) override;
Status prepare(RuntimeState* state) override;
Status pull(RuntimeState* state, vectorized::Block* block, bool* eos)
const override;
Status push(RuntimeState* state, vectorized::Block* input_block, bool eos)
const override;
@@ -271,7 +273,7 @@ private:
std::vector<int> _order_directions;
std::vector<int> _null_directions;
- const std::vector<TExpr> _partition_exprs;
+ std::vector<TExpr> _partition_exprs;
};
} // namespace pipeline
diff --git a/be/src/pipeline/exec/union_sink_operator.h
b/be/src/pipeline/exec/union_sink_operator.h
index ee575e8f8d5..2b32268ffca 100644
--- a/be/src/pipeline/exec/union_sink_operator.h
+++ b/be/src/pipeline/exec/union_sink_operator.h
@@ -117,18 +117,14 @@ public:
        }
    }
-    bool require_shuffled_data_distribution(RuntimeState* /*state*/) const override {
-        return _followed_by_shuffled_operator;
-    }
-
-    DataDistribution required_data_distribution(RuntimeState* /*state*/) const override {
-        if (_child->is_serial_operator() && _followed_by_shuffled_operator) {
-            return DataDistribution(ExchangeType::HASH_SHUFFLE, _distribute_exprs);
+    DataDistribution required_data_distribution(RuntimeState* state) const override {
+        if (_require_bucket_distribution) {
+            return DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _distribute_exprs);
        }
-        if (_child->is_serial_operator()) {
-            return DataDistribution(ExchangeType::PASSTHROUGH);
+        if (_followed_by_shuffled_operator) {
+            return DataDistribution(ExchangeType::HASH_SHUFFLE, _distribute_exprs);
        }
-        return DataDistribution(ExchangeType::NOOP);
+        return Base::required_data_distribution(state);
    }
    void set_low_memory_mode(RuntimeState* state) override {
@@ -136,8 +132,6 @@ public:
        local_state._shared_state->data_queue.set_low_memory_mode();
    }
-    bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; }
-
private:
    int _get_first_materialized_child_idx() const { return _first_materialized_child_idx; }
diff --git a/be/src/pipeline/exec/union_source_operator.h
b/be/src/pipeline/exec/union_source_operator.h
index 255c778f1e8..81040b9d417 100644
--- a/be/src/pipeline/exec/union_source_operator.h
+++ b/be/src/pipeline/exec/union_source_operator.h
@@ -102,8 +102,15 @@ public:
        return Status::OK();
    }
    [[nodiscard]] int get_child_count() const { return _child_size; }
-    bool require_shuffled_data_distribution(RuntimeState* /*state*/) const override {
-        return _followed_by_shuffled_operator;
+
+    DataDistribution required_data_distribution(RuntimeState* state) const override {
+        if (_require_bucket_distribution) {
+            return DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE);
+        }
+        if (_followed_by_shuffled_operator) {
+            return DataDistribution(ExchangeType::HASH_SHUFFLE);
+        }
+        return Base::required_data_distribution(state);
    }
    void set_low_memory_mode(RuntimeState* state) override {
@@ -113,7 +120,6 @@ public:
        }
    }
-    bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; }
    Status set_child(OperatorPtr child) override {
        Base::_child = child;
        return Status::OK();
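
A minimal standalone sketch of the decision order the union source and sink now apply (the same if/else chain as in the two headers above); the enum and the NOOP fallback are simplified stand-ins for the real DataDistribution and Base::required_data_distribution(state):

// Standalone sketch (not part of this commit): bucket shuffle wins, then hash
// shuffle, otherwise whatever the base operator would require (treated as NOOP
// here purely for illustration).
#include <iostream>

enum class ExchangeType { BUCKET_HASH_SHUFFLE, HASH_SHUFFLE, NOOP };

ExchangeType union_required_distribution(bool require_bucket_distribution,
                                         bool followed_by_shuffled_operator) {
    if (require_bucket_distribution) {
        return ExchangeType::BUCKET_HASH_SHUFFLE; // a downstream colocated operator needs bucket layout
    }
    if (followed_by_shuffled_operator) {
        return ExchangeType::HASH_SHUFFLE; // keep rows hash-partitioned for the shuffled consumer
    }
    return ExchangeType::NOOP; // stand-in for Base::required_data_distribution(state)
}

int main() {
    std::cout << (union_required_distribution(true, true) == ExchangeType::BUCKET_HASH_SHUFFLE) << "\n";
    std::cout << (union_required_distribution(false, true) == ExchangeType::HASH_SHUFFLE) << "\n";
    std::cout << (union_required_distribution(false, false) == ExchangeType::NOOP) << "\n";
    return 0;
}
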
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index cbe8555ad47..e66e2326789 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -699,13 +699,15 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
     * If an operator is followed by a local exchange without shuffle (e.g. passthrough), a
     * shuffled local exchanger will be used before join so it is not followed by shuffle join.
     */
-    auto require_shuffled_data_distribution =
+    auto required_data_distribution =
            cur_pipe->operators().empty()
-                    ? cur_pipe->sink()->require_shuffled_data_distribution(_runtime_state.get())
-                    : op->require_shuffled_data_distribution(_runtime_state.get());
+                    ? cur_pipe->sink()->required_data_distribution(_runtime_state.get())
+                    : op->required_data_distribution(_runtime_state.get());
    current_followed_by_shuffled_operator =
-            (followed_by_shuffled_operator || op->is_shuffled_operator()) &&
-            require_shuffled_data_distribution;
+            (followed_by_shuffled_operator ||
+             (cur_pipe->operators().empty() ? cur_pipe->sink()->is_shuffled_operator()
+                                            : op->is_shuffled_operator())) &&
+            Pipeline::is_hash_exchange(required_data_distribution.distribution_type);
    if (num_children == 0) {
        _use_serial_source = op->is_serial_operator();
@@ -1197,6 +1199,19 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
                                                 PipelinePtr& cur_pipe, int parent_idx,
                                                 int child_idx,
                                                 const bool followed_by_shuffled_operator) {
+    std::vector<DataSinkOperatorPtr> sink_ops;
+    Defer defer = Defer([&]() {
+        if (op) {
+            op->update_operator(tnode, followed_by_shuffled_operator, _require_bucket_distribution);
+            _require_bucket_distribution =
+                    _require_bucket_distribution || op->is_colocated_operator();
+        }
+        for (auto& s : sink_ops) {
+            s->update_operator(tnode, followed_by_shuffled_operator, _require_bucket_distribution);
+            _require_bucket_distribution =
+                    _require_bucket_distribution || s->is_colocated_operator();
+        }
+    });
    // We directly construct the operator from Thrift because the given array is in the order of
    // preorder traversal. Therefore, here we need to use a stack-like structure.
    _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
@@ -1309,19 +1324,14 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
PipelinePtr new_pipe;
RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
- op = std::make_shared<DistinctStreamingAggOperatorX>(
- pool, next_operator_id(), tnode, descs,
_require_bucket_distribution);
- op->set_followed_by_shuffled_operator(false);
- _require_bucket_distribution = true;
+ op = std::make_shared<DistinctStreamingAggOperatorX>(pool,
next_operator_id(),
+ tnode,
descs);
RETURN_IF_ERROR(new_pipe->add_operator(op,
_parallel_instances));
RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
cur_pipe = new_pipe;
} else {
- op = std::make_shared<DistinctStreamingAggOperatorX>(
- pool, next_operator_id(), tnode, descs,
_require_bucket_distribution);
-
op->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
- _require_bucket_distribution =
- _require_bucket_distribution ||
op->require_data_distribution();
+ op = std::make_shared<DistinctStreamingAggOperatorX>(pool,
next_operator_id(),
+ tnode,
descs);
RETURN_IF_ERROR(cur_pipe->add_operator(op,
_parallel_instances));
}
} else if (is_streaming_agg) {
@@ -1329,14 +1339,14 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
PipelinePtr new_pipe;
RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
- op = std::make_shared<StreamingAggOperatorX>(pool,
next_operator_id(), tnode, descs,
-
_require_bucket_distribution);
+ op = std::make_shared<StreamingAggOperatorX>(pool,
next_operator_id(), tnode,
+ descs);
RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
RETURN_IF_ERROR(new_pipe->add_operator(op,
_parallel_instances));
cur_pipe = new_pipe;
} else {
- op = std::make_shared<StreamingAggOperatorX>(pool,
next_operator_id(), tnode, descs,
-
_require_bucket_distribution);
+ op = std::make_shared<StreamingAggOperatorX>(pool,
next_operator_id(), tnode,
+ descs);
RETURN_IF_ERROR(cur_pipe->add_operator(op,
_parallel_instances));
}
} else {
@@ -1367,20 +1377,14 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
cur_pipe = add_pipeline(cur_pipe);
_dag[downstream_pipeline_id].push_back(cur_pipe->id());
- DataSinkOperatorPtr sink;
if (enable_spill) {
- sink = std::make_shared<PartitionedAggSinkOperatorX>(
- pool, next_sink_operator_id(), op->operator_id(),
tnode, descs,
- _require_bucket_distribution);
+
sink_ops.push_back(std::make_shared<PartitionedAggSinkOperatorX>(
+ pool, next_sink_operator_id(), op->operator_id(),
tnode, descs));
} else {
- sink = std::make_shared<AggSinkOperatorX>(pool,
next_sink_operator_id(),
- op->operator_id(),
tnode, descs,
-
_require_bucket_distribution);
+ sink_ops.push_back(std::make_shared<AggSinkOperatorX>(
+ pool, next_sink_operator_id(), op->operator_id(),
tnode, descs));
}
-
sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
- _require_bucket_distribution =
- _require_bucket_distribution ||
sink->require_data_distribution();
- RETURN_IF_ERROR(cur_pipe->set_sink(sink));
+ RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
RETURN_IF_ERROR(cur_pipe->sink()->init(tnode,
_runtime_state.get()));
}
break;
@@ -1426,14 +1430,12 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
RETURN_IF_ERROR(inner_sink_operator->init(tnode,
_runtime_state.get()));
sink_operator->set_inner_operators(inner_sink_operator,
inner_probe_operator);
- DataSinkOperatorPtr sink = std::move(sink_operator);
- RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
+ sink_ops.push_back(std::move(sink_operator));
+ RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode_,
_runtime_state.get()));
_pipeline_parent_map.push(op->node_id(), cur_pipe);
_pipeline_parent_map.push(op->node_id(), build_side_pipe);
-
sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator());
- op->set_followed_by_shuffled_operator(op->is_shuffled_operator());
} else {
op = std::make_shared<HashJoinProbeOperatorX>(pool, tnode,
next_operator_id(), descs);
RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
@@ -1445,16 +1447,13 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
_dag[downstream_pipeline_id].push_back(build_side_pipe->id());
- DataSinkOperatorPtr sink;
- sink = std::make_shared<HashJoinBuildSinkOperatorX>(pool,
next_sink_operator_id(),
-
op->operator_id(), tnode, descs);
- RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
+ sink_ops.push_back(std::make_shared<HashJoinBuildSinkOperatorX>(
+ pool, next_sink_operator_id(), op->operator_id(), tnode,
descs));
+ RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode,
_runtime_state.get()));
_pipeline_parent_map.push(op->node_id(), cur_pipe);
_pipeline_parent_map.push(op->node_id(), build_side_pipe);
-
sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator());
- op->set_followed_by_shuffled_operator(op->is_shuffled_operator());
}
if (is_broadcast_join &&
_runtime_state->enable_share_hash_table_for_broadcast_join()) {
std::shared_ptr<HashJoinSharedState> shared_state =
@@ -1470,8 +1469,6 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
_op_id_to_shared_state.insert(
{op->operator_id(), {shared_state,
shared_state->sink_deps}});
}
- _require_bucket_distribution =
- _require_bucket_distribution ||
op->require_data_distribution();
break;
}
case TPlanNodeType::CROSS_JOIN_NODE: {
@@ -1485,10 +1482,9 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
_dag[downstream_pipeline_id].push_back(build_side_pipe->id());
- DataSinkOperatorPtr sink;
- sink = std::make_shared<NestedLoopJoinBuildSinkOperatorX>(pool,
next_sink_operator_id(),
-
op->operator_id(), tnode, descs);
- RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
+ sink_ops.push_back(std::make_shared<NestedLoopJoinBuildSinkOperatorX>(
+ pool, next_sink_operator_id(), op->operator_id(), tnode,
descs));
+ RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode,
_runtime_state.get()));
_pipeline_parent_map.push(op->node_id(), cur_pipe);
_pipeline_parent_map.push(op->node_id(), build_side_pipe);
@@ -1497,7 +1493,6 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
case TPlanNodeType::UNION_NODE: {
int child_count = tnode.num_children;
op = std::make_shared<UnionSourceOperatorX>(pool, tnode,
next_operator_id(), descs);
- op->set_followed_by_shuffled_operator(_require_bucket_distribution);
RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
const auto downstream_pipeline_id = cur_pipe->id();
@@ -1507,11 +1502,9 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
for (int i = 0; i < child_count; i++) {
PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
_dag[downstream_pipeline_id].push_back(build_side_pipe->id());
- DataSinkOperatorPtr sink;
- sink = std::make_shared<UnionSinkOperatorX>(i,
next_sink_operator_id(),
- op->operator_id(),
pool, tnode, descs);
-
sink->set_followed_by_shuffled_operator(_require_bucket_distribution);
- RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
+ sink_ops.push_back(std::make_shared<UnionSinkOperatorX>(
+ i, next_sink_operator_id(), op->operator_id(), pool,
tnode, descs));
+ RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode,
_runtime_state.get()));
// preset children pipelines. if any pipeline found this as its
father, will use the prepared pipeline to build.
_pipeline_parent_map.push(op->node_id(), build_side_pipe);
@@ -1540,20 +1533,14 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
        cur_pipe = add_pipeline(cur_pipe);
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
-        DataSinkOperatorPtr sink;
        if (should_spill) {
-            sink = std::make_shared<SpillSortSinkOperatorX>(pool, next_sink_operator_id(),
-                                                            op->operator_id(), tnode, descs,
-                                                            _require_bucket_distribution);
+            sink_ops.push_back(std::make_shared<SpillSortSinkOperatorX>(
+                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
        } else {
-            sink = std::make_shared<SortSinkOperatorX>(pool, next_sink_operator_id(),
-                                                       op->operator_id(), tnode, descs,
-                                                       _require_bucket_distribution);
-        }
-        sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
-        _require_bucket_distribution =
-                _require_bucket_distribution || sink->require_data_distribution();
-        RETURN_IF_ERROR(cur_pipe->set_sink(sink));
+            sink_ops.push_back(std::make_shared<SortSinkOperatorX>(
+                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
+        }
+        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
        break;
    }
@@ -1568,10 +1555,9 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
        cur_pipe = add_pipeline(cur_pipe);
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
-        DataSinkOperatorPtr sink;
-        sink = std::make_shared<PartitionSortSinkOperatorX>(pool, next_sink_operator_id(),
-                                                            op->operator_id(), tnode, descs);
-        RETURN_IF_ERROR(cur_pipe->set_sink(sink));
+        sink_ops.push_back(std::make_shared<PartitionSortSinkOperatorX>(
+                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
+        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
        break;
    }
@@ -1586,14 +1572,9 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
        cur_pipe = add_pipeline(cur_pipe);
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
-        DataSinkOperatorPtr sink;
-        sink = std::make_shared<AnalyticSinkOperatorX>(pool, next_sink_operator_id(),
-                                                       op->operator_id(), tnode, descs,
-                                                       _require_bucket_distribution);
-        sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
-        _require_bucket_distribution =
-                _require_bucket_distribution || sink->require_data_distribution();
-        RETURN_IF_ERROR(cur_pipe->set_sink(sink));
+        sink_ops.push_back(std::make_shared<AnalyticSinkOperatorX>(
+                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
+        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
        break;
    }
@@ -1605,15 +1586,13 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
    case TPlanNodeType::INTERSECT_NODE: {
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(
                pool, tnode, descs, op, cur_pipe, parent_idx, child_idx,
-                !tnode.intersect_node.is_colocate));
-        _require_bucket_distribution = tnode.intersect_node.is_colocate;
+                !tnode.intersect_node.is_colocate || followed_by_shuffled_operator, sink_ops));
        break;
    }
    case TPlanNodeType::EXCEPT_NODE: {
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(
                pool, tnode, descs, op, cur_pipe, parent_idx, child_idx,
-                !tnode.except_node.is_colocate));
-        _require_bucket_distribution = tnode.except_node.is_colocate;
+                !tnode.except_node.is_colocate || followed_by_shuffled_operator, sink_ops));
        break;
    }
    case TPlanNodeType::REPEAT_NODE: {
@@ -1710,9 +1689,9 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
template <bool is_intersect>
Status PipelineFragmentContext::_build_operators_for_set_operation_node(
        ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorPtr& op,
-        PipelinePtr& cur_pipe, int parent_idx, int child_idx, bool followed_by_shuffled_operator) {
+        PipelinePtr& cur_pipe, int parent_idx, int child_idx, bool followed_by_shuffled_operator,
+        std::vector<DataSinkOperatorPtr>& sink_ops) {
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
-    op->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
    const auto downstream_pipeline_id = cur_pipe->id();
@@ -1724,16 +1703,14 @@ Status PipelineFragmentContext::_build_operators_for_set_operation_node(
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
-        DataSinkOperatorPtr sink;
        if (child_id == 0) {
-            sink.reset(new SetSinkOperatorX<is_intersect>(child_id, next_sink_operator_id(),
-                                                          op->operator_id(), pool, tnode, descs));
+            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
+                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
        } else {
-            sink.reset(new SetProbeSinkOperatorX<is_intersect>(
+            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
        }
-        sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
-        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink));
+        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
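
Note on the hunks above: every changed branch follows the same shape. The per-case local `DataSinkOperatorPtr sink` is dropped, each branch appends its sink to a caller-provided `std::vector<DataSinkOperatorPtr>& sink_ops`, and the pipeline is wired with `set_sink(sink_ops.back())`. The sketch below is a minimal, self-contained illustration of that accumulation pattern only; `Sink`, `Pipeline`, and `build_children` are placeholder types for this sketch, not the Doris classes.

#include <iostream>
#include <memory>
#include <vector>

// Placeholder stand-ins; only the ownership/accumulation pattern is shown.
struct Sink {
    explicit Sink(int id) : id(id) {}
    int id;
};
using SinkPtr = std::shared_ptr<Sink>;

struct Pipeline {
    SinkPtr sink;
    void set_sink(const SinkPtr& s) { sink = s; }
};

// Loosely mirrors the new code path: each branch pushes into sink_ops and
// then attaches the most recently created sink via sink_ops.back().
void build_children(int child_count, std::vector<SinkPtr>& sink_ops,
                    std::vector<Pipeline>& pipes) {
    for (int i = 0; i < child_count; i++) {
        pipes.emplace_back();
        sink_ops.push_back(std::make_shared<Sink>(i));
        pipes.back().set_sink(sink_ops.back());
    }
}

int main() {
    std::vector<SinkPtr> sink_ops; // the caller keeps a handle to every sink created
    std::vector<Pipeline> pipes;
    build_children(3, sink_ops, pipes);
    std::cout << "created " << sink_ops.size() << " sinks\n"; // prints: created 3 sinks
    return 0;
}

In this sketch both the vector and the pipelines hold shared_ptr copies, so the sinks stay alive regardless of which container is torn down first.
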
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index abe007dadb3..f908abc6b31 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -154,7 +154,8 @@ private:
                                                      const DescriptorTbl& descs, OperatorPtr& op,
                                                      PipelinePtr& cur_pipe, int parent_idx,
                                                      int child_idx,
-                                                      bool followed_by_shuffled_operator);
+                                                      bool followed_by_shuffled_operator,
+                                                      std::vector<DataSinkOperatorPtr>& sink_ops);
Status _create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
const std::vector<TExpr>& output_exprs,
diff --git a/be/test/pipeline/operator/hashjoin_probe_operator_test.cpp b/be/test/pipeline/operator/hashjoin_probe_operator_test.cpp
index b94a33f197d..5540c6d06f6 100644
--- a/be/test/pipeline/operator/hashjoin_probe_operator_test.cpp
+++ b/be/test/pipeline/operator/hashjoin_probe_operator_test.cpp
@@ -202,8 +202,8 @@ public:
ASSERT_EQ(sink_operator->should_dry_run(_helper.runtime_state.get()),
join_params.is_broadcast_join && !should_build_hash_table);
- ASSERT_EQ(sink_operator->require_data_distribution(), false);
- ASSERT_EQ(probe_operator->require_data_distribution(), false);
+ ASSERT_EQ(sink_operator->is_colocated_operator(), false);
+ ASSERT_EQ(probe_operator->is_colocated_operator(), false);
ASSERT_FALSE(sink_operator->is_shuffled_operator());
ASSERT_FALSE(probe_operator->is_shuffled_operator());
std::cout << "sink distribution: "
diff --git a/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp b/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp
index 060e8f7a6a8..f3eb1214a47 100644
--- a/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp
+++ b/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp
@@ -165,8 +165,8 @@ PartitionedAggregationTestHelper::create_operators() {
    auto source_operator =
            std::make_shared<PartitionedAggSourceOperatorX>(obj_pool.get(), tnode, 0, desc_tbl);
-    auto sink_operator = std::make_shared<PartitionedAggSinkOperatorX>(obj_pool.get(), 0, 0, tnode,
-                                                                       desc_tbl, false);
+    auto sink_operator =
+            std::make_shared<PartitionedAggSinkOperatorX>(obj_pool.get(), 0, 0, tnode, desc_tbl);
auto child_operator = std::make_shared<MockChildOperator>();
auto probe_side_source_operator = std::make_shared<MockChildOperator>();
diff --git a/be/test/pipeline/operator/partitioned_aggregation_test_helper.h b/be/test/pipeline/operator/partitioned_aggregation_test_helper.h
index 3e65201b84c..3b434378144 100644
--- a/be/test/pipeline/operator/partitioned_aggregation_test_helper.h
+++ b/be/test/pipeline/operator/partitioned_aggregation_test_helper.h
@@ -72,7 +72,7 @@ class MockPartitionedAggSinkOperatorX : public PartitionedAggSinkOperatorX {
public:
    MockPartitionedAggSinkOperatorX(ObjectPool* pool, int operator_id, int dest_id,
                                    const TPlanNode& tnode, const DescriptorTbl& descs)
-            : PartitionedAggSinkOperatorX(pool, operator_id, dest_id, tnode, descs, false) {}
+            : PartitionedAggSinkOperatorX(pool, operator_id, dest_id, tnode, descs) {}
~MockPartitionedAggSinkOperatorX() override = default;
Status prepare(RuntimeState* state) override { return Status::OK(); }
diff --git a/be/test/pipeline/operator/spill_sort_sink_operator_test.cpp b/be/test/pipeline/operator/spill_sort_sink_operator_test.cpp
index 2711417b3d8..c2b54ab157b 100644
--- a/be/test/pipeline/operator/spill_sort_sink_operator_test.cpp
+++ b/be/test/pipeline/operator/spill_sort_sink_operator_test.cpp
@@ -80,8 +80,8 @@ TEST_F(SpillSortSinkOperatorTest, Basic) {
_helper.runtime_state.get());
ASSERT_EQ(data_distribution.distribution_type,
inner_data_distribution.distribution_type);
- ASSERT_EQ(sink_operator->require_data_distribution(),
- sink_operator->_sort_sink_operator->require_data_distribution());
+ ASSERT_EQ(sink_operator->is_colocated_operator(),
+ sink_operator->_sort_sink_operator->is_colocated_operator());
st = sink_local_state->close(_helper.runtime_state.get(), st);
ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
diff --git a/be/test/pipeline/operator/spill_sort_test_helper.cpp b/be/test/pipeline/operator/spill_sort_test_helper.cpp
index 8c8836a43d9..3dafb39778f 100644
--- a/be/test/pipeline/operator/spill_sort_test_helper.cpp
+++ b/be/test/pipeline/operator/spill_sort_test_helper.cpp
@@ -144,7 +144,7 @@ SpillSortTestHelper::create_operators() {
    auto source_operator =
            std::make_shared<SpillSortSourceOperatorX>(obj_pool.get(), tnode, 0, desc_tbl);
    auto sink_operator =
-            std::make_shared<SpillSortSinkOperatorX>(obj_pool.get(), 0, 0, tnode, desc_tbl, false);
+            std::make_shared<SpillSortSinkOperatorX>(obj_pool.get(), 0, 0, tnode, desc_tbl);
auto child_operator = std::make_shared<MockChildOperator>();
auto probe_side_source_operator = std::make_shared<MockChildOperator>();
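
Note on the test hunks above: the helpers now construct sink operators without the trailing `require_bucket_distribution` flag, and the assertions call `is_colocated_operator()` where they previously called `require_data_distribution()`. The sketch below only illustrates that constructor/predicate shape; `MockPlanNode` and `MockSink` are invented mock types, and deriving colocation from the plan node here is an assumption, not how the Doris operators necessarily implement it.

#include <iostream>
#include <memory>

// Illustrative mocks only; the real operators take a TPlanNode and a
// DescriptorTbl, as shown in the diffs above.
struct MockPlanNode {
    bool is_colocate = false;
};

class MockSink {
public:
    // No trailing require_bucket_distribution flag in the constructor.
    explicit MockSink(const MockPlanNode& tnode) : _is_colocate(tnode.is_colocate) {}

    // Assumed predicate shape: colocation is queried from the operator
    // instead of being passed in at construction time.
    bool is_colocated_operator() const { return _is_colocate; }

private:
    bool _is_colocate;
};

int main() {
    MockPlanNode tnode;
    tnode.is_colocate = true;
    auto sink = std::make_shared<MockSink>(tnode);
    std::cout << std::boolalpha << sink->is_colocated_operator() << "\n"; // prints: true
    return 0;
}
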
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]