This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7e6644b1eea [pipelineX](profile) Improve exchange sink profile (#26117)
7e6644b1eea is described below
commit 7e6644b1eea088b62ef59cf5b87cadd354b3e428
Author: Gabriel <[email protected]>
AuthorDate: Tue Oct 31 14:10:42 2023 +0800
[pipelineX](profile) Improve exchange sink profile (#26117)
---
be/src/pipeline/exec/exchange_source_operator.cpp | 2 +-
be/src/pipeline/exec/exchange_source_operator.h | 17 +++--------------
be/src/pipeline/exec/hashjoin_build_sink.h | 1 -
be/src/pipeline/exec/repeat_operator.cpp | 1 -
be/src/pipeline/exec/repeat_operator.h | 1 +
be/src/pipeline/pipeline_x/operator.cpp | 8 ++++++--
6 files changed, 11 insertions(+), 19 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp
b/be/src/pipeline/exec/exchange_source_operator.cpp
index 362853fa18e..86d7c3728d9 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -64,7 +64,7 @@ Status ExchangeLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
static const std::string timer_name =
"WaitForDependency[" + source_dependency->name() + "]Time";
_wait_for_dependency_timer = ADD_TIMER(_runtime_profile, timer_name);
- metrics[i] = ADD_CHILD_TIMER(_runtime_profile, "WaitForData",
timer_name);
+ metrics[i] = ADD_CHILD_TIMER(_runtime_profile,
fmt::format("WaitForData{}", i), timer_name);
}
RETURN_IF_ERROR(_parent->cast<ExchangeSourceOperatorX>()._vsort_exec_exprs.clone(
state, vsort_exec_exprs));
diff --git a/be/src/pipeline/exec/exchange_source_operator.h
b/be/src/pipeline/exec/exchange_source_operator.h
index 12c6c38e4bc..c41268f8eac 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -54,16 +54,8 @@ struct ExchangeDataDependency final : public Dependency {
public:
ENABLE_FACTORY_CREATOR(ExchangeDataDependency);
ExchangeDataDependency(int id, vectorized::VDataStreamRecvr::SenderQueue*
sender_queue)
- : Dependency(id, "DataDependency"), _sender_queue(sender_queue),
_always_done(false) {}
+ : Dependency(id, "DataDependency"), _always_done(false) {}
void* shared_state() override { return nullptr; }
- [[nodiscard]] Dependency* read_blocked_by() override {
- if (config::enable_fuzzy_mode && _sender_queue->should_wait() &&
- _read_dependency_watcher.elapsed_time() >
SLOW_DEPENDENCY_THRESHOLD) {
- LOG(WARNING) << "========Dependency may be blocked by some
reasons: " << name() << " "
- << id();
- }
- return _sender_queue->should_wait() ? this : nullptr;
- }
void set_always_done() {
_always_done = true;
@@ -74,17 +66,14 @@ public:
_ready_for_read = true;
}
- void set_ready_for_read() override {
- if (_always_done || !_ready_for_read) {
+ void block_reading() override {
+ if (_always_done) {
return;
}
_ready_for_read = false;
- // ScannerContext is set done outside this function now and only stop
watcher here.
- _read_dependency_watcher.start();
}
private:
- vectorized::VDataStreamRecvr::SenderQueue* _sender_queue;
std::atomic<bool> _always_done;
};
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h
b/be/src/pipeline/exec/hashjoin_build_sink.h
index 8ba6e2fba3b..10056a30e72 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -68,7 +68,6 @@ public:
Status process_build_block(RuntimeState* state, vectorized::Block& block,
uint8_t offset);
void init_short_circuit_for_probe();
- HashJoinBuildSinkOperatorX* join_build() { return
(HashJoinBuildSinkOperatorX*)_parent; }
bool build_unique() const;
std::vector<TRuntimeFilterDesc>& runtime_filter_descs() const;
diff --git a/be/src/pipeline/exec/repeat_operator.cpp
b/be/src/pipeline/exec/repeat_operator.cpp
index c9e0a38ec4c..ce00746380d 100644
--- a/be/src/pipeline/exec/repeat_operator.cpp
+++ b/be/src/pipeline/exec/repeat_operator.cpp
@@ -211,7 +211,6 @@ Status RepeatOperatorX::push(RuntimeState* state,
vectorized::Block* input_block
Status RepeatOperatorX::pull(doris::RuntimeState* state, vectorized::Block*
output_block,
SourceState& source_state) const {
auto& local_state = get_local_state(state);
- SCOPED_TIMER(local_state.profile()->total_time_counter());
auto& _repeat_id_idx = local_state._repeat_id_idx;
auto& _child_block = *local_state._child_block;
auto& _child_eos = local_state._child_eos;
diff --git a/be/src/pipeline/exec/repeat_operator.h
b/be/src/pipeline/exec/repeat_operator.h
index f6c52a0be8d..18d373b77df 100644
--- a/be/src/pipeline/exec/repeat_operator.h
+++ b/be/src/pipeline/exec/repeat_operator.h
@@ -70,6 +70,7 @@ private:
std::unique_ptr<vectorized::Block> _intermediate_block {};
vectorized::VExprContextSPtrs _expr_ctxs;
};
+
class RepeatOperatorX final : public StatefulOperatorX<RepeatLocalState> {
public:
using Base = StatefulOperatorX<RepeatLocalState>;
diff --git a/be/src/pipeline/pipeline_x/operator.cpp
b/be/src/pipeline/pipeline_x/operator.cpp
index c29e1b8af87..cc22c96debd 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -447,11 +447,15 @@ Status
StatefulOperatorX<LocalStateType>::get_block(RuntimeState* state, vectori
local_state._child_source_state != SourceState::FINISHED) {
return Status::OK();
}
- RETURN_IF_ERROR(
- push(state, local_state._child_block.get(),
local_state._child_source_state));
+ {
+ SCOPED_TIMER(local_state.profile()->total_time_counter());
+ RETURN_IF_ERROR(
+ push(state, local_state._child_block.get(),
local_state._child_source_state));
+ }
}
if (!need_more_input_data(state)) {
+ SCOPED_TIMER(local_state.profile()->total_time_counter());
SourceState new_state = SourceState::DEPEND_ON_SOURCE;
RETURN_IF_ERROR(pull(state, block, new_state));
if (new_state == SourceState::FINISHED) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]