This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9e0a2e861cf [pipelineX](refactor) rename functions (#28846)
9e0a2e861cf is described below
commit 9e0a2e861cf05fcb7cb22fbcfe95955b16ab7f73
Author: Gabriel <[email protected]>
AuthorDate: Fri Dec 22 17:24:39 2023 +0800
[pipelineX](refactor) rename functions (#28846)
---
be/src/pipeline/exec/aggregation_sink_operator.h | 4 ++--
be/src/pipeline/exec/analytic_sink_operator.h | 4 ++--
be/src/pipeline/exec/assert_num_rows_operator.h | 2 +-
be/src/pipeline/exec/exchange_source_operator.h | 2 +-
be/src/pipeline/exec/hashjoin_build_sink.h | 2 +-
be/src/pipeline/exec/hashjoin_probe_operator.h | 2 +-
be/src/pipeline/exec/nested_loop_join_build_operator.h | 2 +-
be/src/pipeline/exec/nested_loop_join_probe_operator.h | 2 +-
be/src/pipeline/exec/partition_sort_sink_operator.h | 4 ++--
be/src/pipeline/exec/scan_operator.h | 2 +-
be/src/pipeline/exec/set_probe_sink_operator.h | 2 +-
be/src/pipeline/exec/set_sink_operator.h | 2 +-
be/src/pipeline/exec/sort_sink_operator.h | 4 ++--
be/src/pipeline/exec/streaming_aggregation_sink_operator.h | 2 +-
be/src/pipeline/pipeline.h | 2 +-
be/src/pipeline/pipeline_x/operator.h | 4 ++--
be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp | 8 ++++----
17 files changed, 25 insertions(+), 25 deletions(-)
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h
b/be/src/pipeline/exec/aggregation_sink_operator.h
index 2cd6ef50939..97be9dcd6a3 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -366,12 +366,12 @@ public:
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
- DataDistribution get_local_exchange_type() const override {
+ DataDistribution required_data_distribution() const override {
if (_probe_expr_ctxs.empty()) {
return _needs_finalize ||
DataSinkOperatorX<LocalStateType>::_child_x
->ignore_data_distribution()
? DataDistribution(ExchangeType::PASSTHROUGH)
- :
DataSinkOperatorX<LocalStateType>::get_local_exchange_type();
+ :
DataSinkOperatorX<LocalStateType>::required_data_distribution();
}
return _is_colocate ?
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE,
_partition_exprs);
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h
b/be/src/pipeline/exec/analytic_sink_operator.h
index 14ed8c815b1..3e0eb85f76d 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -107,7 +107,7 @@ public:
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
- DataDistribution get_local_exchange_type() const override {
+ DataDistribution required_data_distribution() const override {
if (_partition_by_eq_expr_ctxs.empty()) {
return {ExchangeType::PASSTHROUGH};
} else if (_order_by_eq_expr_ctxs.empty()) {
@@ -115,7 +115,7 @@ public:
?
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE,
_partition_exprs);
}
- return
DataSinkOperatorX<AnalyticSinkLocalState>::get_local_exchange_type();
+ return
DataSinkOperatorX<AnalyticSinkLocalState>::required_data_distribution();
}
private:
diff --git a/be/src/pipeline/exec/assert_num_rows_operator.h
b/be/src/pipeline/exec/assert_num_rows_operator.h
index 1e796b622dc..bb5e65168b6 100644
--- a/be/src/pipeline/exec/assert_num_rows_operator.h
+++ b/be/src/pipeline/exec/assert_num_rows_operator.h
@@ -57,7 +57,7 @@ public:
[[nodiscard]] bool is_source() const override { return false; }
- DataDistribution get_local_exchange_type() const override {
+ DataDistribution required_data_distribution() const override {
return {ExchangeType::PASSTHROUGH};
}
diff --git a/be/src/pipeline/exec/exchange_source_operator.h
b/be/src/pipeline/exec/exchange_source_operator.h
index 221a43779a1..b621da38072 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -117,7 +117,7 @@ public:
return _sub_plan_query_statistics_recvr;
}
- DataDistribution get_local_exchange_type() const override {
+ DataDistribution required_data_distribution() const override {
if (OperatorX<ExchangeLocalState>::ignore_data_distribution()) {
return {ExchangeType::NOOP};
}
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h
b/be/src/pipeline/exec/hashjoin_build_sink.h
index ecf0a4a3122..24faa4115dd 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -156,7 +156,7 @@ public:
._should_build_hash_table;
}
- DataDistribution get_local_exchange_type() const override {
+ DataDistribution required_data_distribution() const override {
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
return {ExchangeType::NOOP};
} else if (_is_broadcast_join) {
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index 16b455e4f6c..5dde597ec76 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -163,7 +163,7 @@ public:
SourceState& source_state) const override;
bool need_more_input_data(RuntimeState* state) const override;
- DataDistribution get_local_exchange_type() const override {
+ DataDistribution required_data_distribution() const override {
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
return {ExchangeType::NOOP};
}
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h
b/be/src/pipeline/exec/nested_loop_join_build_operator.h
index daa976b4e78..ea0820253cc 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h
@@ -102,7 +102,7 @@ public:
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
- DataDistribution get_local_exchange_type() const override {
+ DataDistribution required_data_distribution() const override {
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
return {ExchangeType::NOOP};
}
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h
b/be/src/pipeline/exec/nested_loop_join_probe_operator.h
index 5e57399eae8..bc8913f5d08 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h
@@ -227,7 +227,7 @@ public:
return _old_version_flag ? _row_descriptor : *_intermediate_row_desc;
}
- DataDistribution get_local_exchange_type() const override {
+ DataDistribution required_data_distribution() const override {
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
return {ExchangeType::NOOP};
}
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h
b/be/src/pipeline/exec/partition_sort_sink_operator.h
index 1a47e0fa133..486e7056213 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.h
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.h
@@ -105,9 +105,9 @@ public:
Status open(RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
- DataDistribution get_local_exchange_type() const override {
+ DataDistribution required_data_distribution() const override {
if (_topn_phase == TPartTopNPhase::TWO_PHASE_GLOBAL) {
- return
DataSinkOperatorX<PartitionSortSinkLocalState>::get_local_exchange_type();
+ return
DataSinkOperatorX<PartitionSortSinkLocalState>::required_data_distribution();
}
return {ExchangeType::PASSTHROUGH};
}
diff --git a/be/src/pipeline/exec/scan_operator.h
b/be/src/pipeline/exec/scan_operator.h
index 9bc42453c79..3690e9eb39c 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -434,7 +434,7 @@ public:
TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; }
- DataDistribution get_local_exchange_type() const override {
+ DataDistribution required_data_distribution() const override {
if (_col_distribute_ids.empty() ||
OperatorX<LocalStateType>::ignore_data_distribution()) {
// 1. `_col_distribute_ids` is empty means storage distribution is
not effective, so we prefer to do local shuffle.
// 2. `ignore_data_distribution()` returns true means we ignore
the distribution.
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h
b/be/src/pipeline/exec/set_probe_sink_operator.h
index a86bf491721..6f453ff31fc 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -144,7 +144,7 @@ public:
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
- DataDistribution get_local_exchange_type() const override {
+ DataDistribution required_data_distribution() const override {
return _is_colocate ?
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE,
_partition_exprs);
}
diff --git a/be/src/pipeline/exec/set_sink_operator.h
b/be/src/pipeline/exec/set_sink_operator.h
index 375906b5aa3..635d1ee8675 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -129,7 +129,7 @@ public:
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
- DataDistribution get_local_exchange_type() const override {
+ DataDistribution required_data_distribution() const override {
return _is_colocate ?
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE,
_partition_exprs);
}
diff --git a/be/src/pipeline/exec/sort_sink_operator.h
b/be/src/pipeline/exec/sort_sink_operator.h
index 3146e915eef..2f5512e108b 100644
--- a/be/src/pipeline/exec/sort_sink_operator.h
+++ b/be/src/pipeline/exec/sort_sink_operator.h
@@ -93,12 +93,12 @@ public:
Status open(RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
- DataDistribution get_local_exchange_type() const override {
+ DataDistribution required_data_distribution() const override {
if (_merge_by_exchange) {
// The current sort node is used for the ORDER BY
return {ExchangeType::PASSTHROUGH};
}
- return
DataSinkOperatorX<SortSinkLocalState>::get_local_exchange_type();
+ return
DataSinkOperatorX<SortSinkLocalState>::required_data_distribution();
}
private:
diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
index ef7f71b7e29..a7fcdcf847b 100644
--- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
@@ -120,7 +120,7 @@ public:
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
- DataDistribution get_local_exchange_type() const override {
+ DataDistribution required_data_distribution() const override {
return {ExchangeType::PASSTHROUGH};
}
};
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 305676856a0..2775c45019e 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -148,7 +148,7 @@ public:
}
}
void init_data_distribution() {
- set_data_distribution(operatorXs.front()->get_local_exchange_type());
+
set_data_distribution(operatorXs.front()->required_data_distribution());
}
void set_data_distribution(const DataDistribution& data_distribution) {
_data_distribution = data_distribution;
diff --git a/be/src/pipeline/pipeline_x/operator.h
b/be/src/pipeline/pipeline_x/operator.h
index fc95785924b..da52706b56c 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -181,7 +181,7 @@ public:
}
[[nodiscard]] std::string get_name() const override { return _op_name; }
[[nodiscard]] virtual DependencySPtr get_dependency(QueryContext* ctx) = 0;
- [[nodiscard]] virtual DataDistribution get_local_exchange_type() const {
+ [[nodiscard]] virtual DataDistribution required_data_distribution() const {
return _child_x && _child_x->ignore_data_distribution() && !is_source()
? DataDistribution(ExchangeType::PASSTHROUGH)
: DataDistribution(ExchangeType::NOOP);
@@ -481,7 +481,7 @@ public:
}
virtual void get_dependency(std::vector<DependencySPtr>& dependency,
QueryContext* ctx) = 0;
- virtual DataDistribution get_local_exchange_type() const {
+ virtual DataDistribution required_data_distribution() const {
return _child_x && _child_x->ignore_data_distribution()
? DataDistribution(ExchangeType::PASSTHROUGH)
: DataDistribution(ExchangeType::NOOP);
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index fe7388735ed..7efe476c6de 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -288,10 +288,10 @@ Status PipelineXFragmentContext::_plan_local_exchange(
do_local_exchange = false;
// Plan local exchange for each operator.
for (; idx < ops.size();) {
- if (ops[idx]->get_local_exchange_type().need_local_exchange()) {
+ if (ops[idx]->required_data_distribution().need_local_exchange()) {
RETURN_IF_ERROR(_add_local_exchange(
pip_idx, idx, ops[idx]->node_id(),
_runtime_state->obj_pool(), pip,
- ops[idx]->get_local_exchange_type(),
&do_local_exchange, num_buckets,
+ ops[idx]->required_data_distribution(),
&do_local_exchange, num_buckets,
bucket_seq_to_instance_idx,
ignore_data_hash_distribution));
}
if (do_local_exchange) {
@@ -305,10 +305,10 @@ Status PipelineXFragmentContext::_plan_local_exchange(
idx++;
}
} while (do_local_exchange);
- if (pip->sink_x()->get_local_exchange_type().need_local_exchange()) {
+ if (pip->sink_x()->required_data_distribution().need_local_exchange()) {
RETURN_IF_ERROR(_add_local_exchange(
pip_idx, idx, pip->sink_x()->node_id(),
_runtime_state->obj_pool(), pip,
- pip->sink_x()->get_local_exchange_type(), &do_local_exchange,
num_buckets,
+ pip->sink_x()->required_data_distribution(),
&do_local_exchange, num_buckets,
bucket_seq_to_instance_idx, ignore_data_hash_distribution));
}
return Status::OK();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]