This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a7c0dddbc92 [refactor](rename) Rename some variables in pipeline for
better readability (#29140)
a7c0dddbc92 is described below
commit a7c0dddbc9206ed47812ba2fb9bdadac0add067c
Author: zhiqiang <[email protected]>
AuthorDate: Thu Dec 28 12:54:47 2023 +0800
[refactor](rename) Rename some variables in pipeline for better readability
(#29140)
* rft-rename
* format
---
be/src/pipeline/pipeline.cpp | 6 ++--
be/src/pipeline/pipeline.h | 6 ++--
be/src/pipeline/pipeline_fragment_context.cpp | 42 +++++++++++++++------------
3 files changed, 29 insertions(+), 25 deletions(-)
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index d8ac73374f5..7990f84df49 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -71,14 +71,14 @@ Status Pipeline::prepare(RuntimeState* state) {
return Status::OK();
}
-Status Pipeline::set_sink(OperatorBuilderPtr& sink_) {
- if (_sink) {
+Status Pipeline::set_sink_builder(OperatorBuilderPtr& sink_) {
+ if (_sink_builder) {
return Status::InternalError("set sink twice");
}
if (!sink_->is_sink()) {
return Status::InternalError("should set a sink operator but {}",
typeid(sink_).name());
}
- _sink = sink_;
+ _sink_builder = sink_;
return Status::OK();
}
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 2775c45019e..ef0acfba258 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -96,10 +96,10 @@ public:
// prepare operators for pipelineX
Status prepare(RuntimeState* state);
- Status set_sink(OperatorBuilderPtr& sink_operator);
+ Status set_sink_builder(OperatorBuilderPtr& sink_operator_builder);
Status set_sink(DataSinkOperatorXPtr& sink_operator);
- OperatorBuilderBase* sink() { return _sink.get(); }
+ OperatorBuilderBase* get_sink_builder() { return _sink_builder.get(); }
DataSinkOperatorXBase* sink_x() { return _sink_x.get(); }
OperatorXs& operator_xs() { return operatorXs; }
DataSinkOperatorXPtr sink_shared_pointer() { return _sink_x; }
@@ -185,7 +185,7 @@ private:
void _init_profile();
OperatorBuilders _operator_builders; // left is _source, right is _root
- OperatorBuilderPtr _sink; // put block to sink
+ OperatorBuilderPtr _sink_builder; // put block to sink
std::mutex _depend_mutex;
std::vector<std::pair<int, std::weak_ptr<Pipeline>>> _parents;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 034fbaf2951..695fd6f4d3d 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -35,6 +35,7 @@
#include "common/config.h"
#include "common/logging.h"
+#include "common/status.h"
#include "exec/data_sink.h"
#include "exec/exec_node.h"
#include "exec/scan_node.h"
@@ -332,6 +333,7 @@ Status PipelineFragmentContext::prepare(const
doris::TPipelineFragmentParams& re
_runtime_state->set_num_local_sink(request.num_local_sink);
if (request.fragment.__isset.output_sink) {
+            // Here we build a DataSink object, which will be held by
DataSinkOperator
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(DataSink::create_data_sink(
_runtime_state->obj_pool(), request.fragment.output_sink,
request.fragment.output_exprs, request, idx,
_root_plan->row_desc(),
@@ -343,6 +345,7 @@ Status PipelineFragmentContext::prepare(const
doris::TPipelineFragmentParams& re
_root_pipeline->set_collect_query_statistics_with_every_batch();
RETURN_IF_ERROR(_build_pipelines(_root_plan, _root_pipeline));
if (_sink) {
+            // DataSinkOperator is built here
RETURN_IF_ERROR(_create_sink(request.local_params[idx].sender_id,
request.fragment.output_sink,
_runtime_state.get()));
}
@@ -366,14 +369,15 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
_total_tasks = 0;
for (PipelinePtr& pipeline : _pipelines) {
// if sink
- auto sink = pipeline->sink()->build_operator();
+ auto sink_operator = pipeline->get_sink_builder()->build_operator();
// TODO pipeline 1 need to add new interface for exec node and operator
- static_cast<void>(sink->init(request.fragment.output_sink));
+ RETURN_IF_ERROR(sink_operator->init(request.fragment.output_sink));
RETURN_IF_ERROR(pipeline->build_operators());
- auto task = std::make_unique<PipelineTask>(pipeline, _total_tasks++,
_runtime_state.get(),
- sink, this,
pipeline->pipeline_profile());
- static_cast<void>(sink->set_child(task->get_root()));
+ auto task =
+ std::make_unique<PipelineTask>(pipeline, _total_tasks++,
_runtime_state.get(),
+ sink_operator, this,
pipeline->pipeline_profile());
+ RETURN_IF_ERROR(sink_operator->set_child(task->get_root()));
_tasks.emplace_back(std::move(task));
_runtime_profile->add_child(pipeline->pipeline_profile(), true,
nullptr);
}
@@ -524,7 +528,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode*
node, PipelinePtr cur
RETURN_IF_ERROR(_build_pipelines(union_node->child(child_id),
new_child_pipeline));
OperatorBuilderPtr child_sink_builder =
std::make_shared<UnionSinkOperatorBuilder>(
union_node->id(), child_id, union_node, data_queue);
-
RETURN_IF_ERROR(new_child_pipeline->set_sink(child_sink_builder));
+
RETURN_IF_ERROR(new_child_pipeline->set_sink_builder(child_sink_builder));
}
OperatorBuilderPtr source_builder =
std::make_shared<UnionSourceOperatorBuilder>(
node->id(), union_node, data_queue);
@@ -541,7 +545,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode*
node, PipelinePtr cur
OperatorBuilderPtr pre_agg_sink =
std::make_shared<DistinctStreamingAggSinkOperatorBuilder>(node->id(), agg_node,
data_queue);
- RETURN_IF_ERROR(new_pipe->set_sink(pre_agg_sink));
+ RETURN_IF_ERROR(new_pipe->set_sink_builder(pre_agg_sink));
OperatorBuilderPtr pre_agg_source =
std::make_shared<DistinctStreamingAggSourceOperatorBuilder>(
@@ -551,7 +555,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode*
node, PipelinePtr cur
auto data_queue = std::make_shared<DataQueue>(1);
OperatorBuilderPtr pre_agg_sink =
std::make_shared<StreamingAggSinkOperatorBuilder>(
node->id(), agg_node, data_queue);
- RETURN_IF_ERROR(new_pipe->set_sink(pre_agg_sink));
+ RETURN_IF_ERROR(new_pipe->set_sink_builder(pre_agg_sink));
OperatorBuilderPtr pre_agg_source =
std::make_shared<StreamingAggSourceOperatorBuilder>(
node->id(), agg_node, data_queue);
@@ -559,7 +563,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode*
node, PipelinePtr cur
} else {
OperatorBuilderPtr agg_sink =
std::make_shared<AggSinkOperatorBuilder>(node->id(),
agg_node);
- RETURN_IF_ERROR(new_pipe->set_sink(agg_sink));
+ RETURN_IF_ERROR(new_pipe->set_sink_builder(agg_sink));
OperatorBuilderPtr agg_source =
std::make_shared<AggSourceOperatorBuilder>(node->id(),
agg_node);
@@ -572,7 +576,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode*
node, PipelinePtr cur
RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipeline));
OperatorBuilderPtr sort_sink =
std::make_shared<SortSinkOperatorBuilder>(node->id(), node);
- RETURN_IF_ERROR(new_pipeline->set_sink(sort_sink));
+ RETURN_IF_ERROR(new_pipeline->set_sink_builder(sort_sink));
OperatorBuilderPtr sort_source =
std::make_shared<SortSourceOperatorBuilder>(node->id(), node);
@@ -585,7 +589,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode*
node, PipelinePtr cur
OperatorBuilderPtr partition_sort_sink =
std::make_shared<PartitionSortSinkOperatorBuilder>(node->id(),
node);
- RETURN_IF_ERROR(new_pipeline->set_sink(partition_sort_sink));
+ RETURN_IF_ERROR(new_pipeline->set_sink_builder(partition_sort_sink));
OperatorBuilderPtr partition_sort_source =
std::make_shared<PartitionSortSourceOperatorBuilder>(node->id(), node);
@@ -598,7 +602,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode*
node, PipelinePtr cur
OperatorBuilderPtr analytic_sink =
std::make_shared<AnalyticSinkOperatorBuilder>(node->id(),
node);
- RETURN_IF_ERROR(new_pipeline->set_sink(analytic_sink));
+ RETURN_IF_ERROR(new_pipeline->set_sink_builder(analytic_sink));
OperatorBuilderPtr analytic_source =
std::make_shared<AnalyticSourceOperatorBuilder>(node->id(),
node);
@@ -637,7 +641,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode*
node, PipelinePtr cur
}
OperatorBuilderPtr join_sink =
std::make_shared<HashJoinBuildSinkBuilder>(node->id(),
join_node);
- RETURN_IF_ERROR(new_pipe->set_sink(join_sink));
+ RETURN_IF_ERROR(new_pipe->set_sink_builder(join_sink));
RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
OperatorBuilderPtr join_source =
@@ -652,7 +656,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode*
node, PipelinePtr cur
RETURN_IF_ERROR(_build_pipelines(node->child(1), new_pipe));
OperatorBuilderPtr join_sink =
std::make_shared<NestLoopJoinBuildOperatorBuilder>(node->id(),
node);
- RETURN_IF_ERROR(new_pipe->set_sink(join_sink));
+ RETURN_IF_ERROR(new_pipe->set_sink_builder(join_sink));
RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
OperatorBuilderPtr join_source =
@@ -690,7 +694,7 @@ Status
PipelineFragmentContext::_build_operators_for_set_operation_node(ExecNode
RETURN_IF_ERROR(_build_pipelines(node->child(0), build_pipeline));
OperatorBuilderPtr sink_builder =
std::make_shared<SetSinkOperatorBuilder<is_intersect>>(node->id(),
node);
- RETURN_IF_ERROR(build_pipeline->set_sink(sink_builder));
+ RETURN_IF_ERROR(build_pipeline->set_sink_builder(sink_builder));
for (int child_id = 1; child_id < node->children_count(); ++child_id) {
auto probe_pipeline = add_pipeline();
@@ -698,7 +702,7 @@ Status
PipelineFragmentContext::_build_operators_for_set_operation_node(ExecNode
OperatorBuilderPtr probe_sink_builder =
std::make_shared<SetProbeSinkOperatorBuilder<is_intersect>>(node->id(),
child_id,
node);
- RETURN_IF_ERROR(probe_pipeline->set_sink(probe_sink_builder));
+ RETURN_IF_ERROR(probe_pipeline->set_sink_builder(probe_sink_builder));
}
OperatorBuilderPtr source_builder =
@@ -827,7 +831,7 @@ Status PipelineFragmentContext::_create_sink(int sender_id,
const TDataSink& thr
case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
sink_ =
std::make_shared<MultiCastDataStreamSinkOperatorBuilder>(next_operator_builder_id(),
_sink.get());
- RETURN_IF_ERROR(_root_pipeline->set_sink(sink_));
+ RETURN_IF_ERROR(_root_pipeline->set_sink_builder(sink_));
auto& multi_cast_data_streamer =
assert_cast<vectorized::MultiCastDataStreamSink*>(_sink.get())
@@ -862,7 +866,7 @@ Status PipelineFragmentContext::_create_sink(int sender_id,
const TDataSink& thr
// 3. create and set sink operator of data stream sender for new
pipeline
OperatorBuilderPtr sink_op_builder =
std::make_shared<ExchangeSinkOperatorBuilder>(
next_operator_builder_id(),
_multi_cast_stream_sink_senders[i].get(), i);
- static_cast<void>(new_pipeline->set_sink(sink_op_builder));
+ static_cast<void>(new_pipeline->set_sink_builder(sink_op_builder));
// 4. init and prepare the data_stream_sender of diff exchange
TDataSink t;
@@ -876,7 +880,7 @@ Status PipelineFragmentContext::_create_sink(int sender_id,
const TDataSink& thr
default:
return Status::InternalError("Unsuported sink type in pipeline: {}",
thrift_sink.type);
}
- return _root_pipeline->set_sink(sink_);
+ return _root_pipeline->set_sink_builder(sink_);
}
// If all pipeline tasks binded to the fragment instance are finished, then we
could
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]