This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 7155711431e [cherry-pick](branch-2.1) Improve local shuffle strategy (#40030)
7155711431e is described below
commit 7155711431e9fba015f6e459b75290c0926d6636
Author: Gabriel <[email protected]>
AuthorDate: Thu Aug 29 14:16:16 2024 +0800
[cherry-pick](branch-2.1) Improve local shuffle strategy (#40030)
pick #34122 #35454 #35716 #37195
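
A note on the strategy (summarizing the diff below): the fragment context now threads a require_bucket_distribution flag into the agg, analytic, sort, and distinct-agg sinks, so a colocate node only insists on BUCKET_HASH_SHUFFLE when some operator in the fragment actually requires bucket distribution; otherwise it falls back to a plain HASH_SHUFFLE on the partition expressions, which local shuffle can satisfy more cheaply. A minimal, self-contained C++ sketch of that selection rule (names simplified; required_exchange is illustrative, not a function in this patch):

    #include <iostream>

    enum class ExchangeType { HASH_SHUFFLE, BUCKET_HASH_SHUFFLE };

    // Mirrors the patched required_data_distribution() bodies: a bucket-hash
    // shuffle is required only when the node is colocate AND the fragment
    // requires bucket distribution; otherwise a plain hash shuffle suffices.
    ExchangeType required_exchange(bool is_colocate, bool require_bucket_distribution) {
        return is_colocate && require_bucket_distribution ? ExchangeType::BUCKET_HASH_SHUFFLE
                                                          : ExchangeType::HASH_SHUFFLE;
    }

    int main() {
        // A colocate agg inside a fragment that never demanded bucket
        // distribution no longer forces a bucket-hash shuffle.
        std::cout << (required_exchange(true, false) == ExchangeType::HASH_SHUFFLE) << '\n'; // 1
        return 0;
    }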
---
be/src/pipeline/exec/aggregation_sink_operator.cpp | 8 +++--
be/src/pipeline/exec/aggregation_sink_operator.h | 9 ++++--
be/src/pipeline/exec/analytic_sink_operator.cpp | 9 ++++--
be/src/pipeline/exec/analytic_sink_operator.h | 7 +++--
be/src/pipeline/exec/datagen_operator.cpp | 4 +--
.../distinct_streaming_aggregation_operator.cpp | 11 ++++---
.../exec/distinct_streaming_aggregation_operator.h | 7 +++--
be/src/pipeline/exec/hashjoin_build_sink.h | 4 +++
be/src/pipeline/exec/hashjoin_probe_operator.h | 4 +++
be/src/pipeline/exec/operator.h | 3 +-
.../exec/partitioned_aggregation_sink_operator.cpp | 6 ++--
.../exec/partitioned_aggregation_sink_operator.h | 6 +++-
.../exec/partitioned_hash_join_probe_operator.h | 3 ++
.../exec/partitioned_hash_join_sink_operator.h | 4 +++
be/src/pipeline/exec/sort_sink_operator.cpp | 5 ++--
be/src/pipeline/exec/sort_sink_operator.h | 6 ++--
be/src/pipeline/exec/spill_sort_sink_operator.cpp | 6 ++--
be/src/pipeline/exec/spill_sort_sink_operator.h | 5 +++-
.../pipeline_x/pipeline_x_fragment_context.cpp | 34 ++++++++++++++++++----
.../pipeline_x/pipeline_x_fragment_context.h | 2 ++
.../org/apache/doris/planner/DataGenScanNode.java | 9 ++++++
.../pipeline/p1/conf/regression-conf.groovy | 1 +
.../correctness_p0/test_assert_row_num.groovy | 2 +-
.../external_table_p0/tvf/test_numbers.groovy | 6 ++--
24 files changed, 121 insertions(+), 40 deletions(-)
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 730337561e8..704c256737a 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -615,7 +615,7 @@ void AggSinkLocalState::_init_hash_method(const vectorized::VExprContextSPtrs& p
}
AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
-                                   const DescriptorTbl& descs)
+                                   const DescriptorTbl& descs, bool require_bucket_distribution)
: DataSinkOperatorX<AggSinkLocalState>(operator_id, tnode.node_id),
_intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
_intermediate_tuple_desc(nullptr),
@@ -628,9 +628,11 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPla
_limit(tnode.limit),
          _have_conjuncts((tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()) ||
                          (tnode.__isset.conjuncts && !tnode.conjuncts.empty())),
-          _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0]
-                                                               : std::vector<TExpr> {}),
+          _partition_exprs(tnode.__isset.distribute_expr_lists && require_bucket_distribution
+                                   ? tnode.distribute_expr_lists[0]
+                                   : tnode.agg_node.grouping_exprs),
          _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate),
+          _require_bucket_distribution(require_bucket_distribution),
          _agg_fn_output_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {}
Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h
index b3ffa19d6db..3124a3981b4 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -143,7 +143,7 @@ protected:
class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {
public:
    AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
-                     const DescriptorTbl& descs);
+                     const DescriptorTbl& descs, bool require_bucket_distribution);
~AggSinkOperatorX() override = default;
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TPlanNode",
@@ -164,9 +164,11 @@ public:
                           ? DataDistribution(ExchangeType::PASSTHROUGH)
                           : DataSinkOperatorX<AggSinkLocalState>::required_data_distribution();
        }
-        return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
-                            : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
+        return _is_colocate && _require_bucket_distribution
+                       ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
+                       : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
+ bool require_data_distribution() const override { return _is_colocate; }
size_t get_revocable_mem_size(RuntimeState* state) const;
vectorized::AggregatedDataVariants* get_agg_data(RuntimeState* state) {
@@ -213,6 +215,7 @@ protected:
const std::vector<TExpr> _partition_exprs;
const bool _is_colocate;
+ const bool _require_bucket_distribution;
RowDescriptor _agg_fn_output_row_descriptor;
};
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp
index a1d3384edc6..5b4f5cee5cb 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -193,14 +193,17 @@ vectorized::BlockRowPos AnalyticSinkLocalState::_get_partition_by_end() {
}
AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, int operator_id,
-                                             const TPlanNode& tnode, const DescriptorTbl& descs)
+                                             const TPlanNode& tnode, const DescriptorTbl& descs,
+                                             bool require_bucket_distribution)
: DataSinkOperatorX(operator_id, tnode.node_id),
_buffered_tuple_id(tnode.analytic_node.__isset.buffered_tuple_id
? tnode.analytic_node.buffered_tuple_id
: 0),
          _is_colocate(tnode.analytic_node.__isset.is_colocate && tnode.analytic_node.is_colocate),
-          _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0]
-                                                               : std::vector<TExpr> {}) {}
+          _require_bucket_distribution(require_bucket_distribution),
+          _partition_exprs(tnode.__isset.distribute_expr_lists && require_bucket_distribution
+                                   ? tnode.distribute_expr_lists[0]
+                                   : tnode.analytic_node.partition_exprs) {}
Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h
index 3ae4a7b5cff..d974f68cefa 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -86,7 +86,7 @@ private:
class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalState> {
public:
    AnalyticSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
-                          const DescriptorTbl& descs);
+                          const DescriptorTbl& descs, bool require_bucket_distribution);
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TPlanNode",
DataSinkOperatorX<AnalyticSinkLocalState>::_name);
@@ -102,13 +102,15 @@ public:
if (_partition_by_eq_expr_ctxs.empty()) {
return {ExchangeType::PASSTHROUGH};
} else if (_order_by_eq_expr_ctxs.empty()) {
- return _is_colocate
+ return _is_colocate && _require_bucket_distribution
                           ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                           : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
        }
        return DataSinkOperatorX<AnalyticSinkLocalState>::required_data_distribution();
}
+ bool require_data_distribution() const override { return true; }
+
private:
    Status _insert_range_column(vectorized::Block* block, const vectorized::VExprContextSPtr& expr,
                                vectorized::IColumn* dst_column, size_t length);
@@ -125,6 +127,7 @@ private:
std::vector<size_t> _num_agg_input;
const bool _is_colocate;
+ const bool _require_bucket_distribution;
const std::vector<TExpr> _partition_exprs;
};
diff --git a/be/src/pipeline/exec/datagen_operator.cpp b/be/src/pipeline/exec/datagen_operator.cpp
index 4fbe21f71d5..1f84bbf145a 100644
--- a/be/src/pipeline/exec/datagen_operator.cpp
+++ b/be/src/pipeline/exec/datagen_operator.cpp
@@ -97,8 +97,8 @@ Status DataGenLocalState::init(RuntimeState* state, LocalStateInfo& info) {
    // TODO: use runtime filter to filte result block, maybe this node need derive from vscan_node.
for (const auto& filter_desc : p._runtime_filter_descs) {
IRuntimeFilter* runtime_filter = nullptr;
-        RETURN_IF_ERROR(state->register_consumer_runtime_filter(filter_desc, false, p.node_id(),
-                                                                 &runtime_filter));
+        RETURN_IF_ERROR(state->register_consumer_runtime_filter(
+                filter_desc, p.ignore_data_distribution(), p.node_id(), &runtime_filter));
runtime_filter->init_profile(_runtime_profile.get());
}
return Status::OK();
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index c33b436ba03..16c0df07b49 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -374,7 +374,8 @@ void DistinctStreamingAggLocalState::_emplace_into_hash_table_to_distinct(
DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, int operator_id,
                                                             const TPlanNode& tnode,
-                                                             const DescriptorTbl& descs)
+                                                             const DescriptorTbl& descs,
+                                                             bool require_bucket_distribution)
        : StatefulOperatorX<DistinctStreamingAggLocalState>(pool, tnode, operator_id, descs),
_intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
_intermediate_tuple_desc(nullptr),
@@ -382,9 +383,11 @@ DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, i
_output_tuple_desc(nullptr),
_needs_finalize(tnode.agg_node.need_finalize),
          _is_first_phase(tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase),
-          _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0]
-                                                               : std::vector<TExpr> {}),
-          _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate) {
+          _partition_exprs(tnode.__isset.distribute_expr_lists && require_bucket_distribution
+                                   ? tnode.distribute_expr_lists[0]
+                                   : tnode.agg_node.grouping_exprs),
+          _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate),
+          _require_bucket_distribution(require_bucket_distribution) {
if (tnode.agg_node.__isset.use_streaming_preaggregation) {
_is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation;
if (_is_streaming_preagg) {
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index ca091f743bd..d0b0d963ead 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -97,7 +97,7 @@ class DistinctStreamingAggOperatorX final
: public StatefulOperatorX<DistinctStreamingAggLocalState> {
public:
    DistinctStreamingAggOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
-                                  const DescriptorTbl& descs);
+                                  const DescriptorTbl& descs, bool require_bucket_distribution);
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;
@@ -107,13 +107,15 @@ public:
DataDistribution required_data_distribution() const override {
        if (_needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg)) {
-            return _is_colocate
+            return _is_colocate && _require_bucket_distribution
                           ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                           : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
        }
        return StatefulOperatorX<DistinctStreamingAggLocalState>::required_data_distribution();
}
+ bool require_data_distribution() const override { return _is_colocate; }
+
private:
friend class DistinctStreamingAggLocalState;
TupleId _intermediate_tuple_id;
@@ -125,6 +127,7 @@ private:
const bool _is_first_phase;
const std::vector<TExpr> _partition_exprs;
const bool _is_colocate;
+ const bool _require_bucket_distribution;
// group by k1,k2
vectorized::VExprContextSPtrs _probe_expr_ctxs;
std::vector<vectorized::AggFnEvaluator*> _aggregate_evaluators;
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index 2dab03d5a19..d445e2f309c 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -167,6 +167,10 @@ public:
bool is_shuffled_hash_join() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}
+ bool require_data_distribution() const override {
+ return _join_distribution == TJoinDistributionType::COLOCATE ||
+ _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE;
+ }
private:
friend class HashJoinBuildSinkLocalState;
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h
index 5cdfe9feeb7..264f177bcc9 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -178,6 +178,10 @@ public:
bool is_shuffled_hash_join() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}
+ bool require_data_distribution() const override {
+ return _join_distribution == TJoinDistributionType::COLOCATE ||
+ _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE;
+ }
private:
    Status _do_evaluate(vectorized::Block& block, vectorized::VExprContextSPtrs& exprs,
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index c93cc8f592e..7c3fb945a2d 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -252,7 +252,8 @@ public:
virtual size_t revocable_mem_size(RuntimeState* state) const { return 0; }
- virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); };
+ virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); }
+    [[nodiscard]] virtual bool require_data_distribution() const { return false; }
protected:
OperatorBuilderBase* _operator_builder = nullptr;
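
The require_data_distribution() hook added above defaults to false on the base operator; in the hunks that follow, joins report true for COLOCATE and BUCKET_SHUFFLE distributions, the spill wrappers delegate to their inner operators, and the fragment context OR-accumulates the results into _require_bucket_distribution. A hedged, self-contained sketch of that accumulation (Operator here is a simplified stand-in, not the real class):

    #include <memory>
    #include <vector>

    struct Operator {
        virtual ~Operator() = default;
        virtual bool require_data_distribution() const { return false; }
    };

    struct ColocateJoin : Operator {
        bool require_data_distribution() const override { return true; }
    };

    // OR-accumulation as in _create_operator(): one distribution-requiring
    // operator anywhere in the fragment flips the flag for every later sink.
    bool accumulate(const std::vector<std::unique_ptr<Operator>>& ops) {
        bool require_bucket_distribution = false;
        for (const auto& op : ops) {
            require_bucket_distribution =
                    require_bucket_distribution || op->require_data_distribution();
        }
        return require_bucket_distribution;
    }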
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 053e6dee0cb..9c5c1d6a81c 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -124,9 +124,11 @@ void PartitionedAggSinkLocalState::update_profile(RuntimeProfile* child_profile)
PartitionedAggSinkOperatorX::PartitionedAggSinkOperatorX(ObjectPool* pool, int operator_id,
                                                         const TPlanNode& tnode,
-                                                         const DescriptorTbl& descs)
+                                                         const DescriptorTbl& descs,
+                                                         bool require_bucket_distribution)
        : DataSinkOperatorX<PartitionedAggSinkLocalState>(operator_id, tnode.node_id) {
-    _agg_sink_operator = std::make_unique<AggSinkOperatorX>(pool, operator_id, tnode, descs);
+    _agg_sink_operator = std::make_unique<AggSinkOperatorX>(pool, operator_id, tnode, descs,
+                                                            require_bucket_distribution);
}
Status PartitionedAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 016869374bd..d79ba6fd3d4 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -289,7 +289,7 @@ public:
class PartitionedAggSinkOperatorX : public DataSinkOperatorX<PartitionedAggSinkLocalState> {
public:
    PartitionedAggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
-                                const DescriptorTbl& descs);
+                                const DescriptorTbl& descs, bool require_bucket_distribution);
~PartitionedAggSinkOperatorX() override = default;
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TPlanNode",
@@ -308,6 +308,10 @@ public:
return _agg_sink_operator->required_data_distribution();
}
+ bool require_data_distribution() const override {
+ return _agg_sink_operator->require_data_distribution();
+ }
+
Status set_child(OperatorXPtr child) override {
RETURN_IF_ERROR(DataSinkOperatorX<PartitionedAggSinkLocalState>::set_child(child));
return _agg_sink_operator->set_child(child);
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index db20efda67e..b10c514b2f4 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -182,6 +182,9 @@ public:
_inner_sink_operator = sink_operator;
_inner_probe_operator = probe_operator;
}
+ bool require_data_distribution() const override {
+ return _inner_probe_operator->require_data_distribution();
+ }
private:
Status _revoke_memory(RuntimeState* state);
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 82fe5eacd94..2fae1f15bfa 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -129,6 +129,10 @@ public:
_inner_probe_operator = probe_operator;
}
+ bool require_data_distribution() const override {
+ return _inner_probe_operator->require_data_distribution();
+ }
+
private:
friend class PartitionedHashJoinSinkLocalState;
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp
index d89e54614d1..61c35427e57 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -75,7 +75,7 @@ Status SortSinkLocalState::open(RuntimeState* state) {
}
SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
-                                     const DescriptorTbl& descs)
+                                     const DescriptorTbl& descs, bool require_bucket_distribution)
: DataSinkOperatorX(operator_id, tnode.node_id),
_offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0),
_pool(pool),
@@ -85,7 +85,8 @@ SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, const TP
_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
_use_two_phase_read(tnode.sort_node.sort_info.use_two_phase_read),
_merge_by_exchange(tnode.sort_node.merge_by_exchange),
-          _is_colocate(tnode.sort_node.__isset.is_colocate ? tnode.sort_node.is_colocate : false),
+          _is_colocate(tnode.sort_node.__isset.is_colocate && tnode.sort_node.is_colocate),
+ _require_bucket_distribution(require_bucket_distribution),
_is_analytic_sort(tnode.sort_node.__isset.is_analytic_sort
? tnode.sort_node.is_analytic_sort
: false),
diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h
index ad9c23401b4..f29d9bbde09 100644
--- a/be/src/pipeline/exec/sort_sink_operator.h
+++ b/be/src/pipeline/exec/sort_sink_operator.h
@@ -74,7 +74,7 @@ private:
class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {
public:
    SortSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
-                      const DescriptorTbl& descs);
+                      const DescriptorTbl& descs, bool require_bucket_distribution);
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TPlanNode",
DataSinkOperatorX<SortSinkLocalState>::_name);
@@ -87,7 +87,7 @@ public:
    Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
DataDistribution required_data_distribution() const override {
if (_is_analytic_sort) {
- return _is_colocate
+ return _is_colocate && _require_bucket_distribution
                           ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                           : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
} else if (_merge_by_exchange) {
@@ -96,6 +96,7 @@ public:
}
        return DataSinkOperatorX<SortSinkLocalState>::required_data_distribution();
}
+ bool require_data_distribution() const override { return _is_colocate; }
    bool is_full_sort() const { return _algorithm == SortAlgorithm::FULL_SORT; }
@@ -128,6 +129,7 @@ private:
const bool _use_two_phase_read;
const bool _merge_by_exchange;
const bool _is_colocate = false;
+ const bool _require_bucket_distribution = false;
const bool _is_analytic_sort = false;
const std::vector<TExpr> _partition_exprs;
};
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index dfda2ff61e1..92cd1f542d8 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -112,9 +112,11 @@ Status SpillSortSinkLocalState::setup_in_memory_sort_op(RuntimeState* state) {
}
SpillSortSinkOperatorX::SpillSortSinkOperatorX(ObjectPool* pool, int operator_id,
-                                               const TPlanNode& tnode, const DescriptorTbl& descs)
+                                               const TPlanNode& tnode, const DescriptorTbl& descs,
+                                               bool require_bucket_distribution)
        : DataSinkOperatorX(operator_id, tnode.node_id) {
-    _sort_sink_operator = std::make_unique<SortSinkOperatorX>(pool, operator_id, tnode, descs);
+    _sort_sink_operator = std::make_unique<SortSinkOperatorX>(pool, operator_id, tnode, descs,
+                                                              require_bucket_distribution);
}
Status SpillSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h
index 9382edd6933..fae5fe3270f 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.h
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -64,7 +64,7 @@ class SpillSortSinkOperatorX final : public DataSinkOperatorX<SpillSortSinkLocal
public:
using LocalStateType = SpillSortSinkLocalState;
    SpillSortSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
-                           const DescriptorTbl& descs);
+                           const DescriptorTbl& descs, bool require_bucket_distribution);
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TPlanNode",
DataSinkOperatorX<SpillSortSinkLocalState>::_name);
@@ -78,6 +78,9 @@ public:
DataDistribution required_data_distribution() const override {
return _sort_sink_operator->required_data_distribution();
}
+ bool require_data_distribution() const override {
+ return _sort_sink_operator->require_data_distribution();
+ }
Status set_child(OperatorXPtr child) override {
RETURN_IF_ERROR(DataSinkOperatorX<SpillSortSinkLocalState>::set_child(child));
return _sort_sink_operator->set_child(child);
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 7d90cebc8d2..18eb9582a4b 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -1042,8 +1042,11 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
request.query_options.__isset.enable_distinct_streaming_aggregation &&
request.query_options.enable_distinct_streaming_aggregation &&
!tnode.agg_node.grouping_exprs.empty()) {
-            op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs));
+            op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs,
+                                                       _require_bucket_distribution));
            RETURN_IF_ERROR(cur_pipe->add_operator(op));
+            _require_bucket_distribution =
+                    _require_bucket_distribution || op->require_data_distribution();
} else if (tnode.agg_node.__isset.use_streaming_preaggregation &&
tnode.agg_node.use_streaming_preaggregation &&
!tnode.agg_node.grouping_exprs.empty()) {
@@ -1067,14 +1070,18 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
DataSinkOperatorXPtr sink;
        if (_runtime_state->enable_agg_spill() && !tnode.agg_node.grouping_exprs.empty()) {
            sink.reset(new PartitionedAggSinkOperatorX(pool, next_sink_operator_id(), tnode,
-                                                       descs));
+                                                       descs, _require_bucket_distribution));
        } else {
-            sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs));
+            sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
+                                            _require_bucket_distribution));
        }
+        _require_bucket_distribution =
+                _require_bucket_distribution || sink->require_data_distribution();
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
            RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
}
+ _require_bucket_distribution = true;
break;
}
case TPlanNodeType::HASH_JOIN_NODE: {
@@ -1139,6 +1146,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
_pipeline_parent_map.push(op->node_id(), cur_pipe);
_pipeline_parent_map.push(op->node_id(), build_side_pipe);
}
+        _require_bucket_distribution =
+                _require_bucket_distribution || op->require_data_distribution();
break;
}
case TPlanNodeType::CROSS_JOIN_NODE: {
@@ -1201,10 +1210,14 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
DataSinkOperatorXPtr sink;
        if (_runtime_state->enable_sort_spill()) {
-            sink.reset(new SpillSortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs));
+            sink.reset(new SpillSortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
+                                                  _require_bucket_distribution));
        } else {
-            sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs));
+            sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
+                                             _require_bucket_distribution));
        }
+        _require_bucket_distribution =
+                _require_bucket_distribution || sink->require_data_distribution();
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
@@ -1240,7 +1253,10 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
_dag[downstream_pipeline_id].push_back(cur_pipe->id());
DataSinkOperatorXPtr sink;
-        sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs));
+        sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
+                                             _require_bucket_distribution));
+        _require_bucket_distribution =
+                _require_bucket_distribution || sink->require_data_distribution();
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
@@ -1279,6 +1295,10 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
case TPlanNodeType::DATA_GEN_SCAN_NODE: {
        op.reset(new DataGenSourceOperatorX(pool, tnode, next_operator_id(), descs));
RETURN_IF_ERROR(cur_pipe->add_operator(op));
+ if (request.__isset.parallel_instances) {
+ cur_pipe->set_num_tasks(request.parallel_instances);
+ op->set_ignore_data_distribution();
+ }
break;
}
case TPlanNodeType::SCHEMA_SCAN_NODE: {
@@ -1301,6 +1321,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
print_plan_node_type(tnode.node_type));
}
+ _require_bucket_distribution = true;
+
return Status::OK();
}
// NOLINTEND(readability-function-cognitive-complexity)
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 55866400374..14e4b05d7e8 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -235,6 +235,8 @@ private:
// Total instance num running on all BEs
int _total_instances = -1;
+
+ bool _require_bucket_distribution = false;
};
} // namespace pipeline
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java
index 14a50160d63..f4e6dc93130 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java
@@ -22,6 +22,7 @@ import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.common.NereidsException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalScanNode;
+import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.tablefunction.DataGenTableValuedFunction;
import org.apache.doris.tablefunction.TableValuedFunctionTask;
@@ -116,6 +117,14 @@ public class DataGenScanNode extends ExternalScanNode {
// by multi-processes or multi-threads. So we assign instance number to 1.
@Override
public int getNumInstances() {
+        if (ConnectContext.get().getSessionVariable().isIgnoreStorageDataDistribution()) {
+            return ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
+ }
+ return 1;
+ }
+
+ @Override
+ public int getScanRangeNum() {
return 1;
}
diff --git a/regression-test/pipeline/p1/conf/regression-conf.groovy b/regression-test/pipeline/p1/conf/regression-conf.groovy
index 8f8458e47a6..2a0156e16b4 100644
--- a/regression-test/pipeline/p1/conf/regression-conf.groovy
+++ b/regression-test/pipeline/p1/conf/regression-conf.groovy
@@ -60,6 +60,7 @@ excludeSuites = "000_the_start_sentinel_do_not_touch," + // keep this line as th
"test_profile," +
"test_refresh_mtmv," +
"test_spark_load," +
+ "test_iot_auto_detect_concurrent," +
"zzz_the_end_sentinel_do_not_touch" // keep this line as the last line
// this dir will not be executed
diff --git a/regression-test/suites/correctness_p0/test_assert_row_num.groovy b/regression-test/suites/correctness_p0/test_assert_row_num.groovy
index 818213f56fe..68e9740a321 100644
--- a/regression-test/suites/correctness_p0/test_assert_row_num.groovy
+++ b/regression-test/suites/correctness_p0/test_assert_row_num.groovy
@@ -21,7 +21,7 @@ suite("test_assert_num_rows") {
"""
qt_sql_2 """
- SELECT * from numbers("number"="10") WHERE ( SELECT * FROM (SELECT 3)
__DORIS_DUAL__ ) IS NOT NULL
+ SELECT * from numbers("number"="10") WHERE ( SELECT * FROM (SELECT 3)
__DORIS_DUAL__ ) IS NOT NULL ORDER BY number
"""
sql """
DROP TABLE IF EXISTS table_9_undef_undef;
diff --git a/regression-test/suites/external_table_p0/tvf/test_numbers.groovy b/regression-test/suites/external_table_p0/tvf/test_numbers.groovy
index 6f0f74f6433..c0f2cafa403 100644
--- a/regression-test/suites/external_table_p0/tvf/test_numbers.groovy
+++ b/regression-test/suites/external_table_p0/tvf/test_numbers.groovy
@@ -39,17 +39,17 @@
order_qt_inner_join1 """
select a.number as num1, b.number as num2
from numbers("number" = "10") a inner join
numbers("number" = "10") b
- on a.number=b.number;
+ on a.number=b.number ORDER BY a.number,b.number;
"""
order_qt_inner_join2 """
select a.number as num1, b.number as num2
from numbers("number" = "6") a inner join numbers("number"
= "6") b
- on a.number>b.number;
+ on a.number>b.number ORDER BY a.number,b.number;
"""
order_qt_inner_join3 """
select a.number as num1, b.number as num2
from numbers("number" = "10") a inner join
numbers("number" = "10") b
- on a.number=b.number and b.number%2 = 0;
+                        on a.number=b.number and b.number%2 = 0 ORDER BY a.number,b.number;
"""
order_qt_left_join """
select a.number as num1, b.number as num2
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]