This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 77d7df9d0b301d07e5ab1d55cbcd00084540372b Author: Gabriel <[email protected]> AuthorDate: Wed Apr 17 11:20:10 2024 +0800 [refactor](pipelineX) Reduce prepare overhead (PART III) (#33689) --- be/src/pipeline/exec/join_probe_operator.cpp | 17 ++++++++++++----- be/src/pipeline/exec/join_probe_operator.h | 1 + .../pipeline/exec/multi_cast_data_stream_source.cpp | 19 ++++++++++++++----- be/src/pipeline/exec/multi_cast_data_stream_source.h | 6 +----- .../pipeline/exec/nested_loop_join_build_operator.cpp | 16 ++++++++++++---- .../pipeline/exec/nested_loop_join_build_operator.h | 1 + .../pipeline/exec/nested_loop_join_probe_operator.cpp | 10 ++++++++-- .../pipeline/exec/nested_loop_join_probe_operator.h | 1 + 8 files changed, 50 insertions(+), 21 deletions(-) diff --git a/be/src/pipeline/exec/join_probe_operator.cpp b/be/src/pipeline/exec/join_probe_operator.cpp index c78e5423709..31331568aac 100644 --- a/be/src/pipeline/exec/join_probe_operator.cpp +++ b/be/src/pipeline/exec/join_probe_operator.cpp @@ -28,6 +28,18 @@ template <typename SharedStateArg, typename Derived> Status JoinProbeLocalState<SharedStateArg, Derived>::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); + + _probe_timer = ADD_TIMER(Base::profile(), "ProbeTime"); + _join_filter_timer = ADD_TIMER(Base::profile(), "JoinFilterTimer"); + _build_output_block_timer = ADD_TIMER(Base::profile(), "BuildOutputBlock"); + _probe_rows_counter = ADD_COUNTER_WITH_LEVEL(Base::profile(), "ProbeRows", TUnit::UNIT, 1); + + return Status::OK(); +} + +template <typename SharedStateArg, typename Derived> +Status JoinProbeLocalState<SharedStateArg, Derived>::open(RuntimeState* state) { + RETURN_IF_ERROR(Base::open(state)); auto& p = Base::_parent->template cast<typename Derived::Parent>(); // only use in outer join as the bool column to mark for function of `tuple_is_null` if (p._is_outer_join) { @@ -39,11 +51,6 @@ Status JoinProbeLocalState<SharedStateArg, Derived>::init(RuntimeState* state, RETURN_IF_ERROR(p._output_expr_ctxs[i]->clone(state, _output_expr_ctxs[i])); } - _probe_timer = ADD_TIMER(Base::profile(), "ProbeTime"); - _join_filter_timer = ADD_TIMER(Base::profile(), "JoinFilterTimer"); - _build_output_block_timer = ADD_TIMER(Base::profile(), "BuildOutputBlock"); - _probe_rows_counter = ADD_COUNTER_WITH_LEVEL(Base::profile(), "ProbeRows", TUnit::UNIT, 1); - return Status::OK(); } diff --git a/be/src/pipeline/exec/join_probe_operator.h b/be/src/pipeline/exec/join_probe_operator.h index 4072baa72fc..679446147ef 100644 --- a/be/src/pipeline/exec/join_probe_operator.h +++ b/be/src/pipeline/exec/join_probe_operator.h @@ -31,6 +31,7 @@ class JoinProbeLocalState : public PipelineXLocalState<SharedStateArg> { public: using Base = PipelineXLocalState<SharedStateArg>; Status init(RuntimeState* state, LocalStateInfo& info) override; + Status open(RuntimeState* state) override; Status close(RuntimeState* state) override; virtual void add_tuple_is_null_column(vectorized::Block* block) = 0; diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp index c40af83bd58..90c809c5359 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp @@ -135,13 +135,9 @@ Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState RETURN_IF_ERROR(Base::init(state, info)); RETURN_IF_ERROR(RuntimeFilterConsumer::init(state)); SCOPED_TIMER(exec_time_counter()); - SCOPED_TIMER(_open_timer); + SCOPED_TIMER(_init_timer); auto& p = _parent->cast<Parent>(); _shared_state->multi_cast_data_streamer.set_dep_by_sender_idx(p._consumer_id, _dependency); - _output_expr_contexts.resize(p._output_expr_contexts.size()); - for (size_t i = 0; i < p._output_expr_contexts.size(); i++) { - RETURN_IF_ERROR(p._output_expr_contexts[i]->clone(state, _output_expr_contexts[i])); - } _wait_for_rf_timer = ADD_TIMER(_runtime_profile, "WaitForRuntimeFilter"); // init profile for runtime filter RuntimeFilterConsumer::_init_profile(profile()); @@ -150,6 +146,19 @@ Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState return Status::OK(); } +Status MultiCastDataStreamSourceLocalState::open(RuntimeState* state) { + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_open_timer); + RETURN_IF_ERROR(Base::open(state)); + RETURN_IF_ERROR(_acquire_runtime_filter(true)); + auto& p = _parent->cast<Parent>(); + _output_expr_contexts.resize(p._output_expr_contexts.size()); + for (size_t i = 0; i < p._output_expr_contexts.size(); i++) { + RETURN_IF_ERROR(p._output_expr_contexts[i]->clone(state, _output_expr_contexts[i])); + } + return Status::OK(); +} + Status MultiCastDataStreamSourceLocalState::close(RuntimeState* state) { if (_closed) { return Status::OK(); diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h index fd1f6f2c033..8d14b4f266b 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h @@ -102,11 +102,7 @@ public: MultiCastDataStreamSourceLocalState(RuntimeState* state, OperatorXBase* parent); Status init(RuntimeState* state, LocalStateInfo& info) override; - Status open(RuntimeState* state) override { - RETURN_IF_ERROR(Base::open(state)); - RETURN_IF_ERROR(_acquire_runtime_filter(true)); - return Status::OK(); - } + Status open(RuntimeState* state) override; Status close(RuntimeState* state) override; friend class MultiCastDataStreamerSourceOperatorX; diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp index f074afce374..66612700fed 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp @@ -36,10 +36,6 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkSta SCOPED_TIMER(_open_timer); auto& p = _parent->cast<NestedLoopJoinBuildSinkOperatorX>(); _shared_state->join_op_variants = p._join_op_variants; - _filter_src_expr_ctxs.resize(p._filter_src_expr_ctxs.size()); - for (size_t i = 0; i < _filter_src_expr_ctxs.size(); i++) { - RETURN_IF_ERROR(p._filter_src_expr_ctxs[i]->clone(state, _filter_src_expr_ctxs[i])); - } _runtime_filters.resize(p._runtime_filter_descs.size()); for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) { RETURN_IF_ERROR(state->register_producer_runtime_filter( @@ -48,6 +44,18 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkSta return Status::OK(); } +Status NestedLoopJoinBuildSinkLocalState::open(RuntimeState* state) { + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_open_timer); + RETURN_IF_ERROR(JoinBuildSinkLocalState::open(state)); + auto& p = _parent->cast<NestedLoopJoinBuildSinkOperatorX>(); + _filter_src_expr_ctxs.resize(p._filter_src_expr_ctxs.size()); + for (size_t i = 0; i < _filter_src_expr_ctxs.size(); i++) { + RETURN_IF_ERROR(p._filter_src_expr_ctxs[i]->clone(state, _filter_src_expr_ctxs[i])); + } + return Status::OK(); +} + NestedLoopJoinBuildSinkOperatorX::NestedLoopJoinBuildSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h b/be/src/pipeline/exec/nested_loop_join_build_operator.h index 52f723b13ae..da42e961f47 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h @@ -56,6 +56,7 @@ public: ~NestedLoopJoinBuildSinkLocalState() = default; Status init(RuntimeState* state, LocalSinkStateInfo& info) override; + Status open(RuntimeState* state) override; vectorized::VExprContextSPtrs& filter_src_expr_ctxs() { return _filter_src_expr_ctxs; } RuntimeProfile::Counter* runtime_filter_compute_timer() { diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp index 271891709b0..c7afa2c399c 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp @@ -52,6 +52,14 @@ NestedLoopJoinProbeLocalState::NestedLoopJoinProbeLocalState(RuntimeState* state Status NestedLoopJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(JoinProbeLocalState::init(state, info)); SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_init_timer); + _loop_join_timer = ADD_TIMER(profile(), "LoopGenerateJoin"); + return Status::OK(); +} + +Status NestedLoopJoinProbeLocalState::open(RuntimeState* state) { + RETURN_IF_ERROR(JoinProbeLocalState::open(state)); + SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); auto& p = _parent->cast<NestedLoopJoinProbeOperatorX>(); _join_conjuncts.resize(p._join_conjuncts.size()); @@ -59,8 +67,6 @@ Status NestedLoopJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& RETURN_IF_ERROR(p._join_conjuncts[i]->clone(state, _join_conjuncts[i])); } _construct_mutable_join_block(); - - _loop_join_timer = ADD_TIMER(profile(), "LoopGenerateJoin"); return Status::OK(); } diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h b/be/src/pipeline/exec/nested_loop_join_probe_operator.h index 7a8be87d922..de9f11b437e 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h @@ -66,6 +66,7 @@ public: block->get_by_position(i).column->assume_mutable()->clear(); \ } Status init(RuntimeState* state, LocalStateInfo& info) override; + Status open(RuntimeState* state) override; Status close(RuntimeState* state) override; template <typename JoinOpType, bool set_build_side_flag, bool set_probe_side_flag> Status generate_join_block_data(RuntimeState* state, JoinOpType& join_op_variants); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
