This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 198f8c296fb [refactor](pipelineX) Reduce prepare overhead (PART III) (#33689)
198f8c296fb is described below
commit 198f8c296fbb86e8cd770161bebc339097b5cb8f
Author: Gabriel <[email protected]>
AuthorDate: Wed Apr 17 11:20:10 2024 +0800
[refactor](pipelineX) Reduce prepare overhead (PART III) (#33689)
---
be/src/pipeline/exec/join_probe_operator.cpp | 17 ++++++++++++-----
be/src/pipeline/exec/join_probe_operator.h | 1 +
.../pipeline/exec/multi_cast_data_stream_source.cpp | 19 ++++++++++++++-----
be/src/pipeline/exec/multi_cast_data_stream_source.h | 6 +-----
.../pipeline/exec/nested_loop_join_build_operator.cpp | 16 ++++++++++++----
.../pipeline/exec/nested_loop_join_build_operator.h | 1 +
.../pipeline/exec/nested_loop_join_probe_operator.cpp | 10 ++++++++--
.../pipeline/exec/nested_loop_join_probe_operator.h | 1 +
8 files changed, 50 insertions(+), 21 deletions(-)
diff --git a/be/src/pipeline/exec/join_probe_operator.cpp
b/be/src/pipeline/exec/join_probe_operator.cpp
index c78e5423709..31331568aac 100644
--- a/be/src/pipeline/exec/join_probe_operator.cpp
+++ b/be/src/pipeline/exec/join_probe_operator.cpp
@@ -28,6 +28,18 @@ template <typename SharedStateArg, typename Derived>
Status JoinProbeLocalState<SharedStateArg, Derived>::init(RuntimeState* state,
LocalStateInfo&
info) {
RETURN_IF_ERROR(Base::init(state, info));
+
+ _probe_timer = ADD_TIMER(Base::profile(), "ProbeTime");
+ _join_filter_timer = ADD_TIMER(Base::profile(), "JoinFilterTimer");
+ _build_output_block_timer = ADD_TIMER(Base::profile(), "BuildOutputBlock");
+ _probe_rows_counter = ADD_COUNTER_WITH_LEVEL(Base::profile(), "ProbeRows",
TUnit::UNIT, 1);
+
+ return Status::OK();
+}
+
+template <typename SharedStateArg, typename Derived>
+Status JoinProbeLocalState<SharedStateArg, Derived>::open(RuntimeState* state)
{
+ RETURN_IF_ERROR(Base::open(state));
auto& p = Base::_parent->template cast<typename Derived::Parent>();
// only use in outer join as the bool column to mark for function of
`tuple_is_null`
if (p._is_outer_join) {
@@ -39,11 +51,6 @@ Status JoinProbeLocalState<SharedStateArg,
Derived>::init(RuntimeState* state,
RETURN_IF_ERROR(p._output_expr_ctxs[i]->clone(state,
_output_expr_ctxs[i]));
}
- _probe_timer = ADD_TIMER(Base::profile(), "ProbeTime");
- _join_filter_timer = ADD_TIMER(Base::profile(), "JoinFilterTimer");
- _build_output_block_timer = ADD_TIMER(Base::profile(), "BuildOutputBlock");
- _probe_rows_counter = ADD_COUNTER_WITH_LEVEL(Base::profile(), "ProbeRows",
TUnit::UNIT, 1);
-
return Status::OK();
}
diff --git a/be/src/pipeline/exec/join_probe_operator.h
b/be/src/pipeline/exec/join_probe_operator.h
index 4072baa72fc..679446147ef 100644
--- a/be/src/pipeline/exec/join_probe_operator.h
+++ b/be/src/pipeline/exec/join_probe_operator.h
@@ -31,6 +31,7 @@ class JoinProbeLocalState : public
PipelineXLocalState<SharedStateArg> {
public:
using Base = PipelineXLocalState<SharedStateArg>;
Status init(RuntimeState* state, LocalStateInfo& info) override;
+ Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override;
virtual void add_tuple_is_null_column(vectorized::Block* block) = 0;
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index c40af83bd58..90c809c5359 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -135,13 +135,9 @@ Status
MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState
RETURN_IF_ERROR(Base::init(state, info));
RETURN_IF_ERROR(RuntimeFilterConsumer::init(state));
SCOPED_TIMER(exec_time_counter());
- SCOPED_TIMER(_open_timer);
+ SCOPED_TIMER(_init_timer);
auto& p = _parent->cast<Parent>();
_shared_state->multi_cast_data_streamer.set_dep_by_sender_idx(p._consumer_id,
_dependency);
- _output_expr_contexts.resize(p._output_expr_contexts.size());
- for (size_t i = 0; i < p._output_expr_contexts.size(); i++) {
- RETURN_IF_ERROR(p._output_expr_contexts[i]->clone(state,
_output_expr_contexts[i]));
- }
_wait_for_rf_timer = ADD_TIMER(_runtime_profile, "WaitForRuntimeFilter");
// init profile for runtime filter
RuntimeFilterConsumer::_init_profile(profile());
@@ -150,6 +146,19 @@ Status
MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState
return Status::OK();
}
+Status MultiCastDataStreamSourceLocalState::open(RuntimeState* state) {
+ SCOPED_TIMER(exec_time_counter());
+ SCOPED_TIMER(_open_timer);
+ RETURN_IF_ERROR(Base::open(state));
+ RETURN_IF_ERROR(_acquire_runtime_filter(true));
+ auto& p = _parent->cast<Parent>();
+ _output_expr_contexts.resize(p._output_expr_contexts.size());
+ for (size_t i = 0; i < p._output_expr_contexts.size(); i++) {
+ RETURN_IF_ERROR(p._output_expr_contexts[i]->clone(state,
_output_expr_contexts[i]));
+ }
+ return Status::OK();
+}
+
Status MultiCastDataStreamSourceLocalState::close(RuntimeState* state) {
if (_closed) {
return Status::OK();
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h
b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index fd1f6f2c033..8d14b4f266b 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -102,11 +102,7 @@ public:
MultiCastDataStreamSourceLocalState(RuntimeState* state, OperatorXBase*
parent);
Status init(RuntimeState* state, LocalStateInfo& info) override;
- Status open(RuntimeState* state) override {
- RETURN_IF_ERROR(Base::open(state));
- RETURN_IF_ERROR(_acquire_runtime_filter(true));
- return Status::OK();
- }
+ Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override;
friend class MultiCastDataStreamerSourceOperatorX;
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
index f074afce374..66612700fed 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
@@ -36,10 +36,6 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState*
state, LocalSinkSta
SCOPED_TIMER(_open_timer);
auto& p = _parent->cast<NestedLoopJoinBuildSinkOperatorX>();
_shared_state->join_op_variants = p._join_op_variants;
- _filter_src_expr_ctxs.resize(p._filter_src_expr_ctxs.size());
- for (size_t i = 0; i < _filter_src_expr_ctxs.size(); i++) {
- RETURN_IF_ERROR(p._filter_src_expr_ctxs[i]->clone(state,
_filter_src_expr_ctxs[i]));
- }
_runtime_filters.resize(p._runtime_filter_descs.size());
for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) {
RETURN_IF_ERROR(state->register_producer_runtime_filter(
@@ -48,6 +44,18 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState*
state, LocalSinkSta
return Status::OK();
}
+Status NestedLoopJoinBuildSinkLocalState::open(RuntimeState* state) {
+ SCOPED_TIMER(exec_time_counter());
+ SCOPED_TIMER(_open_timer);
+ RETURN_IF_ERROR(JoinBuildSinkLocalState::open(state));
+ auto& p = _parent->cast<NestedLoopJoinBuildSinkOperatorX>();
+ _filter_src_expr_ctxs.resize(p._filter_src_expr_ctxs.size());
+ for (size_t i = 0; i < _filter_src_expr_ctxs.size(); i++) {
+ RETURN_IF_ERROR(p._filter_src_expr_ctxs[i]->clone(state,
_filter_src_expr_ctxs[i]));
+ }
+ return Status::OK();
+}
+
NestedLoopJoinBuildSinkOperatorX::NestedLoopJoinBuildSinkOperatorX(ObjectPool*
pool,
int
operator_id,
const
TPlanNode& tnode,
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h
b/be/src/pipeline/exec/nested_loop_join_build_operator.h
index 52f723b13ae..da42e961f47 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h
@@ -56,6 +56,7 @@ public:
~NestedLoopJoinBuildSinkLocalState() = default;
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+ Status open(RuntimeState* state) override;
vectorized::VExprContextSPtrs& filter_src_expr_ctxs() { return
_filter_src_expr_ctxs; }
RuntimeProfile::Counter* runtime_filter_compute_timer() {
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
index 271891709b0..c7afa2c399c 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
@@ -52,6 +52,14 @@
NestedLoopJoinProbeLocalState::NestedLoopJoinProbeLocalState(RuntimeState* state
Status NestedLoopJoinProbeLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
RETURN_IF_ERROR(JoinProbeLocalState::init(state, info));
SCOPED_TIMER(exec_time_counter());
+ SCOPED_TIMER(_init_timer);
+ _loop_join_timer = ADD_TIMER(profile(), "LoopGenerateJoin");
+ return Status::OK();
+}
+
+Status NestedLoopJoinProbeLocalState::open(RuntimeState* state) {
+ RETURN_IF_ERROR(JoinProbeLocalState::open(state));
+ SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
auto& p = _parent->cast<NestedLoopJoinProbeOperatorX>();
_join_conjuncts.resize(p._join_conjuncts.size());
@@ -59,8 +67,6 @@ Status NestedLoopJoinProbeLocalState::init(RuntimeState*
state, LocalStateInfo&
RETURN_IF_ERROR(p._join_conjuncts[i]->clone(state,
_join_conjuncts[i]));
}
_construct_mutable_join_block();
-
- _loop_join_timer = ADD_TIMER(profile(), "LoopGenerateJoin");
return Status::OK();
}
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h
b/be/src/pipeline/exec/nested_loop_join_probe_operator.h
index 7a8be87d922..de9f11b437e 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h
@@ -66,6 +66,7 @@ public:
block->get_by_position(i).column->assume_mutable()->clear(); \
}
Status init(RuntimeState* state, LocalStateInfo& info) override;
+ Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override;
template <typename JoinOpType, bool set_build_side_flag, bool
set_probe_side_flag>
Status generate_join_block_data(RuntimeState* state, JoinOpType&
join_op_variants);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]