This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 198f8c296fb [refactor](pipelineX) Reduce prepare overhead (PART III) (#33689)
198f8c296fb is described below

commit 198f8c296fbb86e8cd770161bebc339097b5cb8f
Author: Gabriel <[email protected]>
AuthorDate: Wed Apr 17 11:20:10 2024 +0800

    [refactor](pipelineX) Reduce prepare overhead (PART III) (#33689)
---
 be/src/pipeline/exec/join_probe_operator.cpp          | 17 ++++++++++++-----
 be/src/pipeline/exec/join_probe_operator.h            |  1 +
 .../pipeline/exec/multi_cast_data_stream_source.cpp   | 19 ++++++++++++++-----
 be/src/pipeline/exec/multi_cast_data_stream_source.h  |  6 +-----
 .../pipeline/exec/nested_loop_join_build_operator.cpp | 16 ++++++++++++----
 .../pipeline/exec/nested_loop_join_build_operator.h   |  1 +
 .../pipeline/exec/nested_loop_join_probe_operator.cpp | 10 ++++++++--
 .../pipeline/exec/nested_loop_join_probe_operator.h   |  1 +
 8 files changed, 50 insertions(+), 21 deletions(-)

diff --git a/be/src/pipeline/exec/join_probe_operator.cpp b/be/src/pipeline/exec/join_probe_operator.cpp
index c78e5423709..31331568aac 100644
--- a/be/src/pipeline/exec/join_probe_operator.cpp
+++ b/be/src/pipeline/exec/join_probe_operator.cpp
@@ -28,6 +28,18 @@ template <typename SharedStateArg, typename Derived>
 Status JoinProbeLocalState<SharedStateArg, Derived>::init(RuntimeState* state,
                                                           LocalStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
+
+    _probe_timer = ADD_TIMER(Base::profile(), "ProbeTime");
+    _join_filter_timer = ADD_TIMER(Base::profile(), "JoinFilterTimer");
+    _build_output_block_timer = ADD_TIMER(Base::profile(), "BuildOutputBlock");
+    _probe_rows_counter = ADD_COUNTER_WITH_LEVEL(Base::profile(), "ProbeRows", TUnit::UNIT, 1);
+
+    return Status::OK();
+}
+
+template <typename SharedStateArg, typename Derived>
+Status JoinProbeLocalState<SharedStateArg, Derived>::open(RuntimeState* state) {
+    RETURN_IF_ERROR(Base::open(state));
     auto& p = Base::_parent->template cast<typename Derived::Parent>();
     // only use in outer join as the bool column to mark for function of `tuple_is_null`
     if (p._is_outer_join) {
@@ -39,11 +51,6 @@ Status JoinProbeLocalState<SharedStateArg, Derived>::init(RuntimeState* state,
         RETURN_IF_ERROR(p._output_expr_ctxs[i]->clone(state, _output_expr_ctxs[i]));
     }
 
-    _probe_timer = ADD_TIMER(Base::profile(), "ProbeTime");
-    _join_filter_timer = ADD_TIMER(Base::profile(), "JoinFilterTimer");
-    _build_output_block_timer = ADD_TIMER(Base::profile(), "BuildOutputBlock");
-    _probe_rows_counter = ADD_COUNTER_WITH_LEVEL(Base::profile(), "ProbeRows", TUnit::UNIT, 1);
-
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/join_probe_operator.h b/be/src/pipeline/exec/join_probe_operator.h
index 4072baa72fc..679446147ef 100644
--- a/be/src/pipeline/exec/join_probe_operator.h
+++ b/be/src/pipeline/exec/join_probe_operator.h
@@ -31,6 +31,7 @@ class JoinProbeLocalState : public PipelineXLocalState<SharedStateArg> {
 public:
     using Base = PipelineXLocalState<SharedStateArg>;
     Status init(RuntimeState* state, LocalStateInfo& info) override;
+    Status open(RuntimeState* state) override;
     Status close(RuntimeState* state) override;
     virtual void add_tuple_is_null_column(vectorized::Block* block) = 0;
 
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index c40af83bd58..90c809c5359 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -135,13 +135,9 @@ Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState
     RETURN_IF_ERROR(Base::init(state, info));
     RETURN_IF_ERROR(RuntimeFilterConsumer::init(state));
     SCOPED_TIMER(exec_time_counter());
-    SCOPED_TIMER(_open_timer);
+    SCOPED_TIMER(_init_timer);
     auto& p = _parent->cast<Parent>();
     
_shared_state->multi_cast_data_streamer.set_dep_by_sender_idx(p._consumer_id, _dependency);
-    _output_expr_contexts.resize(p._output_expr_contexts.size());
-    for (size_t i = 0; i < p._output_expr_contexts.size(); i++) {
-        RETURN_IF_ERROR(p._output_expr_contexts[i]->clone(state, _output_expr_contexts[i]));
-    }
     _wait_for_rf_timer = ADD_TIMER(_runtime_profile, "WaitForRuntimeFilter");
     // init profile for runtime filter
     RuntimeFilterConsumer::_init_profile(profile());
@@ -150,6 +146,19 @@ Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState
     return Status::OK();
 }
 
+Status MultiCastDataStreamSourceLocalState::open(RuntimeState* state) {
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_open_timer);
+    RETURN_IF_ERROR(Base::open(state));
+    RETURN_IF_ERROR(_acquire_runtime_filter(true));
+    auto& p = _parent->cast<Parent>();
+    _output_expr_contexts.resize(p._output_expr_contexts.size());
+    for (size_t i = 0; i < p._output_expr_contexts.size(); i++) {
+        RETURN_IF_ERROR(p._output_expr_contexts[i]->clone(state, _output_expr_contexts[i]));
+    }
+    return Status::OK();
+}
+
 Status MultiCastDataStreamSourceLocalState::close(RuntimeState* state) {
     if (_closed) {
         return Status::OK();
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index fd1f6f2c033..8d14b4f266b 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -102,11 +102,7 @@ public:
     MultiCastDataStreamSourceLocalState(RuntimeState* state, OperatorXBase* parent);
     Status init(RuntimeState* state, LocalStateInfo& info) override;
 
-    Status open(RuntimeState* state) override {
-        RETURN_IF_ERROR(Base::open(state));
-        RETURN_IF_ERROR(_acquire_runtime_filter(true));
-        return Status::OK();
-    }
+    Status open(RuntimeState* state) override;
     Status close(RuntimeState* state) override;
     friend class MultiCastDataStreamerSourceOperatorX;
 
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
index f074afce374..66612700fed 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
@@ -36,10 +36,6 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkSta
     SCOPED_TIMER(_open_timer);
     auto& p = _parent->cast<NestedLoopJoinBuildSinkOperatorX>();
     _shared_state->join_op_variants = p._join_op_variants;
-    _filter_src_expr_ctxs.resize(p._filter_src_expr_ctxs.size());
-    for (size_t i = 0; i < _filter_src_expr_ctxs.size(); i++) {
-        RETURN_IF_ERROR(p._filter_src_expr_ctxs[i]->clone(state, _filter_src_expr_ctxs[i]));
-    }
     _runtime_filters.resize(p._runtime_filter_descs.size());
     for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) {
         RETURN_IF_ERROR(state->register_producer_runtime_filter(
@@ -48,6 +44,18 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkSta
     return Status::OK();
 }
 
+Status NestedLoopJoinBuildSinkLocalState::open(RuntimeState* state) {
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_open_timer);
+    RETURN_IF_ERROR(JoinBuildSinkLocalState::open(state));
+    auto& p = _parent->cast<NestedLoopJoinBuildSinkOperatorX>();
+    _filter_src_expr_ctxs.resize(p._filter_src_expr_ctxs.size());
+    for (size_t i = 0; i < _filter_src_expr_ctxs.size(); i++) {
+        RETURN_IF_ERROR(p._filter_src_expr_ctxs[i]->clone(state, _filter_src_expr_ctxs[i]));
+    }
+    return Status::OK();
+}
+
 NestedLoopJoinBuildSinkOperatorX::NestedLoopJoinBuildSinkOperatorX(ObjectPool* pool,
                                                                    int operator_id,
                                                                    const TPlanNode& tnode,
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h b/be/src/pipeline/exec/nested_loop_join_build_operator.h
index 52f723b13ae..da42e961f47 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h
@@ -56,6 +56,7 @@ public:
     ~NestedLoopJoinBuildSinkLocalState() = default;
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+    Status open(RuntimeState* state) override;
 
     vectorized::VExprContextSPtrs& filter_src_expr_ctxs() { return _filter_src_expr_ctxs; }
     RuntimeProfile::Counter* runtime_filter_compute_timer() {
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
index 271891709b0..c7afa2c399c 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
@@ -52,6 +52,14 @@ NestedLoopJoinProbeLocalState::NestedLoopJoinProbeLocalState(RuntimeState* state
 Status NestedLoopJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     RETURN_IF_ERROR(JoinProbeLocalState::init(state, info));
     SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_init_timer);
+    _loop_join_timer = ADD_TIMER(profile(), "LoopGenerateJoin");
+    return Status::OK();
+}
+
+Status NestedLoopJoinProbeLocalState::open(RuntimeState* state) {
+    RETURN_IF_ERROR(JoinProbeLocalState::open(state));
+    SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
     auto& p = _parent->cast<NestedLoopJoinProbeOperatorX>();
     _join_conjuncts.resize(p._join_conjuncts.size());
@@ -59,8 +67,6 @@ Status NestedLoopJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo&
         RETURN_IF_ERROR(p._join_conjuncts[i]->clone(state, _join_conjuncts[i]));
     }
     _construct_mutable_join_block();
-
-    _loop_join_timer = ADD_TIMER(profile(), "LoopGenerateJoin");
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h b/be/src/pipeline/exec/nested_loop_join_probe_operator.h
index 7a8be87d922..de9f11b437e 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h
@@ -66,6 +66,7 @@ public:
         block->get_by_position(i).column->assume_mutable()->clear(); \
     }
     Status init(RuntimeState* state, LocalStateInfo& info) override;
+    Status open(RuntimeState* state) override;
     Status close(RuntimeState* state) override;
     template <typename JoinOpType, bool set_build_side_flag, bool set_probe_side_flag>
     Status generate_join_block_data(RuntimeState* state, JoinOpType& join_op_variants);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to