This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ad1313cce67 [refactor](pipelineX) refine _build_side_pipelines (#25871)
ad1313cce67 is described below
commit ad1313cce67e6342f253bed0da6fe0ca42197492
Author: Mryange <[email protected]>
AuthorDate: Thu Oct 26 10:32:23 2023 +0800
[refactor](pipelineX) refine _build_side_pipelines (#25871)
---
.../pipeline_x/pipeline_x_fragment_context.cpp | 38 +++++-----------------
.../pipeline_x/pipeline_x_fragment_context.h | 25 +++++++++++---
2 files changed, 28 insertions(+), 35 deletions(-)
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index a1f69e6f02b..78a888af9df 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -473,9 +473,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
_runtime_states[i].get()});
}
}
- _build_side_pipelines.clear();
- _union_child_pipelines.clear();
- _set_child_pipelines.clear();
+ _pipeline_parent_map.clear();
_dag.clear();
_op_id_to_le_state.clear();
@@ -589,16 +587,7 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
const DescriptorTbl& descs,
OperatorXPtr& op,
PipelinePtr& cur_pipe, int
parent_idx,
int child_idx) {
- if (_build_side_pipelines.find(parent_idx) != _build_side_pipelines.end()
&& child_idx > 0) {
- cur_pipe = _build_side_pipelines[parent_idx];
- }
- if (_union_child_pipelines.find(parent_idx) !=
_union_child_pipelines.end()) {
- cur_pipe = _union_child_pipelines[parent_idx][child_idx];
- }
- if (_set_child_pipelines.find(parent_idx) != _set_child_pipelines.end()) {
- cur_pipe = _set_child_pipelines[parent_idx][child_idx];
- }
-
+ _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
std::stringstream error_msg;
switch (tnode.node_type) {
case TPlanNodeType::OLAP_SCAN_NODE: {
@@ -704,7 +693,8 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode,
_runtime_state.get()));
- _build_side_pipelines.insert({sink->node_id(), build_side_pipe});
+ _pipeline_parent_map.push(op->node_id(), cur_pipe);
+ _pipeline_parent_map.push(op->node_id(), build_side_pipe);
break;
}
case TPlanNodeType::CROSS_JOIN_NODE: {
@@ -723,7 +713,8 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode,
_runtime_state.get()));
- _build_side_pipelines.insert({sink->node_id(), build_side_pipe});
+ _pipeline_parent_map.push(op->node_id(), cur_pipe);
+ _pipeline_parent_map.push(op->node_id(), build_side_pipe);
break;
}
case TPlanNodeType::UNION_NODE: {
@@ -735,7 +726,6 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
_dag.insert({downstream_pipeline_id, {}});
}
- int father_id = tnode.node_id;
for (int i = 0; i < child_count; i++) {
PipelinePtr build_side_pipe = add_pipeline();
_dag[downstream_pipeline_id].push_back(build_side_pipe->id());
@@ -745,13 +735,8 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode,
_runtime_state.get()));
// preset children pipelines. if any pipeline found this as its
father, will use the prepared pipeline to build.
- if (_union_child_pipelines.find(father_id) ==
_union_child_pipelines.end()) {
- _union_child_pipelines.insert({father_id, {build_side_pipe}});
- } else {
- _union_child_pipelines[father_id].push_back(build_side_pipe);
- }
+ _pipeline_parent_map.push(op->node_id(), build_side_pipe);
}
-
break;
}
case TPlanNodeType::SORT_NODE: {
@@ -877,8 +862,6 @@ Status
PipelineXFragmentContext::_build_operators_for_set_operation_node(
_dag.insert({downstream_pipeline_id, {}});
}
- int parent_id = tnode.node_id;
-
for (int child_id = 0; child_id < tnode.num_children; child_id++) {
PipelinePtr probe_side_pipe = add_pipeline();
_dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
@@ -895,12 +878,7 @@ Status
PipelineXFragmentContext::_build_operators_for_set_operation_node(
RETURN_IF_ERROR(probe_side_pipe->set_sink(sink));
RETURN_IF_ERROR(probe_side_pipe->sink_x()->init(tnode,
_runtime_state.get()));
// prepare children pipelines. if any pipeline found this as its
father, will use the prepared pipeline to build.
- if (child_id == 0) {
- DCHECK(_set_child_pipelines.find(parent_id) ==
_set_child_pipelines.end());
- _set_child_pipelines.insert({parent_id, {probe_side_pipe}});
- } else {
- _set_child_pipelines[parent_id].push_back(probe_side_pipe);
- }
+ _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
}
return Status::OK();
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 45eb7f48cc5..4d2a59277e9 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -175,14 +175,29 @@ private:
// build probe operator and build operator in separate pipelines. To do
this, we should build
// ProbeSide first, and use `_pipelines_to_build` to store which pipeline
the build operator
// is in, so we can build BuildSide once we complete probe side.
- std::map<int, PipelinePtr> _build_side_pipelines;
+ // Records, per parent plan-node id, the pipelines its children must be
+ // built into, in child order. Join nodes register the current pipeline and
+ // then the build-side pipeline; union and set-operation nodes register one
+ // pipeline per child. `_create_operator` consults it via `pop` before
+ // building a child, and `_build_pipeline_tasks` clears it afterwards.
+ struct pipeline_parent_map {
+     std::map<int, std::vector<PipelinePtr>> _build_side_pipelines;
+
+     // Appends `pipeline` as the next child pipeline of `parent_node_id`.
+     void push(int parent_node_id, PipelinePtr pipeline) {
+         // operator[] value-initializes an empty vector for a new key, so a
+         // single lookup covers both the first and subsequent registrations.
+         _build_side_pipelines[parent_node_id].push_back(pipeline);
+     }
+
+     // If `parent_node_id` has registered child pipelines, sets `cur_pipe` to
+     // the one registered for `child_idx`; otherwise leaves `cur_pipe` as is.
+     // Despite the name, the registration is not removed.
+     void pop(PipelinePtr& cur_pipe, int parent_node_id, int child_idx) {
+         auto it = _build_side_pipelines.find(parent_node_id);
+         if (it == _build_side_pipelines.end()) {
+             return;
+         }
+         const auto& child_pipelines = it->second;
+         // Check the sign explicitly instead of relying on int -> size_t
+         // wraparound in a mixed-sign comparison.
+         const bool in_range =
+                 child_idx >= 0 && static_cast<size_t>(child_idx) < child_pipelines.size();
+         DCHECK(in_range);
+         if (!in_range) {
+             // DCHECK is compiled out in release builds; never index out of
+             // bounds there, keep the caller's current pipeline instead.
+             return;
+         }
+         cur_pipe = child_pipelines[child_idx];
+     }
+
+     // Drops every registration.
+     void clear() { _build_side_pipelines.clear(); }
+ } _pipeline_parent_map;
std::map<UniqueId, RuntimeState*> _instance_id_to_runtime_state;
std::mutex _state_map_lock;
-
- // TODO: Unify `_union_child_pipelines`, `_set_child_pipelines`,
`_build_side_pipelines`.
- std::map<int, std::vector<PipelinePtr>> _union_child_pipelines;
- std::map<int, std::vector<PipelinePtr>> _set_child_pipelines;
// We can guarantee that a plan node ID can correspond to an operator ID,
// but some operators do not have a corresponding plan node ID.
// We set these IDs as negative numbers, which are not visible to the user.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]