This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 421ab56c3e0 [pipelineX](improvement) Support local shuffle for join
and agg (#27852)
421ab56c3e0 is described below
commit 421ab56c3e00799f6b8259f1adc9f5dd1a7c2be8
Author: Gabriel <[email protected]>
AuthorDate: Sat Dec 2 20:17:18 2023 +0800
[pipelineX](improvement) Support local shuffle for join and agg (#27852)
---
be/src/pipeline/exec/aggregation_sink_operator.cpp | 3 +-
be/src/pipeline/exec/aggregation_sink_operator.h | 10 ++
.../distinct_streaming_aggregation_sink_operator.h | 1 +
be/src/pipeline/exec/exchange_sink_operator.cpp | 2 +-
be/src/pipeline/exec/exchange_sink_operator.h | 2 +-
be/src/pipeline/exec/exchange_source_operator.cpp | 5 +
be/src/pipeline/exec/exchange_source_operator.h | 6 +-
be/src/pipeline/exec/file_scan_operator.cpp | 5 +
be/src/pipeline/exec/file_scan_operator.h | 7 +-
be/src/pipeline/exec/hashjoin_build_sink.cpp | 1 +
be/src/pipeline/exec/hashjoin_build_sink.h | 9 +
be/src/pipeline/exec/hashjoin_probe_operator.cpp | 3 +
be/src/pipeline/exec/hashjoin_probe_operator.h | 9 +
be/src/pipeline/exec/jdbc_scan_operator.cpp | 5 +
be/src/pipeline/exec/jdbc_scan_operator.h | 2 +
be/src/pipeline/exec/join_probe_operator.h | 4 +-
.../pipeline/exec/multi_cast_data_stream_sink.cpp | 2 +-
be/src/pipeline/exec/multi_cast_data_stream_sink.h | 2 +-
be/src/pipeline/exec/olap_scan_operator.cpp | 2 +-
be/src/pipeline/exec/olap_scan_operator.h | 7 +-
be/src/pipeline/exec/operator.h | 2 +-
.../exec/streaming_aggregation_sink_operator.h | 1 +
be/src/pipeline/pipeline.h | 1 +
be/src/pipeline/pipeline_fragment_context.cpp | 9 +
be/src/pipeline/pipeline_fragment_context.h | 2 +
be/src/pipeline/pipeline_x/dependency.h | 19 +++
.../local_exchange/local_exchange_sink_operator.h | 14 +-
.../local_exchange_source_operator.cpp | 3 -
.../local_exchange_source_operator.h | 20 +--
.../pipeline_x/local_exchange/local_exchanger.cpp | 10 +-
.../pipeline_x/local_exchange/local_exchanger.h | 7 +-
be/src/pipeline/pipeline_x/operator.cpp | 9 +-
be/src/pipeline/pipeline_x/operator.h | 21 ++-
.../pipeline_x/pipeline_x_fragment_context.cpp | 184 +++++++++++++--------
.../pipeline_x/pipeline_x_fragment_context.h | 6 +-
be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 13 +-
be/src/pipeline/pipeline_x/pipeline_x_task.h | 5 +-
be/src/runtime/fragment_mgr.cpp | 2 +
.../glue/translator/PhysicalPlanTranslator.java | 1 +
.../org/apache/doris/planner/ExchangeNode.java | 7 +
gensrc/thrift/PlanNodes.thrift | 2 +
41 files changed, 281 insertions(+), 144 deletions(-)
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 49cbe30a3b2..4d6f9636de7 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -730,7 +730,8 @@
AggSinkOperatorX<LocalStateType>::AggSinkOperatorX(ObjectPool* pool, int operato
_pool(pool),
_limit(tnode.limit),
_have_conjuncts(tnode.__isset.vconjunct &&
!tnode.vconjunct.nodes.empty()),
- _is_streaming(is_streaming) {
+ _is_streaming(is_streaming),
+ _partition_exprs(tnode.agg_node.grouping_exprs) {
_is_first_phase = tnode.agg_node.__isset.is_first_phase &&
tnode.agg_node.is_first_phase;
}
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h
b/be/src/pipeline/exec/aggregation_sink_operator.h
index 7c5483e1e17..232a61d50f1 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -366,6 +366,14 @@ public:
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
+ std::vector<TExpr> get_local_shuffle_exprs() const override { return
_partition_exprs; }
+ ExchangeType get_local_exchange_type() const override {
+ if (_probe_expr_ctxs.empty()) {
+ return _needs_finalize ? ExchangeType::PASSTHROUGH :
ExchangeType::NOOP;
+ }
+ return ExchangeType::SHUFFLE;
+ }
+
using DataSinkOperatorX<LocalStateType>::id;
using DataSinkOperatorX<LocalStateType>::operator_id;
using DataSinkOperatorX<LocalStateType>::get_local_state;
@@ -405,6 +413,8 @@ protected:
int64_t _limit; // -1: no limit
bool _have_conjuncts;
const bool _is_streaming;
+
+ const std::vector<TExpr> _partition_exprs;
};
} // namespace pipeline
diff --git
a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h
b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h
index b30a8298727..c04e35448d1 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h
@@ -110,6 +110,7 @@ public:
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
+ ExchangeType get_local_exchange_type() const override { return
ExchangeType::PASSTHROUGH; }
};
} // namespace pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 20d612d5785..8065ae3082a 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -242,7 +242,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
return Status::OK();
}
-std::string ExchangeSinkLocalState::id_name() {
+std::string ExchangeSinkLocalState::name_suffix() {
std::string name = " (id=" + std::to_string(_parent->node_id());
auto& p = _parent->cast<ExchangeSinkOperatorX>();
name += ",dest_id=" + std::to_string(p._dest_node_id);
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h
b/be/src/pipeline/exec/exchange_sink_operator.h
index 048c8d3910c..32a61eda334 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -184,7 +184,7 @@ public:
[[nodiscard]] int sender_id() const { return _sender_id; }
- std::string id_name() override;
+ std::string name_suffix() override;
segment_v2::CompressionTypePB& compression_type();
std::string debug_string(int indentation_level) const override;
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp
b/be/src/pipeline/exec/exchange_source_operator.cpp
index 1c766f06a82..f64ebcddcac 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -110,6 +110,11 @@
ExchangeSourceOperatorX::ExchangeSourceOperatorX(ObjectPool* pool, const TPlanNo
: OperatorX<ExchangeLocalState>(pool, tnode, operator_id, descs),
_num_senders(num_senders),
_is_merging(tnode.exchange_node.__isset.sort_info),
+ _is_hash_partition(
+ tnode.exchange_node.__isset.partition_type &&
+ (tnode.exchange_node.partition_type ==
TPartitionType::HASH_PARTITIONED ||
+ tnode.exchange_node.partition_type ==
+ TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED)),
_input_row_desc(descs, tnode.exchange_node.input_row_tuples,
std::vector<bool>(tnode.nullable_tuples.begin(),
tnode.nullable_tuples.begin() +
diff --git a/be/src/pipeline/exec/exchange_source_operator.h
b/be/src/pipeline/exec/exchange_source_operator.h
index 479a8799058..633a9c3e29a 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -104,15 +104,13 @@ public:
return _sub_plan_query_statistics_recvr;
}
- bool need_to_local_shuffle() const override {
- // TODO(gabriel):
- return false;
- }
+ bool need_to_local_shuffle() const override { return !_is_hash_partition; }
private:
friend class ExchangeLocalState;
const int _num_senders;
const bool _is_merging;
+ const bool _is_hash_partition;
RowDescriptor _input_row_desc;
std::shared_ptr<QueryStatisticsRecvr> _sub_plan_query_statistics_recvr;
diff --git a/be/src/pipeline/exec/file_scan_operator.cpp
b/be/src/pipeline/exec/file_scan_operator.cpp
index 819575211c9..f34346c9063 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -53,6 +53,11 @@ Status
FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
return Status::OK();
}
+std::string FileScanLocalState::name_suffix() const {
+ return fmt::format(" (id={}. table name = {})",
std::to_string(_parent->node_id()),
+ _parent->cast<FileScanOperatorX>()._table_name);
+}
+
void FileScanLocalState::set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>&
scan_ranges) {
int max_scanners =
diff --git a/be/src/pipeline/exec/file_scan_operator.h
b/be/src/pipeline/exec/file_scan_operator.h
index 4648ed716a0..6ae3344ed71 100644
--- a/be/src/pipeline/exec/file_scan_operator.h
+++ b/be/src/pipeline/exec/file_scan_operator.h
@@ -54,6 +54,7 @@ public:
void set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges)
override;
int parent_id() { return _parent->node_id(); }
+ std::string name_suffix() const override;
private:
std::vector<TScanRangeParams> _scan_ranges;
@@ -70,7 +71,9 @@ class FileScanOperatorX final : public
ScanOperatorX<FileScanLocalState> {
public:
FileScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int
operator_id,
const DescriptorTbl& descs)
- : ScanOperatorX<FileScanLocalState>(pool, tnode, operator_id,
descs) {
+ : ScanOperatorX<FileScanLocalState>(pool, tnode, operator_id,
descs),
+ _table_name(tnode.file_scan_node.__isset.table_name ?
tnode.file_scan_node.table_name
+ : "") {
_output_tuple_id = tnode.file_scan_node.tuple_id;
}
@@ -78,6 +81,8 @@ public:
private:
friend class FileScanLocalState;
+
+ const std::string _table_name;
};
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 1e31014512e..92c61882e2a 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -414,6 +414,7 @@ Status HashJoinBuildSinkOperatorX::init(const TPlanNode&
tnode, RuntimeState* st
for (const auto& eq_join_conjunct : eq_join_conjuncts) {
vectorized::VExprContextSPtr ctx;
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(eq_join_conjunct.right,
ctx));
+ _partition_exprs.push_back(eq_join_conjunct.right);
_build_expr_ctxs.push_back(ctx);
const auto vexpr = _build_expr_ctxs.back()->root();
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h
b/be/src/pipeline/exec/hashjoin_build_sink.h
index b45c2eed752..cc34f46d4dd 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -155,6 +155,14 @@ public:
._should_build_hash_table;
}
+ std::vector<TExpr> get_local_shuffle_exprs() const override { return
_partition_exprs; }
+ ExchangeType get_local_exchange_type() const override {
+ if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
_is_broadcast_join) {
+ return ExchangeType::NOOP;
+ }
+ return ExchangeType::SHUFFLE;
+ }
+
private:
friend class HashJoinBuildSinkLocalState;
@@ -171,6 +179,7 @@ private:
vectorized::SharedHashTableContextPtr _shared_hash_table_context = nullptr;
std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
+ std::vector<TExpr> _partition_exprs;
};
} // namespace pipeline
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index 174d102993d..74304bf5351 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -222,6 +222,8 @@ void HashJoinProbeLocalState::_prepare_probe_block() {
HashJoinProbeOperatorX::HashJoinProbeOperatorX(ObjectPool* pool, const
TPlanNode& tnode,
int operator_id, const
DescriptorTbl& descs)
: JoinProbeOperatorX<HashJoinProbeLocalState>(pool, tnode,
operator_id, descs),
+ _is_broadcast_join(tnode.hash_join_node.__isset.is_broadcast_join &&
+ tnode.hash_join_node.is_broadcast_join),
_hash_output_slot_ids(tnode.hash_join_node.__isset.hash_output_slot_ids
?
tnode.hash_join_node.hash_output_slot_ids
: std::vector<SlotId> {}) {}
@@ -549,6 +551,7 @@ Status HashJoinProbeOperatorX::init(const TPlanNode& tnode,
RuntimeState* state)
for (const auto& eq_join_conjunct : eq_join_conjuncts) {
vectorized::VExprContextSPtr ctx;
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(eq_join_conjunct.left,
ctx));
+ _partition_exprs.push_back(eq_join_conjunct.left);
_probe_expr_ctxs.push_back(ctx);
bool null_aware = eq_join_conjunct.__isset.opcode &&
eq_join_conjunct.opcode == TExprOpcode::EQ_FOR_NULL;
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index 4de50474bfb..263b4d4252b 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -163,6 +163,13 @@ public:
SourceState& source_state) const override;
bool need_more_input_data(RuntimeState* state) const override;
+ std::vector<TExpr> get_local_shuffle_exprs() const override { return
_partition_exprs; }
+ ExchangeType get_local_exchange_type() const override {
+ if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
+ return ExchangeType::NOOP;
+ }
+ return _is_broadcast_join ? ExchangeType::PASSTHROUGH :
ExchangeType::SHUFFLE;
+ }
private:
Status _do_evaluate(vectorized::Block& block,
vectorized::VExprContextSPtrs& exprs,
@@ -170,6 +177,7 @@ private:
std::vector<int>& res_col_ids) const;
friend class HashJoinProbeLocalState;
+ const bool _is_broadcast_join;
// other expr
vectorized::VExprContextSPtrs _other_join_conjuncts;
// probe expr
@@ -182,6 +190,7 @@ private:
std::vector<bool> _left_output_slot_flags;
std::vector<bool> _right_output_slot_flags;
std::vector<std::string> _right_table_column_names;
+ std::vector<TExpr> _partition_exprs;
};
} // namespace pipeline
diff --git a/be/src/pipeline/exec/jdbc_scan_operator.cpp
b/be/src/pipeline/exec/jdbc_scan_operator.cpp
index 1f03939df15..74890f647fc 100644
--- a/be/src/pipeline/exec/jdbc_scan_operator.cpp
+++ b/be/src/pipeline/exec/jdbc_scan_operator.cpp
@@ -22,6 +22,11 @@
namespace doris::pipeline {
+std::string JDBCScanLocalState::name_suffix() const {
+ return fmt::format(" (id={}. table name = {})",
std::to_string(_parent->node_id()),
+ _parent->cast<JDBCScanOperatorX>()._table_name);
+}
+
Status JDBCScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>*
scanners) {
auto& p = _parent->cast<JDBCScanOperatorX>();
std::unique_ptr<vectorized::NewJdbcScanner> scanner =
vectorized::NewJdbcScanner::create_unique(
diff --git a/be/src/pipeline/exec/jdbc_scan_operator.h
b/be/src/pipeline/exec/jdbc_scan_operator.h
index bf954e25dfa..2acf5b5ec9d 100644
--- a/be/src/pipeline/exec/jdbc_scan_operator.h
+++ b/be/src/pipeline/exec/jdbc_scan_operator.h
@@ -45,6 +45,8 @@ public:
: ScanLocalState<JDBCScanLocalState>(state, parent) {}
Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners)
override;
+ std::string name_suffix() const override;
+
private:
friend class vectorized::NewJdbcScanner;
};
diff --git a/be/src/pipeline/exec/join_probe_operator.h
b/be/src/pipeline/exec/join_probe_operator.h
index 12d89c1049e..6a947c5f6b1 100644
--- a/be/src/pipeline/exec/join_probe_operator.h
+++ b/be/src/pipeline/exec/join_probe_operator.h
@@ -84,7 +84,7 @@ public:
}
Status set_child(OperatorXPtr child) override {
- if (OperatorX<LocalStateType>::_child_x) {
+ if (OperatorX<LocalStateType>::_child_x && _build_side_child ==
nullptr) {
// when there already (probe) child, others is build child.
set_build_side_child(child);
} else {
@@ -113,7 +113,7 @@ protected:
std::unique_ptr<RowDescriptor> _intermediate_row_desc;
// output expr
vectorized::VExprContextSPtrs _output_expr_ctxs;
- OperatorXPtr _build_side_child;
+ OperatorXPtr _build_side_child = nullptr;
const bool _short_circuit_for_null_in_build_side;
};
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp
b/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp
index ad6b3c8cdda..8a45634027f 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp
@@ -25,7 +25,7 @@ OperatorPtr
MultiCastDataStreamSinkOperatorBuilder::build_operator() {
return std::make_shared<MultiCastDataStreamSinkOperator>(this, _sink);
}
-std::string MultiCastDataStreamSinkLocalState::id_name() {
+std::string MultiCastDataStreamSinkLocalState::name_suffix() {
auto& sinks =
static_cast<MultiCastDataStreamSinkOperatorX*>(_parent)->sink_node().sinks;
std::string id_name = " (dst id : ";
for (auto& sink : sinks) {
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.h
b/be/src/pipeline/exec/multi_cast_data_stream_sink.h
index a2ad07e5297..a30361ebe14 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_sink.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.h
@@ -60,7 +60,7 @@ class MultiCastDataStreamSinkLocalState final
using Base = PipelineXSinkLocalState<MultiCastSinkDependency>;
using Parent = MultiCastDataStreamSinkOperatorX;
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
- std::string id_name() override;
+ std::string name_suffix() override;
private:
std::shared_ptr<pipeline::MultiCastDataStreamer> _multi_cast_data_streamer;
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp
b/be/src/pipeline/exec/olap_scan_operator.cpp
index 2f22daa5190..3f0cc369427 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -298,7 +298,7 @@ Status
OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
return Status::OK();
}
-TOlapScanNode& OlapScanLocalState::olap_scan_node() {
+TOlapScanNode& OlapScanLocalState::olap_scan_node() const {
return _parent->cast<OlapScanOperatorX>()._olap_scan_node;
}
diff --git a/be/src/pipeline/exec/olap_scan_operator.h
b/be/src/pipeline/exec/olap_scan_operator.h
index 1f0fac55a43..868d3efe555 100644
--- a/be/src/pipeline/exec/olap_scan_operator.h
+++ b/be/src/pipeline/exec/olap_scan_operator.h
@@ -45,7 +45,12 @@ public:
OlapScanLocalState(RuntimeState* state, OperatorXBase* parent)
: ScanLocalState(state, parent) {}
- TOlapScanNode& olap_scan_node();
+ TOlapScanNode& olap_scan_node() const;
+
+ std::string name_suffix() const override {
+ return fmt::format(" (id={}. table name = {})",
std::to_string(_parent->node_id()),
+ olap_scan_node().table_name);
+ }
private:
friend class vectorized::NewOlapScanner;
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index a006cda9436..2beea932a8b 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -269,7 +269,7 @@ protected:
OperatorPtr _child;
// Used on pipeline X
- OperatorXPtr _child_x;
+ OperatorXPtr _child_x = nullptr;
bool _is_closed;
};
diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
index 182946188eb..b0327e71cd1 100644
--- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
@@ -120,6 +120,7 @@ public:
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
+ ExchangeType get_local_exchange_type() const override { return
ExchangeType::PASSTHROUGH; }
};
} // namespace pipeline
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index a0b6de5c62e..68213fe9c27 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -136,6 +136,7 @@ public:
std::vector<std::shared_ptr<Pipeline>>& children() { return _children; }
void set_children(std::shared_ptr<Pipeline> child) {
_children.push_back(child); }
+ void set_children(std::vector<std::shared_ptr<Pipeline>> children) {
_children = children; }
private:
void _init_profile();
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index ce3d943af70..5812da5b7f5 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -194,6 +194,15 @@ PipelinePtr PipelineFragmentContext::add_pipeline() {
return pipeline;
}
+PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent) {
+ // _prepared、_submitted, _canceled should do not add pipeline
+ PipelineId id = _next_pipeline_id++;
+ auto pipeline = std::make_shared<Pipeline>(id, weak_from_this());
+ _pipelines.emplace_back(pipeline);
+ parent->set_children(pipeline);
+ return pipeline;
+}
+
Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams&
request,
const size_t idx) {
if (_prepared) {
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index f38996a161f..480e4332d44 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -71,6 +71,8 @@ public:
PipelinePtr add_pipeline();
+ PipelinePtr add_pipeline(PipelinePtr parent);
+
TUniqueId get_fragment_instance_id() const { return _fragment_instance_id;
}
virtual RuntimeState* get_runtime_state(UniqueId /*fragment_instance_id*/)
{
diff --git a/be/src/pipeline/pipeline_x/dependency.h
b/be/src/pipeline/pipeline_x/dependency.h
index 49dc327c796..23ce13a1db6 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -574,6 +574,25 @@ public:
}
};
+enum class ExchangeType : uint8_t {
+ NOOP = 0,
+ SHUFFLE = 1,
+ PASSTHROUGH = 2,
+};
+
+inline std::string get_exchange_type_name(ExchangeType idx) {
+ switch (idx) {
+ case ExchangeType::NOOP:
+ return "NOOP";
+ case ExchangeType::SHUFFLE:
+ return "SHUFFLE";
+ case ExchangeType::PASSTHROUGH:
+ return "PASSTHROUGH";
+ }
+ LOG(FATAL) << "__builtin_unreachable";
+ __builtin_unreachable();
+}
+
class Exchanger;
struct LocalExchangeSharedState : public BasicSharedState {
diff --git
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
index c5fe9dc7dc4..f5f1d0f48a7 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
@@ -89,10 +89,10 @@ public:
return Status::InternalError("{} should not init with TPlanNode",
Base::_name);
}
- Status init(bool need_partitioner) override {
- _name = "LOCAL_EXCHANGE_SINK_OPERATOR";
- _need_partitioner = need_partitioner;
- if (_need_partitioner) {
+ Status init(ExchangeType type) override {
+ _name = "LOCAL_EXCHANGE_SINK_OPERATOR (" +
get_exchange_type_name(type) + ")";
+ _type = type;
+ if (_type == ExchangeType::SHUFFLE) {
_partitioner.reset(
new
vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>(_num_partitions));
RETURN_IF_ERROR(_partitioner->init(_texprs));
@@ -102,7 +102,7 @@ public:
}
Status prepare(RuntimeState* state) override {
- if (_need_partitioner) {
+ if (_type == ExchangeType::SHUFFLE) {
RETURN_IF_ERROR(_partitioner->prepare(state,
_child_x->row_desc()));
}
@@ -110,7 +110,7 @@ public:
}
Status open(RuntimeState* state) override {
- if (_need_partitioner) {
+ if (_type == ExchangeType::SHUFFLE) {
RETURN_IF_ERROR(_partitioner->open(state));
}
@@ -122,7 +122,7 @@ public:
private:
friend class LocalExchangeSinkLocalState;
- bool _need_partitioner;
+ ExchangeType _type;
const int _num_partitions;
const std::vector<TExpr>& _texprs;
std::unique_ptr<vectorized::PartitionerBase> _partitioner;
diff --git
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
index dd64852891f..4467b32db4e 100644
---
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
+++
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
@@ -36,9 +36,6 @@ Status LocalExchangeSourceLocalState::init(RuntimeState*
state, LocalStateInfo&
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
- _dependency->set_shared_state(info.local_exchange_state);
- _shared_state =
(LocalExchangeSharedState*)_dependency->shared_state().get();
- DCHECK(_shared_state != nullptr);
_channel_id = info.task_idx;
_shared_state->set_dep_by_channel_id(_dependency, _channel_id);
_exchanger = _shared_state->exchanger.get();
diff --git
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
index d94b9041fce..9a44ce9678c 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
@@ -60,10 +60,9 @@ private:
class LocalExchangeSourceOperatorX final : public
OperatorX<LocalExchangeSourceLocalState> {
public:
using Base = OperatorX<LocalExchangeSourceLocalState>;
- LocalExchangeSourceOperatorX(ObjectPool* pool, int id, OperatorXBase*
parent)
- : Base(pool, -1, id), _parent(parent) {}
- Status init(const TPlanNode& tnode, RuntimeState* state) override {
- _op_name = "LOCAL_EXCHANGE_OPERATOR";
+ LocalExchangeSourceOperatorX(ObjectPool* pool, int id) : Base(pool, -1,
id) {}
+ Status init(ExchangeType type) override {
+ _op_name = "LOCAL_EXCHANGE_OPERATOR (" + get_exchange_type_name(type)
+ ")";
return Status::OK();
}
Status prepare(RuntimeState* state) override { return Status::OK(); }
@@ -79,21 +78,8 @@ public:
bool is_source() const override { return true; }
- Status set_child(OperatorXPtr child) override {
- if (_child_x) {
- // Set build side child for join probe operator
- DCHECK(_parent != nullptr);
- RETURN_IF_ERROR(_parent->set_child(child));
- } else {
- _child_x = std::move(child);
- }
- return Status::OK();
- }
-
private:
friend class LocalExchangeSourceLocalState;
-
- OperatorXBase* _parent = nullptr;
};
} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
index 616b469a99f..13a3f232222 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
@@ -115,8 +115,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state,
const uint32_t* __rest
Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block*
in_block,
SourceState source_state,
LocalExchangeSinkLocalState& local_state) {
- auto new_block = vectorized::Block::create_unique(in_block->clone_empty());
- new_block->swap(*in_block);
+ vectorized::Block new_block(in_block->clone_empty());
+ new_block.swap(*in_block);
auto channel_id = (local_state._channel_id++) % _num_instances;
_data_queue[channel_id].enqueue(std::move(new_block));
local_state._shared_state->set_ready_for_read(channel_id);
@@ -127,16 +127,16 @@ Status PassthroughExchanger::sink(RuntimeState* state,
vectorized::Block* in_blo
Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block*
block,
SourceState& source_state,
LocalExchangeSourceLocalState&
local_state) {
- std::unique_ptr<vectorized::Block> next_block;
+ vectorized::Block next_block;
if (running_sink_operators == 0) {
if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
- *block = *next_block.release();
+ *block = std::move(next_block);
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
source_state = SourceState::FINISHED;
}
} else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
- *block = *next_block.release();
+ *block = std::move(next_block);
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
local_state._dependency->block();
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
index 13e3fe931e7..b7acff688f6 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
@@ -22,11 +22,6 @@
namespace doris::pipeline {
-enum class ExchangeType : uint8_t {
- SHUFFLE = 0,
- PASSTHROUGH = 1,
-};
-
class LocalExchangeSourceLocalState;
class LocalExchangeSinkLocalState;
@@ -92,7 +87,7 @@ public:
ExchangeType get_type() const override { return ExchangeType::PASSTHROUGH;
}
private:
-
std::vector<moodycamel::ConcurrentQueue<std::unique_ptr<vectorized::Block>>>
_data_queue;
+ std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
};
} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/operator.cpp
b/be/src/pipeline/pipeline_x/operator.cpp
index 04f7cea0314..76e4d42823a 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -330,8 +330,7 @@
PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXB
template <typename DependencyType>
Status PipelineXLocalState<DependencyType>::init(RuntimeState* state,
LocalStateInfo& info) {
- _runtime_profile.reset(new RuntimeProfile(_parent->get_name() +
- " (id=" +
std::to_string(_parent->node_id()) + ")"));
+ _runtime_profile.reset(new RuntimeProfile(_parent->get_name() +
name_suffix()));
_runtime_profile->set_metadata(_parent->node_id());
_runtime_profile->set_is_sink(false);
info.parent_profile->add_child(_runtime_profile.get(), true, nullptr);
@@ -341,7 +340,7 @@ Status
PipelineXLocalState<DependencyType>::init(RuntimeState* state, LocalState
if constexpr (!std::is_same_v<FakeDependency, DependencyType>) {
auto& deps = info.upstream_dependencies;
if constexpr (std::is_same_v<LocalExchangeSourceDependency,
DependencyType>) {
- _dependency->set_shared_state(info.local_exchange_state);
+
_dependency->set_shared_state(info.le_state_map[_parent->operator_id()]);
} else {
_dependency->set_shared_state(deps.front()->shared_state());
}
@@ -402,7 +401,7 @@ template <typename DependencyType>
Status PipelineXSinkLocalState<DependencyType>::init(RuntimeState* state,
LocalSinkStateInfo& info)
{
// create profile
- _profile = state->obj_pool()->add(new RuntimeProfile(_parent->get_name() +
id_name()));
+ _profile = state->obj_pool()->add(new RuntimeProfile(_parent->get_name() +
name_suffix()));
_profile->set_metadata(_parent->node_id());
_profile->set_is_sink(true);
_wait_for_finish_dependency_timer = ADD_TIMER(_profile,
"PendingFinishDependency");
@@ -410,7 +409,7 @@ Status
PipelineXSinkLocalState<DependencyType>::init(RuntimeState* state,
auto& deps = info.dependencys;
_dependency = (DependencyType*)deps.front().get();
if constexpr (std::is_same_v<LocalExchangeSinkDependency,
DependencyType>) {
- _dependency->set_shared_state(info.local_exchange_state);
+
_dependency->set_shared_state(info.le_state_map[_parent->dests_id().front()]);
}
if (_dependency) {
_shared_state =
diff --git a/be/src/pipeline/pipeline_x/operator.h
b/be/src/pipeline/pipeline_x/operator.h
index 58c18db7038..69d6a6cbc37 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -20,6 +20,7 @@
#include "common/logging.h"
#include "pipeline/exec/operator.h"
#include "pipeline/pipeline_x/dependency.h"
+#include "pipeline/pipeline_x/local_exchange/local_exchanger.h"
namespace doris::pipeline {
@@ -28,7 +29,7 @@ struct LocalStateInfo {
RuntimeProfile* parent_profile = nullptr;
const std::vector<TScanRangeParams> scan_ranges;
std::vector<DependencySPtr>& upstream_dependencies;
- std::shared_ptr<LocalExchangeSharedState> local_exchange_state;
+ std::map<int, std::shared_ptr<LocalExchangeSharedState>> le_state_map;
int task_idx;
DependencySPtr dependency;
@@ -39,7 +40,7 @@ struct LocalSinkStateInfo {
RuntimeProfile* parent_profile = nullptr;
const int sender_id;
std::vector<DependencySPtr>& dependencys;
- std::shared_ptr<LocalExchangeSharedState> local_exchange_state;
+ std::map<int, std::shared_ptr<LocalExchangeSharedState>> le_state_map;
const TDataSink& tsink;
};
@@ -160,6 +161,10 @@ public:
LOG(FATAL) << "should not reach here!";
return Status::OK();
}
+ virtual Status init(ExchangeType type) {
+ LOG(FATAL) << "should not reach here!";
+ return Status::OK();
+ }
[[nodiscard]] RuntimeProfile* get_runtime_profile() const override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"Runtime Profile is not owned by operator");
@@ -170,6 +175,8 @@ public:
}
[[nodiscard]] std::string get_name() const override { return _op_name; }
virtual DependencySPtr get_dependency(QueryContext* ctx) = 0;
+ virtual std::vector<TExpr> get_local_shuffle_exprs() const { return {}; }
+ virtual ExchangeType get_local_exchange_type() const { return
ExchangeType::NOOP; }
Status prepare(RuntimeState* state) override;
@@ -313,6 +320,10 @@ public:
Status init(RuntimeState* state, LocalStateInfo& info) override;
+ virtual std::string name_suffix() const {
+ return " (id=" + std::to_string(_parent->node_id()) + ")";
+ }
+
Status close(RuntimeState* state) override;
[[nodiscard]] std::string debug_string(int indentation_level = 0) const
override;
@@ -427,7 +438,7 @@ public:
virtual Status init(const TPlanNode& tnode, RuntimeState* state);
Status init(const TDataSink& tsink) override;
- virtual Status init(bool need_partitioner) {
+ virtual Status init(ExchangeType type) {
return Status::InternalError("init() is only implemented in local
exchange!");
}
@@ -452,6 +463,8 @@ public:
}
virtual void get_dependency(std::vector<DependencySPtr>& dependency,
QueryContext* ctx) = 0;
+ virtual std::vector<TExpr> get_local_shuffle_exprs() const { return {}; }
+ virtual ExchangeType get_local_exchange_type() const { return
ExchangeType::NOOP; }
Status close(RuntimeState* state) override {
return Status::InternalError("Should not reach here!");
@@ -570,7 +583,7 @@ public:
[[nodiscard]] std::string debug_string(int indentation_level) const
override;
- virtual std::string id_name() { return " (id=" +
std::to_string(_parent->node_id()) + ")"; }
+ virtual std::string name_suffix() { return " (id=" +
std::to_string(_parent->node_id()) + ")"; }
Dependency* dependency() override { return _dependency; }
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 636d4e235d7..71b45ad79c3 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -231,7 +231,7 @@ Status PipelineXFragmentContext::prepare(const
doris::TPipelineFragmentParams& r
RETURN_IF_ERROR(_sink->init(request.fragment.output_sink));
static_cast<void>(root_pipeline->set_sink(_sink));
- // RETURN_IF_ERROR(_plan_local_shuffle());
+ RETURN_IF_ERROR(_plan_local_shuffle());
// 4. Initialize global states in pipelines.
for (PipelinePtr& pipeline : _pipelines) {
@@ -254,7 +254,34 @@ Status PipelineXFragmentContext::_plan_local_shuffle() {
auto& children = _pipelines[pip_idx]->children();
if (children.empty()) {
_pipelines[pip_idx]->init_need_to_local_shuffle_by_source();
- } else {
+ } else if (children.size() == 1) {
+
_pipelines[pip_idx]->set_need_to_local_shuffle(children[0]->need_to_local_shuffle());
+ }
+
+ int idx = 0;
+ bool do_local_exchange = false;
+ do {
+ auto& ops = _pipelines[pip_idx]->operator_xs();
+ do_local_exchange = false;
+ for (; idx < ops.size();) {
+ if (ops[idx]->get_local_exchange_type() != ExchangeType::NOOP)
{
+ RETURN_IF_ERROR(_add_local_exchange(
+ idx, ops[idx]->node_id(),
_runtime_state->obj_pool(),
+ _pipelines[pip_idx],
ops[idx]->get_local_shuffle_exprs(),
+ ops[idx]->get_local_exchange_type(),
&do_local_exchange));
+ }
+ if (do_local_exchange) {
+ idx = 2;
+ break;
+ }
+ idx++;
+ }
+ } while (do_local_exchange);
+ if (_pipelines[pip_idx]->sink_x()->get_local_exchange_type() !=
ExchangeType::NOOP) {
+ RETURN_IF_ERROR(_add_local_exchange(
+ idx, _pipelines[pip_idx]->sink_x()->node_id(),
_runtime_state->obj_pool(),
+ _pipelines[pip_idx],
_pipelines[pip_idx]->sink_x()->get_local_shuffle_exprs(),
+ _pipelines[pip_idx]->sink_x()->get_local_exchange_type(),
&do_local_exchange));
}
}
return Status::OK();
@@ -443,19 +470,20 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
_runtime_states[i]->set_total_load_streams(request.total_load_streams);
_runtime_states[i]->set_num_local_sink(request.num_local_sink);
std::map<PipelineId, PipelineXTask*> pipeline_id_to_task;
- auto get_local_exchange_state =
- [&](PipelinePtr pipeline) ->
std::shared_ptr<LocalExchangeSharedState> {
+ auto get_local_exchange_state = [&](PipelinePtr pipeline)
+ -> std::map<int, std::shared_ptr<LocalExchangeSharedState>> {
+ std::map<int, std::shared_ptr<LocalExchangeSharedState>>
le_state_map;
auto source_id = pipeline->operator_xs().front()->operator_id();
if (auto iter = _op_id_to_le_state.find(source_id); iter !=
_op_id_to_le_state.end()) {
- return iter->second;
+ le_state_map.insert({source_id, iter->second});
}
for (auto sink_to_source_id : pipeline->sink_x()->dests_id()) {
if (auto iter = _op_id_to_le_state.find(sink_to_source_id);
iter != _op_id_to_le_state.end()) {
- return iter->second;
+ le_state_map.insert({sink_to_source_id, iter->second});
}
}
- return nullptr;
+ return le_state_map;
};
for (auto& pipeline : _pipelines) {
auto task = std::make_unique<PipelineXTask>(
@@ -606,49 +634,100 @@ Status
PipelineXFragmentContext::_create_tree_helper(ObjectPool* pool,
return Status::OK();
}
-Status PipelineXFragmentContext::_add_local_exchange(ObjectPool* pool,
OperatorXPtr& op,
- PipelinePtr& cur_pipe,
const TPlanNode& tnode,
+Status PipelineXFragmentContext::_add_local_exchange(int idx, int node_id,
ObjectPool* pool,
+ PipelinePtr cur_pipe,
const std::vector<TExpr>&
texprs,
- ExchangeType
exchange_type) {
+ ExchangeType
exchange_type,
+ bool* do_local_exchange) {
if (!_runtime_state->enable_local_shuffle() || _num_instances <= 1) {
return Status::OK();
}
- auto parent = op;
- RETURN_IF_ERROR(parent->init(tnode, _runtime_state.get()));
- auto local_exchange_id = next_operator_id();
- op.reset(new LocalExchangeSourceOperatorX(pool, local_exchange_id,
parent.get()));
- RETURN_IF_ERROR(cur_pipe->add_operator(op));
- RETURN_IF_ERROR(parent->set_child(op));
- const auto downstream_pipeline_id = cur_pipe->id();
- if (_dag.find(downstream_pipeline_id) == _dag.end()) {
- _dag.insert({downstream_pipeline_id, {}});
+ if (!cur_pipe->need_to_local_shuffle() && exchange_type ==
ExchangeType::SHUFFLE) {
+ return Status::OK();
}
- cur_pipe = add_pipeline();
- _dag[downstream_pipeline_id].push_back(cur_pipe->id());
+ *do_local_exchange = true;
+
+ auto& operator_xs = cur_pipe->operator_xs();
+ auto total_op_num = operator_xs.size();
+ const auto downstream_pipeline_id = cur_pipe->id();
+ auto local_exchange_id = next_operator_id();
+ // 1. Create a new pipeline with local exchange sink.
+ auto new_pip = add_pipeline();
DataSinkOperatorXPtr sink;
sink.reset(new LocalExchangeSinkOperatorX(next_sink_operator_id(),
local_exchange_id,
_num_instances, texprs));
- RETURN_IF_ERROR(cur_pipe->set_sink(sink));
+ RETURN_IF_ERROR(new_pip->set_sink(sink));
- bool need_partitioner = false;
auto shared_state = LocalExchangeSharedState::create_shared();
shared_state->source_dependencies.resize(_num_instances, nullptr);
switch (exchange_type) {
case ExchangeType::SHUFFLE:
shared_state->exchanger =
ShuffleExchanger::create_unique(_num_instances);
- need_partitioner = true;
+ new_pip->set_need_to_local_shuffle(false);
+ cur_pipe->set_need_to_local_shuffle(false);
break;
case ExchangeType::PASSTHROUGH:
shared_state->exchanger =
PassthroughExchanger::create_unique(_num_instances);
+ new_pip->set_need_to_local_shuffle(cur_pipe->need_to_local_shuffle());
+ cur_pipe->set_need_to_local_shuffle(true);
break;
default:
return Status::InternalError("Unsupported local exchange type : " +
std::to_string((int)exchange_type));
}
- RETURN_IF_ERROR(cur_pipe->sink_x()->init(need_partitioner));
+ RETURN_IF_ERROR(new_pip->sink_x()->init(exchange_type));
_op_id_to_le_state.insert({local_exchange_id, shared_state});
+
+ // 2. Initialize operators list.
+ std::copy(operator_xs.begin(), operator_xs.begin() + idx,
+ std::inserter(new_pip->operator_xs(),
new_pip->operator_xs().end()));
+
+ // 3. Erase operators in new pipeline.
+ OperatorXPtr source_op;
+ source_op.reset(new LocalExchangeSourceOperatorX(pool, local_exchange_id));
+ RETURN_IF_ERROR(source_op->init(exchange_type));
+ operator_xs.erase(operator_xs.begin(), operator_xs.begin() + idx);
+ if (operator_xs.size() > 0) {
+ RETURN_IF_ERROR(operator_xs.front()->set_child(source_op));
+ }
+
+ operator_xs.insert(operator_xs.begin(), source_op);
+ RETURN_IF_ERROR(source_op->set_child(new_pip->operator_xs().back()));
+
+ std::vector<std::shared_ptr<Pipeline>> new_children;
+ std::vector<PipelineId> edges_with_source;
+ for (auto child : cur_pipe->children()) {
+ bool found = false;
+ for (auto op : new_pip->operator_xs()) {
+ if (child->sink_x()->node_id() == op->node_id()) {
+ new_pip->set_children(child);
+ found = true;
+ };
+ }
+ if (!found) {
+ new_children.push_back(child);
+ edges_with_source.push_back(child->id());
+ }
+ }
+ new_children.push_back(new_pip);
+ edges_with_source.push_back(new_pip->id());
+
+ if (!new_pip->children().empty()) {
+ std::vector<PipelineId> edges_with_sink;
+ for (auto child : new_pip->children()) {
+ edges_with_sink.push_back(child->id());
+ }
+ _dag.insert({new_pip->id(), edges_with_sink});
+ }
+ cur_pipe->set_children(new_children);
+ _dag[downstream_pipeline_id] = edges_with_source;
+
+ CHECK(total_op_num + 1 == cur_pipe->operator_xs().size() +
new_pip->operator_xs().size())
+ << "total_op_num: " << total_op_num
+ << " cur_pipe->operator_xs().size(): " <<
cur_pipe->operator_xs().size()
+ << " new_pip->operator_xs().size(): " <<
new_pip->operator_xs().size();
return Status::OK();
}
@@ -704,7 +783,7 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
_dag.insert({downstream_pipeline_id, {}});
}
- cur_pipe = add_pipeline();
+ cur_pipe = add_pipeline(cur_pipe);
_dag[downstream_pipeline_id].push_back(cur_pipe->id());
DataSinkOperatorXPtr sink;
sink.reset(new DistinctStreamingAggSinkOperatorX(pool,
next_sink_operator_id(), tnode,
@@ -712,10 +791,6 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode,
_runtime_state.get()));
-
- // RETURN_IF_ERROR(_add_local_exchange(pool, op,
cur_pipe, tnode,
- //
tnode.agg_node.grouping_exprs,
- //
ExchangeType::PASSTHROUGH));
} else if (tnode.agg_node.__isset.use_streaming_preaggregation &&
tnode.agg_node.use_streaming_preaggregation) {
op.reset(new StreamingAggSourceOperatorX(pool, tnode,
next_operator_id(), descs));
@@ -725,7 +800,7 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
_dag.insert({downstream_pipeline_id, {}});
}
- cur_pipe = add_pipeline();
+ cur_pipe = add_pipeline(cur_pipe);
_dag[downstream_pipeline_id].push_back(cur_pipe->id());
DataSinkOperatorXPtr sink;
sink.reset(new StreamingAggSinkOperatorX(pool,
next_sink_operator_id(), tnode, descs));
@@ -733,9 +808,6 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode,
_runtime_state.get()));
- // RETURN_IF_ERROR(_add_local_exchange(pool, op,
cur_pipe, tnode,
- //
tnode.agg_node.grouping_exprs,
- //
ExchangeType::PASSTHROUGH));
} else {
op.reset(new AggSourceOperatorX(pool, tnode, next_operator_id(),
descs));
RETURN_IF_ERROR(cur_pipe->add_operator(op));
@@ -744,7 +816,7 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
_dag.insert({downstream_pipeline_id, {}});
}
- cur_pipe = add_pipeline();
+ cur_pipe = add_pipeline(cur_pipe);
_dag[downstream_pipeline_id].push_back(cur_pipe->id());
DataSinkOperatorXPtr sink;
@@ -752,20 +824,6 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode,
_runtime_state.get()));
-
- // if (tnode.agg_node.grouping_exprs.empty()) {
- // if (tnode.agg_node.need_finalize) {
- // RETURN_IF_ERROR(_add_local_exchange(pool,
op, cur_pipe, tnode,
- //
tnode.agg_node.grouping_exprs,
- //
ExchangeType::PASSTHROUGH));
- // } else {
- // // TODO(gabriel): maybe use local shuffle
- // }
- // } else if (cur_pipe->need_to_local_shuffle()) {
- // RETURN_IF_ERROR(_add_local_exchange(pool, op,
cur_pipe, tnode,
- //
tnode.agg_node.grouping_exprs,
- //
ExchangeType::SHUFFLE));
- // }
}
break;
}
@@ -777,7 +835,7 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
_dag.insert({downstream_pipeline_id, {}});
}
- PipelinePtr build_side_pipe = add_pipeline();
+ PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
_dag[downstream_pipeline_id].push_back(build_side_pipe->id());
DataSinkOperatorXPtr sink;
@@ -786,20 +844,6 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode,
_runtime_state.get()));
- std::vector<TExpr> probe_exprs;
- const std::vector<TEqJoinCondition>& eq_join_conjuncts =
- tnode.hash_join_node.eq_join_conjuncts;
- for (const auto& eq_join_conjunct : eq_join_conjuncts) {
- probe_exprs.push_back(eq_join_conjunct.left);
- }
- if (tnode.hash_join_node.__isset.is_broadcast_join &&
- tnode.hash_join_node.is_broadcast_join) {
- RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode,
probe_exprs,
- ExchangeType::PASSTHROUGH));
- } else if (cur_pipe->need_to_local_shuffle()) {
- RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode,
probe_exprs,
- ExchangeType::SHUFFLE));
- }
_pipeline_parent_map.push(op->node_id(), cur_pipe);
_pipeline_parent_map.push(op->node_id(), build_side_pipe);
break;
@@ -812,7 +856,7 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
_dag.insert({downstream_pipeline_id, {}});
}
- PipelinePtr build_side_pipe = add_pipeline();
+ PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
_dag[downstream_pipeline_id].push_back(build_side_pipe->id());
DataSinkOperatorXPtr sink;
@@ -835,7 +879,7 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
_dag.insert({downstream_pipeline_id, {}});
}
for (int i = 0; i < child_count; i++) {
- PipelinePtr build_side_pipe = add_pipeline();
+ PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
_dag[downstream_pipeline_id].push_back(build_side_pipe->id());
DataSinkOperatorXPtr sink;
sink.reset(new UnionSinkOperatorX(i, next_sink_operator_id(),
pool, tnode, descs));
@@ -855,7 +899,7 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
_dag.insert({downstream_pipeline_id, {}});
}
- cur_pipe = add_pipeline();
+ cur_pipe = add_pipeline(cur_pipe);
_dag[downstream_pipeline_id].push_back(cur_pipe->id());
DataSinkOperatorXPtr sink;
@@ -873,7 +917,7 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
_dag.insert({downstream_pipeline_id, {}});
}
- cur_pipe = add_pipeline();
+ cur_pipe = add_pipeline(cur_pipe);
_dag[downstream_pipeline_id].push_back(cur_pipe->id());
DataSinkOperatorXPtr sink;
@@ -891,7 +935,7 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
_dag.insert({downstream_pipeline_id, {}});
}
- cur_pipe = add_pipeline();
+ cur_pipe = add_pipeline(cur_pipe);
_dag[downstream_pipeline_id].push_back(cur_pipe->id());
DataSinkOperatorXPtr sink;
@@ -971,7 +1015,7 @@ Status
PipelineXFragmentContext::_build_operators_for_set_operation_node(
}
for (int child_id = 0; child_id < tnode.num_children; child_id++) {
- PipelinePtr probe_side_pipe = add_pipeline();
+ PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
_dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
DataSinkOperatorXPtr sink;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 1390f2a9544..b2239c49910 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -125,9 +125,9 @@ public:
private:
void _close_fragment_instance() override;
Status _build_pipeline_tasks(const doris::TPipelineFragmentParams&
request) override;
- Status _add_local_exchange(ObjectPool* pool, OperatorXPtr& op,
PipelinePtr& cur_pipe,
- const TPlanNode& tnode, const
std::vector<TExpr>& texprs,
- ExchangeType exchange_type);
+ Status _add_local_exchange(int idx, int node_id, ObjectPool* pool,
PipelinePtr cur_pipe,
+ const std::vector<TExpr>& texprs, ExchangeType
exchange_type,
+ bool* do_local_exchange);
[[nodiscard]] Status _build_pipelines(ObjectPool* pool,
const
doris::TPipelineFragmentParams& request,
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 3495d8eb273..559a76b8e65 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -47,14 +47,14 @@ namespace doris::pipeline {
PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t task_id,
RuntimeState* state,
PipelineFragmentContext* fragment_context,
RuntimeProfile* parent_profile,
- std::shared_ptr<LocalExchangeSharedState>
local_exchange_state,
+ std::map<int,
std::shared_ptr<LocalExchangeSharedState>> le_state_map,
int task_idx)
: PipelineTask(pipeline, task_id, state, fragment_context,
parent_profile),
_operators(pipeline->operator_xs()),
_source(_operators.front()),
_root(_operators.back()),
_sink(pipeline->sink_shared_pointer()),
- _local_exchange_state(local_exchange_state),
+ _le_state_map(le_state_map),
_task_idx(task_idx),
_execution_dep(state->get_query_ctx()->get_execution_dependency()) {
_pipeline_task_watcher.start();
@@ -76,7 +76,7 @@ Status PipelineXTask::prepare(RuntimeState* state, const
TPipelineInstanceParams
{
// set sink local state
LocalSinkStateInfo info {_parent_profile, local_params.sender_id,
- get_downstream_dependency(),
_local_exchange_state, tsink};
+ get_downstream_dependency(), _le_state_map,
tsink};
RETURN_IF_ERROR(_sink->setup_local_state(state, info));
}
@@ -87,9 +87,8 @@ Status PipelineXTask::prepare(RuntimeState* state, const
TPipelineInstanceParams
for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) {
auto& op = _operators[op_idx];
auto& deps = get_upstream_dependency(op->operator_id());
- LocalStateInfo info {parent_profile, scan_ranges,
- deps, _local_exchange_state,
- _task_idx,
_source_dependency[op->operator_id()]};
+ LocalStateInfo info {parent_profile, scan_ranges, deps,
+ _le_state_map, _task_idx,
_source_dependency[op->operator_id()]};
RETURN_IF_ERROR(op->setup_local_state(state, info));
parent_profile = state->get_local_state(op->operator_id())->profile();
}
@@ -292,7 +291,7 @@ void PipelineXTask::finalize() {
std::vector<DependencySPtr> {}.swap(_downstream_dependency);
DependencyMap {}.swap(_upstream_dependency);
- _local_exchange_state = nullptr;
+ _le_state_map.clear();
}
Status PipelineXTask::try_close(Status exec_status) {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index e4ee71914b5..2a5193d3c4b 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -52,7 +52,8 @@ class PipelineXTask : public PipelineTask {
public:
PipelineXTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state,
PipelineFragmentContext* fragment_context, RuntimeProfile*
parent_profile,
- std::shared_ptr<LocalExchangeSharedState>
local_exchange_state, int task_idx);
+ std::map<int, std::shared_ptr<LocalExchangeSharedState>>
le_state_map,
+ int task_idx);
Status prepare(RuntimeState* state) override {
return Status::InternalError("Should not reach here!");
@@ -209,7 +210,7 @@ private:
DependencyMap _upstream_dependency;
std::map<int, DependencySPtr> _source_dependency;
std::vector<DependencySPtr> _downstream_dependency;
- std::shared_ptr<LocalExchangeSharedState> _local_exchange_state;
+ std::map<int, std::shared_ptr<LocalExchangeSharedState>> _le_state_map;
int _task_idx;
bool _dry_run = false;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index d4f11d25e2f..40404423e4c 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -844,6 +844,7 @@ Status FragmentMgr::exec_plan_fragment(const
TPipelineFragmentParams& params,
SCOPED_RAW_TIMER(&duration_ns);
auto prepare_st = context->prepare(params);
if (!prepare_st.ok()) {
+ LOG(WARNING) << "Prepare failed: " << prepare_st.to_string();
context->close_if_prepare_failed();
return prepare_st;
}
@@ -923,6 +924,7 @@ Status FragmentMgr::exec_plan_fragment(const
TPipelineFragmentParams& params,
SCOPED_RAW_TIMER(&duration_ns);
auto prepare_st = context->prepare(params, i);
if (!prepare_st.ok()) {
+ LOG(WARNING) << "Prepare failed: " <<
prepare_st.to_string();
context->close_if_prepare_failed();
static_cast<void>(context->update_status(prepare_st));
return prepare_st;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 6eb9f2d75e9..72e56d09c4e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -310,6 +310,7 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
}
}
DataPartition dataPartition =
toDataPartition(distribute.getDistributionSpec(), validOutputIds, context);
+ exchangeNode.setPartitionType(dataPartition.getType());
PlanFragment parentFragment = new
PlanFragment(context.nextFragmentId(), exchangeNode, dataPartition);
exchangeNode.setNumInstances(inputFragment.getPlanRoot().getNumInstances());
if (distribute.getDistributionSpec() instanceof
DistributionSpecGather) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
index bde891a8351..65a366fde16 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
@@ -29,6 +29,7 @@ import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.statistics.StatsRecursiveDerive;
import org.apache.doris.thrift.TExchangeNode;
import org.apache.doris.thrift.TExplainLevel;
+import org.apache.doris.thrift.TPartitionType;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;
@@ -64,6 +65,7 @@ public class ExchangeNode extends PlanNode {
private SortInfo mergeInfo;
private boolean isRightChildOfBroadcastHashJoin = false;
+ private TPartitionType partitionType;
/**
* use for Nereids only.
@@ -77,6 +79,10 @@ public class ExchangeNode extends PlanNode {
computeTupleIds();
}
+ public void setPartitionType(TPartitionType partitionType) {
+ this.partitionType = partitionType;
+ }
+
/**
* Create ExchangeNode that consumes output of inputNode.
* An ExchangeNode doesn't have an input node as a child, which is why we
@@ -171,6 +177,7 @@ public class ExchangeNode extends PlanNode {
msg.exchange_node.setSortInfo(mergeInfo.toThrift());
}
msg.exchange_node.setOffset(offset);
+ msg.exchange_node.setPartitionType(partitionType);
}
@Override
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 778604ce7d5..09e3f4788d6 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -1028,6 +1028,8 @@ struct TExchangeNode {
2: optional TSortInfo sort_info
// This is the number of rows to skip before returning results
3: optional i64 offset
+ // Shuffle partition type
+ 4: optional Partitions.TPartitionType partition_type
}
struct TOlapRewriteNode {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]