This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4ebc91fa148 [chore](spill) add timers for performance tuning (#33185)
4ebc91fa148 is described below
commit 4ebc91fa148f6d064513772837dad87bba33e19b
Author: Jerry Hu <[email protected]>
AuthorDate: Wed Apr 3 15:19:04 2024 +0800
[chore](spill) add timers for performance tuning (#33185)
---
.../exec/partitioned_aggregation_sink_operator.cpp | 5 +-
.../partitioned_aggregation_source_operator.cpp | 7 +-
.../exec/partitioned_hash_join_probe_operator.cpp | 100 ++++++++++++---------
.../exec/partitioned_hash_join_probe_operator.h | 24 +++--
.../exec/partitioned_hash_join_sink_operator.cpp | 53 +++++------
.../exec/partitioned_hash_join_sink_operator.h | 12 +--
be/src/pipeline/exec/spill_sort_sink_operator.cpp | 10 ++-
.../pipeline/exec/spill_sort_source_operator.cpp | 9 +-
be/src/pipeline/pipeline_x/operator.h | 6 ++
9 files changed, 137 insertions(+), 89 deletions(-)
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 64996724e15..3dea330c117 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -252,13 +252,16 @@ Status
PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
auto execution_context = state->get_task_execution_context();
_shared_state_holder = _shared_state->shared_from_this();
+ MonotonicStopWatch submit_timer;
+ submit_timer.start();
status =
ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool()->submit_func(
- [this, &parent, state, execution_context] {
+ [this, &parent, state, execution_context, submit_timer] {
auto execution_context_lock = execution_context.lock();
if (!execution_context_lock) {
LOG(INFO) << "execution_context released, maybe query was
cancelled.";
return Status::Cancelled("Cancelled");
}
+
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
SCOPED_ATTACH_TASK(state);
SCOPED_TIMER(Base::_spill_timer);
Defer defer {[&]() {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 5db80788f41..5680b75c87e 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -197,9 +197,13 @@ Status
PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
auto execution_context = state->get_task_execution_context();
_shared_state_holder = _shared_state->shared_from_this();
+
+ MonotonicStopWatch submit_timer;
+ submit_timer.start();
+
RETURN_IF_ERROR(
ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool()->submit_func(
- [this, state, execution_context] {
+ [this, state, execution_context, submit_timer] {
auto execution_context_lock = execution_context.lock();
if (!execution_context_lock) {
LOG(INFO) << "execution_context released, maybe
query was cancelled.";
@@ -207,6 +211,7 @@ Status
PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
return Status::Cancelled("Cancelled");
}
+
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
SCOPED_ATTACH_TASK(state);
Defer defer {[&]() {
if (!_status.ok()) {
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index c23e12c3705..1a05b78b052 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -25,10 +25,11 @@ namespace doris::pipeline {
PartitionedHashJoinProbeLocalState::PartitionedHashJoinProbeLocalState(RuntimeState*
state,
OperatorXBase* parent)
- : JoinProbeLocalState(state, parent) {}
+ : PipelineXSpillLocalState(state, parent),
+ _child_block(vectorized::Block::create_unique()) {}
Status PartitionedHashJoinProbeLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
- RETURN_IF_ERROR(JoinProbeLocalState::init(state, info));
+ RETURN_IF_ERROR(PipelineXSpillLocalState::init(state, info));
_internal_runtime_profile.reset(new RuntimeProfile("internal_profile"));
auto& p = _parent->cast<PartitionedHashJoinProbeOperatorX>();
@@ -38,45 +39,32 @@ Status
PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateI
RETURN_IF_ERROR(_partitioner->init(p._probe_exprs));
RETURN_IF_ERROR(_partitioner->prepare(state, p._child_x->row_desc()));
- _spill_and_partition_label = ADD_LABEL_COUNTER(profile(),
"SpillAndPartition");
- _partition_timer = ADD_CHILD_TIMER(profile(), "PartitionTime",
"SpillAndPartition");
- _partition_shuffle_timer =
- ADD_CHILD_TIMER(profile(), "PartitionShuffleTime",
"SpillAndPartition");
- _spill_build_rows =
- ADD_CHILD_COUNTER(profile(), "SpillBuildRows", TUnit::UNIT,
"SpillAndPartition");
- _recovery_build_rows =
- ADD_CHILD_COUNTER(profile(), "RecoveryBuildRows", TUnit::UNIT,
"SpillAndPartition");
- _spill_probe_rows =
- ADD_CHILD_COUNTER(profile(), "SpillProbeRows", TUnit::UNIT,
"SpillAndPartition");
- _recovery_probe_rows =
- ADD_CHILD_COUNTER(profile(), "RecoveryProbeRows", TUnit::UNIT,
"SpillAndPartition");
- _spill_build_blocks =
- ADD_CHILD_COUNTER(profile(), "SpillBuildBlocks", TUnit::UNIT,
"SpillAndPartition");
+ _spill_and_partition_label = ADD_LABEL_COUNTER(profile(), "Partition");
+ _partition_timer = ADD_CHILD_TIMER(profile(), "PartitionTime",
"Partition");
+ _partition_shuffle_timer = ADD_CHILD_TIMER(profile(),
"PartitionShuffleTime", "Partition");
+ _spill_build_rows = ADD_CHILD_COUNTER(profile(), "SpillBuildRows",
TUnit::UNIT, "Spill");
+ _spill_build_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile(),
"SpillBuildTime", "Spill", 1);
+ _recovery_build_rows = ADD_CHILD_COUNTER(profile(), "RecoveryBuildRows",
TUnit::UNIT, "Spill");
+ _recovery_build_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile(),
"RecoveryBuildTime", "Spill", 1);
+ _spill_probe_rows = ADD_CHILD_COUNTER(profile(), "SpillProbeRows",
TUnit::UNIT, "Spill");
+ _recovery_probe_rows = ADD_CHILD_COUNTER(profile(), "RecoveryProbeRows",
TUnit::UNIT, "Spill");
+ _spill_build_blocks = ADD_CHILD_COUNTER(profile(), "SpillBuildBlocks",
TUnit::UNIT, "Spill");
_recovery_build_blocks =
- ADD_CHILD_COUNTER(profile(), "RecoveryBuildBlocks", TUnit::UNIT,
"SpillAndPartition");
- _spill_probe_blocks =
- ADD_CHILD_COUNTER(profile(), "SpillProbeBlocks", TUnit::UNIT,
"SpillAndPartition");
+ ADD_CHILD_COUNTER(profile(), "RecoveryBuildBlocks", TUnit::UNIT,
"Spill");
+ _spill_probe_blocks = ADD_CHILD_COUNTER(profile(), "SpillProbeBlocks",
TUnit::UNIT, "Spill");
+ _spill_probe_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile(),
"SpillProbeTime", "Spill", 1);
_recovery_probe_blocks =
- ADD_CHILD_COUNTER(profile(), "RecoveryProbeBlocks", TUnit::UNIT,
"SpillAndPartition");
+ ADD_CHILD_COUNTER(profile(), "RecoveryProbeBlocks", TUnit::UNIT,
"Spill");
+ _recovery_probe_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile(),
"RecoveryProbeTime", "Spill", 1);
- _spill_serialize_block_timer = ADD_CHILD_TIMER_WITH_LEVEL(
- Base::profile(), "SpillSerializeBlockTime", "SpillAndPartition",
1);
- _spill_write_disk_timer = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(),
"SpillWriteDiskTime",
- "SpillAndPartition",
1);
+ _spill_serialize_block_timer =
+ ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(),
"SpillSerializeBlockTime", "Spill", 1);
+ _spill_write_disk_timer =
+ ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteDiskTime",
"Spill", 1);
_spill_data_size = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(),
"SpillWriteDataSize",
- TUnit::BYTES,
"SpillAndPartition", 1);
+ TUnit::BYTES, "Spill", 1);
_spill_block_count = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(),
"SpillWriteBlockCount",
- TUnit::UNIT,
"SpillAndPartition", 1);
- _spill_read_data_time = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(),
"SpillReadDataTime",
- "SpillAndPartition", 1);
- _spill_deserialize_time = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(),
"SpillDeserializeTime",
- "SpillAndPartition",
1);
- _spill_read_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(),
"SpillReadDataSize",
- TUnit::BYTES,
"SpillAndPartition", 1);
- _spill_write_wait_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(),
"SpillWriteWaitIOTime",
-
"SpillAndPartition", 1);
- _spill_read_wait_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(),
"SpillReadWaitIOTime",
-
"SpillAndPartition", 1);
+ TUnit::UNIT, "Spill", 1);
// Build phase
_build_phase_label = ADD_LABEL_COUNTER(profile(), "BuildPhase");
@@ -109,6 +97,10 @@ Status
PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateI
_process_other_join_conjunct_timer =
ADD_CHILD_TIMER(profile(), "OtherJoinConjunctTime", "ProbePhase");
_init_probe_side_timer = ADD_CHILD_TIMER(profile(), "InitProbeSideTime",
"ProbePhase");
+ _probe_timer = ADD_CHILD_TIMER(profile(), "ProbeTime", "ProbePhase");
+ _join_filter_timer = ADD_CHILD_TIMER(profile(), "JoinFilterTimer",
"ProbePhase");
+ _build_output_block_timer = ADD_CHILD_TIMER(profile(), "BuildOutputBlock",
"ProbePhase");
+ _probe_rows_counter = ADD_CHILD_COUNTER(profile(), "ProbeRows",
TUnit::UNIT, "ProbePhase");
return Status::OK();
}
#define UPDATE_PROFILE(counter, name) \
@@ -149,7 +141,7 @@ void
PartitionedHashJoinProbeLocalState::update_probe_profile(RuntimeProfile* ch
#undef UPDATE_PROFILE
Status PartitionedHashJoinProbeLocalState::open(RuntimeState* state) {
- RETURN_IF_ERROR(PipelineXLocalStateBase::open(state));
+ RETURN_IF_ERROR(PipelineXSpillLocalState::open(state));
return _partitioner->open(state);
}
Status PartitionedHashJoinProbeLocalState::close(RuntimeState* state) {
@@ -157,7 +149,7 @@ Status
PartitionedHashJoinProbeLocalState::close(RuntimeState* state) {
return Status::OK();
}
dec_running_big_mem_op_num(state);
- RETURN_IF_ERROR(JoinProbeLocalState::close(state));
+ RETURN_IF_ERROR(PipelineXSpillLocalState::close(state));
return Status::OK();
}
@@ -187,13 +179,17 @@ Status
PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
build_spilling_stream->get_spill_root_dir());
auto execution_context = state->get_task_execution_context();
_shared_state_holder = _shared_state->shared_from_this();
+ MonotonicStopWatch submit_timer;
+ submit_timer.start();
return spill_io_pool->submit_func(
- [execution_context, state, &build_spilling_stream, &mutable_block,
this] {
+ [execution_context, state, &build_spilling_stream, &mutable_block,
submit_timer, this] {
auto execution_context_lock = execution_context.lock();
if (!execution_context_lock) {
LOG(INFO) << "execution_context released, maybe query was
cancelled.";
return;
}
+
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+ SCOPED_TIMER(_spill_build_timer);
(void)state; // avoid ut compile error
SCOPED_ATTACH_TASK(state);
if (_spill_status_ok) {
@@ -248,14 +244,18 @@ Status
PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
if (!blocks.empty()) {
auto execution_context = state->get_task_execution_context();
_shared_state_holder = _shared_state->shared_from_this();
+ MonotonicStopWatch submit_timer;
+ submit_timer.start();
return spill_io_pool->submit_func(
- [execution_context, state, &blocks, spilling_stream, this] {
+ [execution_context, state, &blocks, spilling_stream,
submit_timer, this] {
auto execution_context_lock = execution_context.lock();
if (!execution_context_lock) {
LOG(INFO) << "execution_context released, maybe query
was cancelled.";
_dependency->set_ready();
return;
}
+
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+ SCOPED_TIMER(_spill_probe_timer);
SCOPED_ATTACH_TASK(state);
COUNTER_UPDATE(_spill_probe_blocks, blocks.size());
while (!blocks.empty() && !state->is_cancelled()) {
@@ -329,12 +329,19 @@ Status
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
auto execution_context = state->get_task_execution_context();
_shared_state_holder = _shared_state->shared_from_this();
- auto read_func = [this, state, &spilled_stream, &mutable_block,
execution_context] {
+
+ MonotonicStopWatch submit_timer;
+ submit_timer.start();
+
+ auto read_func = [this, state, &spilled_stream, &mutable_block,
execution_context,
+ submit_timer] {
auto execution_context_lock = execution_context.lock();
if (!execution_context_lock) {
LOG(INFO) << "execution_context released, maybe query was
cancelled.";
return;
}
+ _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+ SCOPED_TIMER(_recovery_build_timer);
Defer defer([this] { --_spilling_task_count; });
(void)state; // avoid ut compile error
SCOPED_ATTACH_TASK(state);
@@ -403,12 +410,19 @@ Status
PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
/// TODO: maybe recovery more blocks each time.
auto execution_context = state->get_task_execution_context();
_shared_state_holder = _shared_state->shared_from_this();
- auto read_func = [this, execution_context, state, &spilled_stream,
&blocks] {
+
+ MonotonicStopWatch submit_timer;
+ submit_timer.start();
+
+ auto read_func = [this, execution_context, state, &spilled_stream,
&blocks, submit_timer] {
auto execution_context_lock = execution_context.lock();
if (!execution_context_lock) {
LOG(INFO) << "execution_context released, maybe query was
cancelled.";
return;
}
+
+ _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+ SCOPED_TIMER(_recovery_probe_timer);
Defer defer([this] { --_spilling_task_count; });
(void)state; // avoid ut compile error
SCOPED_ATTACH_TASK(state);
@@ -827,4 +841,4 @@ Status
PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
return Status::OK();
}
-} // namespace doris::pipeline
\ No newline at end of file
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 8270817758d..143576e1b86 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -38,16 +38,13 @@ using PartitionerType =
vectorized::XXHashPartitioner<LocalExchangeChannelIds>;
class PartitionedHashJoinProbeOperatorX;
class PartitionedHashJoinProbeLocalState final
- : public JoinProbeLocalState<PartitionedHashJoinSharedState,
- PartitionedHashJoinProbeLocalState> {
+ : public PipelineXSpillLocalState<PartitionedHashJoinSharedState> {
public:
using Parent = PartitionedHashJoinProbeOperatorX;
ENABLE_FACTORY_CREATOR(PartitionedHashJoinProbeLocalState);
PartitionedHashJoinProbeLocalState(RuntimeState* state, OperatorXBase*
parent);
~PartitionedHashJoinProbeLocalState() override = default;
- void add_tuple_is_null_column(vectorized::Block* block) override {}
-
Status init(RuntimeState* state, LocalStateInfo& info) override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override;
@@ -68,9 +65,15 @@ public:
friend class PartitionedHashJoinProbeOperatorX;
private:
+ template <typename LocalStateType>
+ friend class StatefulOperatorX;
+
std::shared_ptr<BasicSharedState> _in_mem_shared_state_sptr;
uint32_t _partition_cursor {0};
+ std::unique_ptr<vectorized::Block> _child_block;
+ bool _child_eos {false};
+
std::mutex _spill_lock;
Status _spill_status;
@@ -98,22 +101,21 @@ private:
RuntimeProfile::Counter* _partition_shuffle_timer = nullptr;
RuntimeProfile::Counter* _spill_build_rows = nullptr;
RuntimeProfile::Counter* _spill_build_blocks = nullptr;
+ RuntimeProfile::Counter* _spill_build_timer = nullptr;
RuntimeProfile::Counter* _recovery_build_rows = nullptr;
RuntimeProfile::Counter* _recovery_build_blocks = nullptr;
+ RuntimeProfile::Counter* _recovery_build_timer = nullptr;
RuntimeProfile::Counter* _spill_probe_rows = nullptr;
RuntimeProfile::Counter* _spill_probe_blocks = nullptr;
+ RuntimeProfile::Counter* _spill_probe_timer = nullptr;
RuntimeProfile::Counter* _recovery_probe_rows = nullptr;
RuntimeProfile::Counter* _recovery_probe_blocks = nullptr;
+ RuntimeProfile::Counter* _recovery_probe_timer = nullptr;
- RuntimeProfile::Counter* _spill_read_data_time = nullptr;
- RuntimeProfile::Counter* _spill_deserialize_time = nullptr;
- RuntimeProfile::Counter* _spill_read_bytes = nullptr;
RuntimeProfile::Counter* _spill_serialize_block_timer = nullptr;
RuntimeProfile::Counter* _spill_write_disk_timer = nullptr;
RuntimeProfile::Counter* _spill_data_size = nullptr;
RuntimeProfile::Counter* _spill_block_count = nullptr;
- RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr;
- RuntimeProfile::Counter* _spill_read_wait_io_timer = nullptr;
RuntimeProfile::Counter* _build_phase_label = nullptr;
RuntimeProfile::Counter* _build_rows_counter = nullptr;
@@ -137,6 +139,10 @@ private:
RuntimeProfile::Counter* _init_probe_side_timer = nullptr;
RuntimeProfile::Counter* _build_side_output_timer = nullptr;
RuntimeProfile::Counter* _process_other_join_conjunct_timer = nullptr;
+ RuntimeProfile::Counter* _probe_timer = nullptr;
+ RuntimeProfile::Counter* _probe_rows_counter = nullptr;
+ RuntimeProfile::Counter* _join_filter_timer = nullptr;
+ RuntimeProfile::Counter* _build_output_block_timer = nullptr;
};
class PartitionedHashJoinProbeOperatorX final
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index f38354d5de2..8b9accd30ad 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -25,7 +25,7 @@ namespace doris::pipeline {
Status PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state,
doris::pipeline::LocalSinkStateInfo& info) {
- RETURN_IF_ERROR(PipelineXSinkLocalState::init(state, info));
+ RETURN_IF_ERROR(PipelineXSpillSinkLocalState::init(state, info));
auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
_shared_state->partitioned_build_blocks.resize(p._partition_count);
_shared_state->spilled_streams.resize(p._partition_count);
@@ -33,30 +33,26 @@ Status
PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state,
_partitioner = std::make_unique<PartitionerType>(p._partition_count);
RETURN_IF_ERROR(_partitioner->init(p._build_exprs));
- _partition_timer = ADD_TIMER(profile(), "PartitionTime");
- _partition_shuffle_timer = ADD_TIMER(profile(), "PartitionShuffleTime");
-
- _spill_serialize_block_timer = ADD_TIMER_WITH_LEVEL(profile(),
"SpillSerializeBlockTime", 1);
- _spill_write_disk_timer = ADD_TIMER_WITH_LEVEL(profile(),
"SpillWriteDiskTime", 1);
- _spill_data_size = ADD_COUNTER_WITH_LEVEL(profile(), "SpillWriteDataSize",
TUnit::BYTES, 1);
- _spill_block_count = ADD_COUNTER_WITH_LEVEL(profile(),
"SpillWriteBlockCount", TUnit::UNIT, 1);
- _spill_write_wait_io_timer = ADD_TIMER_WITH_LEVEL(profile(),
"SpillWriteWaitIOTime", 1);
+ _partition_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile(), "PartitionTime",
"Spill", 1);
+ _partition_shuffle_timer =
+ ADD_CHILD_TIMER_WITH_LEVEL(profile(), "PartitionShuffleTime",
"Spill", 1);
+ _spill_build_timer = ADD_CHILD_TIMER_WITH_LEVEL(profile(),
"SpillBuildTime", "Spill", 1);
return _partitioner->prepare(state, p._child_x->row_desc());
}
Status PartitionedHashJoinSinkLocalState::open(RuntimeState* state) {
- RETURN_IF_ERROR(PipelineXSinkLocalState::open(state));
+ RETURN_IF_ERROR(PipelineXSpillSinkLocalState::open(state));
return _partitioner->open(state);
}
Status PartitionedHashJoinSinkLocalState::close(RuntimeState* state, Status
exec_status) {
- SCOPED_TIMER(PipelineXSinkLocalState::exec_time_counter());
- SCOPED_TIMER(PipelineXSinkLocalState::_close_timer);
- if (PipelineXSinkLocalState::_closed) {
+ SCOPED_TIMER(PipelineXSpillSinkLocalState::exec_time_counter());
+ SCOPED_TIMER(PipelineXSpillSinkLocalState::_close_timer);
+ if (PipelineXSpillSinkLocalState::_closed) {
return Status::OK();
}
dec_running_big_mem_op_num(state);
- return PipelineXSinkLocalState::close(state, exec_status);
+ return PipelineXSpillSinkLocalState::close(state, exec_status);
}
Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
@@ -90,16 +86,23 @@ Status
PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
DCHECK(spill_io_pool != nullptr);
auto execution_context = state->get_task_execution_context();
_shared_state_holder = _shared_state->shared_from_this();
- auto st = spill_io_pool->submit_func([this, execution_context, state,
spilling_stream, i] {
- auto execution_context_lock = execution_context.lock();
- if (!execution_context_lock) {
- LOG(INFO) << "execution_context released, maybe query was
cancelled.";
- return;
- }
- (void)state; // avoid ut compile error
- SCOPED_ATTACH_TASK(state);
- _spill_to_disk(i, spilling_stream);
- });
+
+ MonotonicStopWatch submit_timer;
+ submit_timer.start();
+
+ auto st = spill_io_pool->submit_func(
+ [this, execution_context, state, spilling_stream, i,
submit_timer] {
+ auto execution_context_lock = execution_context.lock();
+ if (!execution_context_lock) {
+ LOG(INFO) << "execution_context released, maybe query
was cancelled.";
+ return;
+ }
+
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+ SCOPED_TIMER(_spill_build_timer);
+ (void)state; // avoid ut compile error
+ SCOPED_ATTACH_TASK(state);
+ _spill_to_disk(i, spilling_stream);
+ });
if (!st.ok()) {
--_spilling_streams_count;
@@ -274,4 +277,4 @@ Status
PartitionedHashJoinSinkOperatorX::revoke_memory(RuntimeState* state) {
return local_state.revoke_memory(state);
}
-} // namespace doris::pipeline
\ No newline at end of file
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index e364e225f66..96e751360d4 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -39,7 +39,7 @@ using PartitionerType =
vectorized::XXHashPartitioner<LocalExchangeChannelIds>;
class PartitionedHashJoinSinkOperatorX;
class PartitionedHashJoinSinkLocalState
- : public PipelineXSinkLocalState<PartitionedHashJoinSharedState> {
+ : public PipelineXSpillSinkLocalState<PartitionedHashJoinSharedState> {
public:
using Parent = PartitionedHashJoinSinkOperatorX;
ENABLE_FACTORY_CREATOR(PartitionedHashJoinSinkLocalState);
@@ -51,7 +51,7 @@ public:
protected:
PartitionedHashJoinSinkLocalState(DataSinkOperatorXBase* parent,
RuntimeState* state)
- : PipelineXSinkLocalState<PartitionedHashJoinSharedState>(parent,
state) {}
+ :
PipelineXSpillSinkLocalState<PartitionedHashJoinSharedState>(parent, state) {}
void _spill_to_disk(uint32_t partition_index,
const vectorized::SpillStreamSPtr& spilling_stream);
@@ -76,11 +76,7 @@ protected:
RuntimeProfile::Counter* _partition_timer = nullptr;
RuntimeProfile::Counter* _partition_shuffle_timer = nullptr;
- RuntimeProfile::Counter* _spill_serialize_block_timer = nullptr;
- RuntimeProfile::Counter* _spill_write_disk_timer = nullptr;
- RuntimeProfile::Counter* _spill_data_size = nullptr;
- RuntimeProfile::Counter* _spill_block_count = nullptr;
- RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr;
+ RuntimeProfile::Counter* _spill_build_timer = nullptr;
};
class PartitionedHashJoinSinkOperatorX
@@ -139,4 +135,4 @@ private:
};
} // namespace pipeline
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 662e195f3e5..523ff2cfaaf 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -55,6 +55,9 @@ void SpillSortSinkLocalState::_init_counters() {
_spill_merge_sort_timer =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "SpillMergeSortTime",
"Spill", 1);
+
+ _spill_wait_in_queue_timer =
+ ADD_CHILD_TIMER_WITH_LEVEL(profile(), "SpillWaitInQueueTime",
"Spill", 1);
}
#define UPDATE_PROFILE(counter, name) \
do { \
@@ -227,17 +230,22 @@ Status
SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
auto execution_context = state->get_task_execution_context();
_shared_state_holder = _shared_state->shared_from_this();
+
+ MonotonicStopWatch submit_timer;
+ submit_timer.start();
+
status =
ExecEnv::GetInstance()
->spill_stream_mgr()
->get_spill_io_thread_pool(_spilling_stream->get_spill_root_dir())
- ->submit_func([this, state, &parent, execution_context] {
+ ->submit_func([this, state, &parent, execution_context,
submit_timer] {
auto execution_context_lock = execution_context.lock();
if (!execution_context_lock) {
LOG(INFO) << "execution_context released, maybe
query was cancelled.";
return Status::OK();
}
+
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
SCOPED_ATTACH_TASK(state);
Defer defer {[&]() {
if (!_shared_state->sink_status.ok()) {
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index c021687e1df..c53c057088c 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -43,6 +43,8 @@ Status SpillSortLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
TUnit::BYTES, "Spill", 1);
_spill_block_count = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(),
"SpillWriteBlockCount",
TUnit::UNIT, "Spill", 1);
+ _spill_wait_in_queue_timer =
+ ADD_CHILD_TIMER_WITH_LEVEL(profile(), "SpillWaitInQueueTime",
"Spill", 1);
return Status::OK();
}
@@ -82,13 +84,18 @@ Status
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
auto execution_context = state->get_task_execution_context();
_shared_state_holder = _shared_state->shared_from_this();
- auto spill_func = [this, state, &parent, execution_context] {
+
+ MonotonicStopWatch submit_timer;
+ submit_timer.start();
+
+ auto spill_func = [this, state, &parent, execution_context, submit_timer] {
auto execution_context_lock = execution_context.lock();
if (!execution_context_lock) {
LOG(INFO) << "execution_context released, maybe query was
cancelled.";
return Status::OK();
}
+ _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
SCOPED_TIMER(_spill_merge_sort_timer);
SCOPED_ATTACH_TASK(state);
Defer defer {[&]() {
diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h
index 6b1355d0863..20c038c7469 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -455,6 +455,8 @@ public:
ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(),
"SpillDeserializeTime", "Spill", 1);
_spill_read_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(),
"SpillReadDataSize",
TUnit::BYTES,
"Spill", 1);
+ _spill_wait_in_queue_timer =
+ ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(),
"SpillWaitInQueueTime", "Spill", 1);
_spill_write_wait_io_timer =
ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(),
"SpillWriteWaitIOTime", "Spill", 1);
_spill_read_wait_io_timer =
@@ -469,6 +471,7 @@ public:
RuntimeProfile::Counter* _spill_read_bytes;
RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr;
RuntimeProfile::Counter* _spill_read_wait_io_timer = nullptr;
+ RuntimeProfile::Counter* _spill_wait_in_queue_timer = nullptr;
};
class DataSinkOperatorXBase;
@@ -776,6 +779,8 @@ public:
TUnit::BYTES, "Spill",
1);
_spill_block_count = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(),
"SpillWriteBlockCount",
TUnit::UNIT,
"Spill", 1);
+ _spill_wait_in_queue_timer =
+ ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(),
"SpillWaitInQueueTime", "Spill", 1);
_spill_write_wait_io_timer =
ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(),
"SpillWriteWaitIOTime", "Spill", 1);
_spill_read_wait_io_timer =
@@ -789,6 +794,7 @@ public:
RuntimeProfile::Counter* _spill_write_disk_timer = nullptr;
RuntimeProfile::Counter* _spill_data_size = nullptr;
RuntimeProfile::Counter* _spill_block_count = nullptr;
+ RuntimeProfile::Counter* _spill_wait_in_queue_timer = nullptr;
RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr;
RuntimeProfile::Counter* _spill_read_wait_io_timer = nullptr;
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]