This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 8ff4d55848f9d03391463bf23d2249fec0473731 Author: Gabriel <[email protected]> AuthorDate: Tue Apr 16 10:21:31 2024 +0800 [refactor](pipelineX) Reduce prepare overhead (PART II) (#33681) --- be/src/pipeline/exec/olap_table_sink_operator.cpp | 9 ----- be/src/pipeline/exec/olap_table_sink_operator.h | 7 ---- .../pipeline/exec/olap_table_sink_v2_operator.cpp | 9 ----- be/src/pipeline/exec/olap_table_sink_v2_operator.h | 7 ---- .../local_exchange_sink_operator.cpp | 8 ++++- .../local_exchange/local_exchange_sink_operator.h | 1 + .../local_exchange_source_operator.cpp | 10 +++++- .../local_exchange_source_operator.h | 1 + be/src/pipeline/pipeline_x/operator.cpp | 42 ++++++++++++---------- be/src/pipeline/pipeline_x/operator.h | 3 +- be/src/vec/sink/volap_table_sink.cpp | 8 +---- be/src/vec/sink/volap_table_sink.h | 4 --- be/src/vec/sink/volap_table_sink_v2.cpp | 8 +---- be/src/vec/sink/volap_table_sink_v2.h | 4 --- be/src/vec/sink/writer/vtablet_writer.cpp | 6 +--- be/src/vec/sink/writer/vtablet_writer.h | 2 -- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 6 +--- be/src/vec/sink/writer/vtablet_writer_v2.h | 2 -- 18 files changed, 47 insertions(+), 90 deletions(-) diff --git a/be/src/pipeline/exec/olap_table_sink_operator.cpp b/be/src/pipeline/exec/olap_table_sink_operator.cpp index 7c9e71da56c..faffaf99c11 100644 --- a/be/src/pipeline/exec/olap_table_sink_operator.cpp +++ b/be/src/pipeline/exec/olap_table_sink_operator.cpp @@ -29,15 +29,6 @@ OperatorPtr OlapTableSinkOperatorBuilder::build_operator() { return std::make_shared<OlapTableSinkOperator>(this, _sink); } -Status OlapTableSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { - RETURN_IF_ERROR(Base::init(state, info)); - SCOPED_TIMER(exec_time_counter()); - SCOPED_TIMER(_open_timer); - auto& p = _parent->cast<Parent>(); - RETURN_IF_ERROR(_writer->init_properties(p._pool)); - return Status::OK(); -} - Status OlapTableSinkLocalState::close(RuntimeState* state, Status exec_status) { if (Base::_closed) { return Status::OK(); diff --git a/be/src/pipeline/exec/olap_table_sink_operator.h b/be/src/pipeline/exec/olap_table_sink_operator.h index c688660e262..19c192160fd 100644 --- a/be/src/pipeline/exec/olap_table_sink_operator.h +++ b/be/src/pipeline/exec/olap_table_sink_operator.h @@ -52,13 +52,6 @@ public: ENABLE_FACTORY_CREATOR(OlapTableSinkLocalState); OlapTableSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) : Base(parent, state) {}; - Status init(RuntimeState* state, LocalSinkStateInfo& info) override; - Status open(RuntimeState* state) override { - SCOPED_TIMER(exec_time_counter()); - SCOPED_TIMER(_open_timer); - return Base::open(state); - } - Status close(RuntimeState* state, Status exec_status) override; friend class OlapTableSinkOperatorX; diff --git a/be/src/pipeline/exec/olap_table_sink_v2_operator.cpp b/be/src/pipeline/exec/olap_table_sink_v2_operator.cpp index 0f43111ef55..4b31edb091c 100644 --- a/be/src/pipeline/exec/olap_table_sink_v2_operator.cpp +++ b/be/src/pipeline/exec/olap_table_sink_v2_operator.cpp @@ -25,15 +25,6 @@ OperatorPtr OlapTableSinkV2OperatorBuilder::build_operator() { return std::make_shared<OlapTableSinkV2Operator>(this, _sink); } -Status OlapTableSinkV2LocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { - RETURN_IF_ERROR(Base::init(state, info)); - SCOPED_TIMER(exec_time_counter()); - SCOPED_TIMER(_open_timer); - auto& p = _parent->cast<Parent>(); - RETURN_IF_ERROR(_writer->init_properties(p._pool)); - return Status::OK(); -} - Status OlapTableSinkV2LocalState::close(RuntimeState* state, Status exec_status) { if (Base::_closed) { return Status::OK(); diff --git a/be/src/pipeline/exec/olap_table_sink_v2_operator.h b/be/src/pipeline/exec/olap_table_sink_v2_operator.h index 595009cfc94..1fcd4716268 100644 --- a/be/src/pipeline/exec/olap_table_sink_v2_operator.h +++ b/be/src/pipeline/exec/olap_table_sink_v2_operator.h @@ -52,13 +52,6 @@ public: ENABLE_FACTORY_CREATOR(OlapTableSinkV2LocalState); OlapTableSinkV2LocalState(DataSinkOperatorXBase* parent, RuntimeState* state) : Base(parent, state) {}; - Status init(RuntimeState* state, LocalSinkStateInfo& info) override; - Status open(RuntimeState* state) override { - SCOPED_TIMER(exec_time_counter()); - SCOPED_TIMER(_open_timer); - return Base::open(state); - } - Status close(RuntimeState* state, Status exec_status) override; friend class OlapTableSinkV2OperatorX; diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp index 068b8d1701f..d4252de7153 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp @@ -24,10 +24,16 @@ namespace doris::pipeline { Status LocalExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); SCOPED_TIMER(exec_time_counter()); - SCOPED_TIMER(_open_timer); + SCOPED_TIMER(_init_timer); _compute_hash_value_timer = ADD_TIMER(profile(), "ComputeHashValueTime"); _distribute_timer = ADD_TIMER(profile(), "DistributeDataTime"); + return Status::OK(); +} +Status LocalExchangeSinkLocalState::open(RuntimeState* state) { + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_open_timer); + RETURN_IF_ERROR(Base::open(state)); _exchanger = _shared_state->exchanger.get(); DCHECK(_exchanger != nullptr); diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h index b7e0d754655..b3ecf29736f 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h @@ -37,6 +37,7 @@ public: ~LocalExchangeSinkLocalState() override = default; Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + Status open(RuntimeState* state) override; Status close(RuntimeState* state, Status exec_status) override; std::string debug_string(int indentation_level) const override; diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp index 71a5a6b3c13..4b0840ea01a 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp @@ -24,9 +24,17 @@ namespace doris::pipeline { Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); SCOPED_TIMER(exec_time_counter()); - SCOPED_TIMER(_open_timer); + SCOPED_TIMER(_init_timer); _channel_id = info.task_idx; _shared_state->mem_trackers[_channel_id] = _mem_tracker.get(); + return Status::OK(); +} + +Status LocalExchangeSourceLocalState::open(RuntimeState* state) { + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_open_timer); + RETURN_IF_ERROR(Base::open(state)); + _exchanger = _shared_state->exchanger.get(); DCHECK(_exchanger != nullptr); _get_block_failed_counter = diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h index 53e6aef3327..7d416b10c19 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h @@ -35,6 +35,7 @@ public: : Base(state, parent) {} Status init(RuntimeState* state, LocalStateInfo& info) override; + Status open(RuntimeState* state) override; std::string debug_string(int indentation_level) const override; private: diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index a468729765f..0f6728ef8e7 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -393,6 +393,24 @@ Status PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState } } + _rows_returned_counter = + ADD_COUNTER_WITH_LEVEL(_runtime_profile, "RowsProduced", TUnit::UNIT, 1); + _blocks_returned_counter = + ADD_COUNTER_WITH_LEVEL(_runtime_profile, "BlocksProduced", TUnit::UNIT, 1); + _projection_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "ProjectionTime", 1); + _init_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "InitTime", 1); + _open_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "OpenTime", 1); + _close_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "CloseTime", 1); + _exec_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "ExecTime", 1); + _mem_tracker = std::make_unique<MemTracker>("PipelineXLocalState:" + _runtime_profile->name()); + _memory_used_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_runtime_profile, "MemoryUsage", 1); + _peak_memory_usage_counter = _runtime_profile->AddHighWaterMarkCounter( + "PeakMemoryUsage", TUnit::BYTES, "MemoryUsage", 1); + return Status::OK(); +} + +template <typename SharedStateArg> +Status PipelineXLocalState<SharedStateArg>::open(RuntimeState* state) { _conjuncts.resize(_parent->_conjuncts.size()); _projections.resize(_parent->_projections.size()); for (size_t i = 0; i < _conjuncts.size(); i++) { @@ -409,20 +427,6 @@ Status PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState state, _intermediate_projections[i][j])); } } - - _rows_returned_counter = - ADD_COUNTER_WITH_LEVEL(_runtime_profile, "RowsProduced", TUnit::UNIT, 1); - _blocks_returned_counter = - ADD_COUNTER_WITH_LEVEL(_runtime_profile, "BlocksProduced", TUnit::UNIT, 1); - _projection_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "ProjectionTime", 1); - _init_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "InitTime", 1); - _open_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "OpenTime", 1); - _close_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "CloseTime", 1); - _exec_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "ExecTime", 1); - _mem_tracker = std::make_unique<MemTracker>("PipelineXLocalState:" + _runtime_profile->name()); - _memory_used_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_runtime_profile, "MemoryUsage", 1); - _peak_memory_usage_counter = _runtime_profile->AddHighWaterMarkCounter( - "PeakMemoryUsage", TUnit::BYTES, "MemoryUsage", 1); return Status::OK(); } @@ -539,11 +543,6 @@ template <typename Writer, typename Parent> requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>) Status AsyncWriterSink<Writer, Parent>::init(RuntimeState* state, LocalSinkStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); - _output_vexpr_ctxs.resize(_parent->cast<Parent>()._output_vexpr_ctxs.size()); - for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) { - RETURN_IF_ERROR( - _parent->cast<Parent>()._output_vexpr_ctxs[i]->clone(state, _output_vexpr_ctxs[i])); - } _writer.reset(new Writer(info.tsink, _output_vexpr_ctxs)); _async_writer_dependency = AsyncWriterDependency::create_shared( _parent->operator_id(), _parent->node_id(), state->get_query_ctx()); @@ -558,6 +557,11 @@ template <typename Writer, typename Parent> requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>) Status AsyncWriterSink<Writer, Parent>::open(RuntimeState* state) { RETURN_IF_ERROR(Base::open(state)); + _output_vexpr_ctxs.resize(_parent->cast<Parent>()._output_vexpr_ctxs.size()); + for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) { + RETURN_IF_ERROR( + _parent->cast<Parent>()._output_vexpr_ctxs[i]->clone(state, _output_vexpr_ctxs[i])); + } RETURN_IF_ERROR(_writer->start_writer(state, _profile)); return Status::OK(); } diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 2d1f1611b66..b470c9237e8 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -73,7 +73,7 @@ public: virtual Status init(RuntimeState* state, LocalStateInfo& info) = 0; // Do initialization. This step can be executed multiple times, so we should make sure it is // idempotent (e.g. wait for runtime filters). - virtual Status open(RuntimeState* state) { return Status::OK(); } + virtual Status open(RuntimeState* state) = 0; virtual Status close(RuntimeState* state) = 0; // If use projection, we should clear `_origin_block`. @@ -403,6 +403,7 @@ public: ~PipelineXLocalState() override = default; Status init(RuntimeState* state, LocalStateInfo& info) override; + Status open(RuntimeState* state) override; virtual std::string name_suffix() const { return " (id=" + std::to_string(_parent->node_id()) + ")"; diff --git a/be/src/vec/sink/volap_table_sink.cpp b/be/src/vec/sink/volap_table_sink.cpp index 7d607d5f003..8da183cd61f 100644 --- a/be/src/vec/sink/volap_table_sink.cpp +++ b/be/src/vec/sink/volap_table_sink.cpp @@ -122,13 +122,7 @@ namespace vectorized { VOlapTableSink::VOlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc, const std::vector<TExpr>& texprs) - : AsyncWriterSink<VTabletWriter, VOLAP_TABLE_SINK>(row_desc, texprs), _pool(pool) {} - -Status VOlapTableSink::init(const TDataSink& t_sink) { - RETURN_IF_ERROR(AsyncWriterSink::init(t_sink)); - RETURN_IF_ERROR(_writer->init_properties(_pool)); - return Status::OK(); -} + : AsyncWriterSink<VTabletWriter, VOLAP_TABLE_SINK>(row_desc, texprs) {} Status VOlapTableSink::close(RuntimeState* state, Status exec_status) { if (_closed) { diff --git a/be/src/vec/sink/volap_table_sink.h b/be/src/vec/sink/volap_table_sink.h index add285cdfdd..2f7d2b5d22f 100644 --- a/be/src/vec/sink/volap_table_sink.h +++ b/be/src/vec/sink/volap_table_sink.h @@ -86,14 +86,10 @@ public: // Construct from thrift struct which is generated by FE. VOlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc, const std::vector<TExpr>& texprs); - // the real writer will construct in (actually, father's) init but not constructor - Status init(const TDataSink& sink) override; Status close(RuntimeState* state, Status exec_status) override; private: - ObjectPool* _pool = nullptr; - Status _close_status = Status::OK(); }; diff --git a/be/src/vec/sink/volap_table_sink_v2.cpp b/be/src/vec/sink/volap_table_sink_v2.cpp index a73ee483bd3..fbc57fc83e1 100644 --- a/be/src/vec/sink/volap_table_sink_v2.cpp +++ b/be/src/vec/sink/volap_table_sink_v2.cpp @@ -39,16 +39,10 @@ namespace vectorized { VOlapTableSinkV2::VOlapTableSinkV2(ObjectPool* pool, const RowDescriptor& row_desc, const std::vector<TExpr>& texprs) - : AsyncWriterSink<VTabletWriterV2, VOLAP_TABLE_SINK_V2>(row_desc, texprs), _pool(pool) {} + : AsyncWriterSink<VTabletWriterV2, VOLAP_TABLE_SINK_V2>(row_desc, texprs) {} VOlapTableSinkV2::~VOlapTableSinkV2() = default; -Status VOlapTableSinkV2::init(const TDataSink& t_sink) { - RETURN_IF_ERROR(AsyncWriterSink::init(t_sink)); - RETURN_IF_ERROR(_writer->init_properties(_pool)); - return Status::OK(); -} - Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) { SCOPED_TIMER(_exec_timer); if (_closed) { diff --git a/be/src/vec/sink/volap_table_sink_v2.h b/be/src/vec/sink/volap_table_sink_v2.h index 8257d83bfc1..33e50adeb11 100644 --- a/be/src/vec/sink/volap_table_sink_v2.h +++ b/be/src/vec/sink/volap_table_sink_v2.h @@ -56,13 +56,9 @@ public: ~VOlapTableSinkV2() override; - Status init(const TDataSink& sink) override; - Status close(RuntimeState* state, Status exec_status) override; private: - ObjectPool* _pool = nullptr; - Status _close_status = Status::OK(); }; diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index ddd3fd68bd1..9bcb213742a 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -969,11 +969,6 @@ VTabletWriter::VTabletWriter(const TDataSink& t_sink, const VExprContextSPtrs& o _transfer_large_data_by_brpc = config::transfer_large_data_by_brpc; } -Status VTabletWriter::init_properties(doris::ObjectPool* pool) { - _pool = pool; - return Status::OK(); -} - void VTabletWriter::_send_batch_process() { SCOPED_TIMER(_non_blocking_send_timer); SCOPED_ATTACH_TASK(_state); @@ -1120,6 +1115,7 @@ Status VTabletWriter::_init_row_distribution() { Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) { DCHECK(_t_sink.__isset.olap_table_sink); + _pool = state->obj_pool(); auto& table_sink = _t_sink.olap_table_sink; _load_id.set_hi(table_sink.load_id.hi); _load_id.set_lo(table_sink.load_id.lo); diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h index 8d20d2fc71d..32539712e3b 100644 --- a/be/src/vec/sink/writer/vtablet_writer.h +++ b/be/src/vec/sink/writer/vtablet_writer.h @@ -520,8 +520,6 @@ class VTabletWriter final : public AsyncResultWriter { public: VTabletWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs); - Status init_properties(ObjectPool* pool); - Status write(Block& block) override; Status close(Status) override; diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 3ebda99309a..617e96c6cc4 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -134,12 +134,8 @@ Status VTabletWriterV2::_init_row_distribution() { return _row_distribution.open(_output_row_desc); } -Status VTabletWriterV2::init_properties(ObjectPool* pool) { - _pool = pool; - return Status::OK(); -} - Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) { + _pool = state->obj_pool(); auto& table_sink = _t_sink.olap_table_sink; _load_id.set_hi(table_sink.load_id.hi); _load_id.set_lo(table_sink.load_id.lo); diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h index c04cff15cf4..20952229bbb 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.h +++ b/be/src/vec/sink/writer/vtablet_writer_v2.h @@ -106,8 +106,6 @@ public: ~VTabletWriterV2() override; - Status init_properties(ObjectPool* pool); - Status write(Block& block) override; Status open(RuntimeState* state, RuntimeProfile* profile) override; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
