This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 8ff4d55848f9d03391463bf23d2249fec0473731
Author: Gabriel <[email protected]>
AuthorDate: Tue Apr 16 10:21:31 2024 +0800

    [refactor](pipelineX) Reduce prepare overhead (PART II) (#33681)
---
 be/src/pipeline/exec/olap_table_sink_operator.cpp  |  9 -----
 be/src/pipeline/exec/olap_table_sink_operator.h    |  7 ----
 .../pipeline/exec/olap_table_sink_v2_operator.cpp  |  9 -----
 be/src/pipeline/exec/olap_table_sink_v2_operator.h |  7 ----
 .../local_exchange_sink_operator.cpp               |  8 ++++-
 .../local_exchange/local_exchange_sink_operator.h  |  1 +
 .../local_exchange_source_operator.cpp             | 10 +++++-
 .../local_exchange_source_operator.h               |  1 +
 be/src/pipeline/pipeline_x/operator.cpp            | 42 ++++++++++++----------
 be/src/pipeline/pipeline_x/operator.h              |  3 +-
 be/src/vec/sink/volap_table_sink.cpp               |  8 +----
 be/src/vec/sink/volap_table_sink.h                 |  4 ---
 be/src/vec/sink/volap_table_sink_v2.cpp            |  8 +----
 be/src/vec/sink/volap_table_sink_v2.h              |  4 ---
 be/src/vec/sink/writer/vtablet_writer.cpp          |  6 +---
 be/src/vec/sink/writer/vtablet_writer.h            |  2 --
 be/src/vec/sink/writer/vtablet_writer_v2.cpp       |  6 +---
 be/src/vec/sink/writer/vtablet_writer_v2.h         |  2 --
 18 files changed, 47 insertions(+), 90 deletions(-)

diff --git a/be/src/pipeline/exec/olap_table_sink_operator.cpp b/be/src/pipeline/exec/olap_table_sink_operator.cpp
index 7c9e71da56c..faffaf99c11 100644
--- a/be/src/pipeline/exec/olap_table_sink_operator.cpp
+++ b/be/src/pipeline/exec/olap_table_sink_operator.cpp
@@ -29,15 +29,6 @@ OperatorPtr OlapTableSinkOperatorBuilder::build_operator() {
     return std::make_shared<OlapTableSinkOperator>(this, _sink);
 }
 
-Status OlapTableSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& 
info) {
-    RETURN_IF_ERROR(Base::init(state, info));
-    SCOPED_TIMER(exec_time_counter());
-    SCOPED_TIMER(_open_timer);
-    auto& p = _parent->cast<Parent>();
-    RETURN_IF_ERROR(_writer->init_properties(p._pool));
-    return Status::OK();
-}
-
 Status OlapTableSinkLocalState::close(RuntimeState* state, Status exec_status) 
{
     if (Base::_closed) {
         return Status::OK();
diff --git a/be/src/pipeline/exec/olap_table_sink_operator.h b/be/src/pipeline/exec/olap_table_sink_operator.h
index c688660e262..19c192160fd 100644
--- a/be/src/pipeline/exec/olap_table_sink_operator.h
+++ b/be/src/pipeline/exec/olap_table_sink_operator.h
@@ -52,13 +52,6 @@ public:
     ENABLE_FACTORY_CREATOR(OlapTableSinkLocalState);
     OlapTableSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
             : Base(parent, state) {};
-    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
-    Status open(RuntimeState* state) override {
-        SCOPED_TIMER(exec_time_counter());
-        SCOPED_TIMER(_open_timer);
-        return Base::open(state);
-    }
-
     Status close(RuntimeState* state, Status exec_status) override;
     friend class OlapTableSinkOperatorX;
 
diff --git a/be/src/pipeline/exec/olap_table_sink_v2_operator.cpp b/be/src/pipeline/exec/olap_table_sink_v2_operator.cpp
index 0f43111ef55..4b31edb091c 100644
--- a/be/src/pipeline/exec/olap_table_sink_v2_operator.cpp
+++ b/be/src/pipeline/exec/olap_table_sink_v2_operator.cpp
@@ -25,15 +25,6 @@ OperatorPtr OlapTableSinkV2OperatorBuilder::build_operator() {
     return std::make_shared<OlapTableSinkV2Operator>(this, _sink);
 }
 
-Status OlapTableSinkV2LocalState::init(RuntimeState* state, 
LocalSinkStateInfo& info) {
-    RETURN_IF_ERROR(Base::init(state, info));
-    SCOPED_TIMER(exec_time_counter());
-    SCOPED_TIMER(_open_timer);
-    auto& p = _parent->cast<Parent>();
-    RETURN_IF_ERROR(_writer->init_properties(p._pool));
-    return Status::OK();
-}
-
 Status OlapTableSinkV2LocalState::close(RuntimeState* state, Status 
exec_status) {
     if (Base::_closed) {
         return Status::OK();
diff --git a/be/src/pipeline/exec/olap_table_sink_v2_operator.h b/be/src/pipeline/exec/olap_table_sink_v2_operator.h
index 595009cfc94..1fcd4716268 100644
--- a/be/src/pipeline/exec/olap_table_sink_v2_operator.h
+++ b/be/src/pipeline/exec/olap_table_sink_v2_operator.h
@@ -52,13 +52,6 @@ public:
     ENABLE_FACTORY_CREATOR(OlapTableSinkV2LocalState);
     OlapTableSinkV2LocalState(DataSinkOperatorXBase* parent, RuntimeState* 
state)
             : Base(parent, state) {};
-    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
-    Status open(RuntimeState* state) override {
-        SCOPED_TIMER(exec_time_counter());
-        SCOPED_TIMER(_open_timer);
-        return Base::open(state);
-    }
-
     Status close(RuntimeState* state, Status exec_status) override;
     friend class OlapTableSinkV2OperatorX;
 
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
index 068b8d1701f..d4252de7153 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
@@ -24,10 +24,16 @@ namespace doris::pipeline {
 Status LocalExchangeSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
     SCOPED_TIMER(exec_time_counter());
-    SCOPED_TIMER(_open_timer);
+    SCOPED_TIMER(_init_timer);
     _compute_hash_value_timer = ADD_TIMER(profile(), "ComputeHashValueTime");
     _distribute_timer = ADD_TIMER(profile(), "DistributeDataTime");
+    return Status::OK();
+}
 
+Status LocalExchangeSinkLocalState::open(RuntimeState* state) {
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_open_timer);
+    RETURN_IF_ERROR(Base::open(state));
     _exchanger = _shared_state->exchanger.get();
     DCHECK(_exchanger != nullptr);
 
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
index b7e0d754655..b3ecf29736f 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
@@ -37,6 +37,7 @@ public:
     ~LocalExchangeSinkLocalState() override = default;
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+    Status open(RuntimeState* state) override;
     Status close(RuntimeState* state, Status exec_status) override;
     std::string debug_string(int indentation_level) const override;
 
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
index 71a5a6b3c13..4b0840ea01a 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
@@ -24,9 +24,17 @@ namespace doris::pipeline {
 Status LocalExchangeSourceLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
     SCOPED_TIMER(exec_time_counter());
-    SCOPED_TIMER(_open_timer);
+    SCOPED_TIMER(_init_timer);
     _channel_id = info.task_idx;
     _shared_state->mem_trackers[_channel_id] = _mem_tracker.get();
+    return Status::OK();
+}
+
+Status LocalExchangeSourceLocalState::open(RuntimeState* state) {
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_open_timer);
+    RETURN_IF_ERROR(Base::open(state));
+
     _exchanger = _shared_state->exchanger.get();
     DCHECK(_exchanger != nullptr);
     _get_block_failed_counter =
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
index 53e6aef3327..7d416b10c19 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
@@ -35,6 +35,7 @@ public:
             : Base(state, parent) {}
 
     Status init(RuntimeState* state, LocalStateInfo& info) override;
+    Status open(RuntimeState* state) override;
     std::string debug_string(int indentation_level) const override;
 
 private:
diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp
index a468729765f..0f6728ef8e7 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -393,6 +393,24 @@ Status PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
         }
     }
 
+    _rows_returned_counter =
+            ADD_COUNTER_WITH_LEVEL(_runtime_profile, "RowsProduced", 
TUnit::UNIT, 1);
+    _blocks_returned_counter =
+            ADD_COUNTER_WITH_LEVEL(_runtime_profile, "BlocksProduced", 
TUnit::UNIT, 1);
+    _projection_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, 
"ProjectionTime", 1);
+    _init_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "InitTime", 1);
+    _open_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "OpenTime", 1);
+    _close_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "CloseTime", 1);
+    _exec_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "ExecTime", 1);
+    _mem_tracker = std::make_unique<MemTracker>("PipelineXLocalState:" + 
_runtime_profile->name());
+    _memory_used_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_runtime_profile, 
"MemoryUsage", 1);
+    _peak_memory_usage_counter = _runtime_profile->AddHighWaterMarkCounter(
+            "PeakMemoryUsage", TUnit::BYTES, "MemoryUsage", 1);
+    return Status::OK();
+}
+
+template <typename SharedStateArg>
+Status PipelineXLocalState<SharedStateArg>::open(RuntimeState* state) {
     _conjuncts.resize(_parent->_conjuncts.size());
     _projections.resize(_parent->_projections.size());
     for (size_t i = 0; i < _conjuncts.size(); i++) {
@@ -409,20 +427,6 @@ Status PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
                     state, _intermediate_projections[i][j]));
         }
     }
-
-    _rows_returned_counter =
-            ADD_COUNTER_WITH_LEVEL(_runtime_profile, "RowsProduced", 
TUnit::UNIT, 1);
-    _blocks_returned_counter =
-            ADD_COUNTER_WITH_LEVEL(_runtime_profile, "BlocksProduced", 
TUnit::UNIT, 1);
-    _projection_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, 
"ProjectionTime", 1);
-    _init_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "InitTime", 1);
-    _open_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "OpenTime", 1);
-    _close_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "CloseTime", 1);
-    _exec_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "ExecTime", 1);
-    _mem_tracker = std::make_unique<MemTracker>("PipelineXLocalState:" + 
_runtime_profile->name());
-    _memory_used_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_runtime_profile, 
"MemoryUsage", 1);
-    _peak_memory_usage_counter = _runtime_profile->AddHighWaterMarkCounter(
-            "PeakMemoryUsage", TUnit::BYTES, "MemoryUsage", 1);
     return Status::OK();
 }
 
@@ -539,11 +543,6 @@ template <typename Writer, typename Parent>
     requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
 Status AsyncWriterSink<Writer, Parent>::init(RuntimeState* state, 
LocalSinkStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
-    
_output_vexpr_ctxs.resize(_parent->cast<Parent>()._output_vexpr_ctxs.size());
-    for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
-        RETURN_IF_ERROR(
-                _parent->cast<Parent>()._output_vexpr_ctxs[i]->clone(state, 
_output_vexpr_ctxs[i]));
-    }
     _writer.reset(new Writer(info.tsink, _output_vexpr_ctxs));
     _async_writer_dependency = AsyncWriterDependency::create_shared(
             _parent->operator_id(), _parent->node_id(), 
state->get_query_ctx());
@@ -558,6 +557,11 @@ template <typename Writer, typename Parent>
     requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
 Status AsyncWriterSink<Writer, Parent>::open(RuntimeState* state) {
     RETURN_IF_ERROR(Base::open(state));
+    
_output_vexpr_ctxs.resize(_parent->cast<Parent>()._output_vexpr_ctxs.size());
+    for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
+        RETURN_IF_ERROR(
+                _parent->cast<Parent>()._output_vexpr_ctxs[i]->clone(state, 
_output_vexpr_ctxs[i]));
+    }
     RETURN_IF_ERROR(_writer->start_writer(state, _profile));
     return Status::OK();
 }
diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h
index 2d1f1611b66..b470c9237e8 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -73,7 +73,7 @@ public:
     virtual Status init(RuntimeState* state, LocalStateInfo& info) = 0;
     // Do initialization. This step can be executed multiple times, so we 
should make sure it is
     // idempotent (e.g. wait for runtime filters).
-    virtual Status open(RuntimeState* state) { return Status::OK(); }
+    virtual Status open(RuntimeState* state) = 0;
     virtual Status close(RuntimeState* state) = 0;
 
     // If use projection, we should clear `_origin_block`.
@@ -403,6 +403,7 @@ public:
     ~PipelineXLocalState() override = default;
 
     Status init(RuntimeState* state, LocalStateInfo& info) override;
+    Status open(RuntimeState* state) override;
 
     virtual std::string name_suffix() const {
         return " (id=" + std::to_string(_parent->node_id()) + ")";
diff --git a/be/src/vec/sink/volap_table_sink.cpp b/be/src/vec/sink/volap_table_sink.cpp
index 7d607d5f003..8da183cd61f 100644
--- a/be/src/vec/sink/volap_table_sink.cpp
+++ b/be/src/vec/sink/volap_table_sink.cpp
@@ -122,13 +122,7 @@ namespace vectorized {
 
 VOlapTableSink::VOlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
                                const std::vector<TExpr>& texprs)
-        : AsyncWriterSink<VTabletWriter, VOLAP_TABLE_SINK>(row_desc, texprs), 
_pool(pool) {}
-
-Status VOlapTableSink::init(const TDataSink& t_sink) {
-    RETURN_IF_ERROR(AsyncWriterSink::init(t_sink));
-    RETURN_IF_ERROR(_writer->init_properties(_pool));
-    return Status::OK();
-}
+        : AsyncWriterSink<VTabletWriter, VOLAP_TABLE_SINK>(row_desc, texprs) {}
 
 Status VOlapTableSink::close(RuntimeState* state, Status exec_status) {
     if (_closed) {
diff --git a/be/src/vec/sink/volap_table_sink.h b/be/src/vec/sink/volap_table_sink.h
index add285cdfdd..2f7d2b5d22f 100644
--- a/be/src/vec/sink/volap_table_sink.h
+++ b/be/src/vec/sink/volap_table_sink.h
@@ -86,14 +86,10 @@ public:
     // Construct from thrift struct which is generated by FE.
     VOlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
                    const std::vector<TExpr>& texprs);
-    // the real writer will construct in (actually, father's) init but not 
constructor
-    Status init(const TDataSink& sink) override;
 
     Status close(RuntimeState* state, Status exec_status) override;
 
 private:
-    ObjectPool* _pool = nullptr;
-
     Status _close_status = Status::OK();
 };
 
diff --git a/be/src/vec/sink/volap_table_sink_v2.cpp b/be/src/vec/sink/volap_table_sink_v2.cpp
index a73ee483bd3..fbc57fc83e1 100644
--- a/be/src/vec/sink/volap_table_sink_v2.cpp
+++ b/be/src/vec/sink/volap_table_sink_v2.cpp
@@ -39,16 +39,10 @@ namespace vectorized {
 
 VOlapTableSinkV2::VOlapTableSinkV2(ObjectPool* pool, const RowDescriptor& 
row_desc,
                                    const std::vector<TExpr>& texprs)
-        : AsyncWriterSink<VTabletWriterV2, VOLAP_TABLE_SINK_V2>(row_desc, 
texprs), _pool(pool) {}
+        : AsyncWriterSink<VTabletWriterV2, VOLAP_TABLE_SINK_V2>(row_desc, 
texprs) {}
 
 VOlapTableSinkV2::~VOlapTableSinkV2() = default;
 
-Status VOlapTableSinkV2::init(const TDataSink& t_sink) {
-    RETURN_IF_ERROR(AsyncWriterSink::init(t_sink));
-    RETURN_IF_ERROR(_writer->init_properties(_pool));
-    return Status::OK();
-}
-
 Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) {
     SCOPED_TIMER(_exec_timer);
     if (_closed) {
diff --git a/be/src/vec/sink/volap_table_sink_v2.h b/be/src/vec/sink/volap_table_sink_v2.h
index 8257d83bfc1..33e50adeb11 100644
--- a/be/src/vec/sink/volap_table_sink_v2.h
+++ b/be/src/vec/sink/volap_table_sink_v2.h
@@ -56,13 +56,9 @@ public:
 
     ~VOlapTableSinkV2() override;
 
-    Status init(const TDataSink& sink) override;
-
     Status close(RuntimeState* state, Status exec_status) override;
 
 private:
-    ObjectPool* _pool = nullptr;
-
     Status _close_status = Status::OK();
 };
 
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp
index ddd3fd68bd1..9bcb213742a 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -969,11 +969,6 @@ VTabletWriter::VTabletWriter(const TDataSink& t_sink, const VExprContextSPtrs& o
     _transfer_large_data_by_brpc = config::transfer_large_data_by_brpc;
 }
 
-Status VTabletWriter::init_properties(doris::ObjectPool* pool) {
-    _pool = pool;
-    return Status::OK();
-}
-
 void VTabletWriter::_send_batch_process() {
     SCOPED_TIMER(_non_blocking_send_timer);
     SCOPED_ATTACH_TASK(_state);
@@ -1120,6 +1115,7 @@ Status VTabletWriter::_init_row_distribution() {
 
 Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) {
     DCHECK(_t_sink.__isset.olap_table_sink);
+    _pool = state->obj_pool();
     auto& table_sink = _t_sink.olap_table_sink;
     _load_id.set_hi(table_sink.load_id.hi);
     _load_id.set_lo(table_sink.load_id.lo);
diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h
index 8d20d2fc71d..32539712e3b 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -520,8 +520,6 @@ class VTabletWriter final : public AsyncResultWriter {
 public:
     VTabletWriter(const TDataSink& t_sink, const VExprContextSPtrs& 
output_exprs);
 
-    Status init_properties(ObjectPool* pool);
-
     Status write(Block& block) override;
 
     Status close(Status) override;
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 3ebda99309a..617e96c6cc4 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -134,12 +134,8 @@ Status VTabletWriterV2::_init_row_distribution() {
     return _row_distribution.open(_output_row_desc);
 }
 
-Status VTabletWriterV2::init_properties(ObjectPool* pool) {
-    _pool = pool;
-    return Status::OK();
-}
-
 Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) {
+    _pool = state->obj_pool();
     auto& table_sink = _t_sink.olap_table_sink;
     _load_id.set_hi(table_sink.load_id.hi);
     _load_id.set_lo(table_sink.load_id.lo);
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h
index c04cff15cf4..20952229bbb 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -106,8 +106,6 @@ public:
 
     ~VTabletWriterV2() override;
 
-    Status init_properties(ObjectPool* pool);
-
     Status write(Block& block) override;
 
     Status open(RuntimeState* state, RuntimeProfile* profile) override;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to