This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8735b184704 [improvement](spill) improve spill timers (#33156)
8735b184704 is described below
commit 8735b1847047573e1037d0df50336727b9ac192d
Author: TengJianPing <[email protected]>
AuthorDate: Tue Apr 2 22:31:46 2024 +0800
[improvement](spill) improve spill timers (#33156)
---
.../pipeline/exec/partitioned_aggregation_sink_operator.h | 3 ++-
.../exec/partitioned_aggregation_source_operator.cpp | 6 +++---
.../pipeline/exec/partitioned_hash_join_probe_operator.cpp | 14 ++++++++++----
.../pipeline/exec/partitioned_hash_join_probe_operator.h | 2 ++
.../pipeline/exec/partitioned_hash_join_sink_operator.cpp | 4 +++-
be/src/pipeline/exec/partitioned_hash_join_sink_operator.h | 1 +
be/src/pipeline/exec/spill_sort_sink_operator.cpp | 6 +++---
be/src/pipeline/exec/spill_sort_source_operator.cpp | 5 +++--
be/src/pipeline/pipeline_x/operator.h | 12 ++++++++++++
be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 4 ++--
be/src/vec/spill/spill_stream.cpp | 6 +++++-
be/src/vec/spill/spill_stream.h | 10 ++++++++--
12 files changed, 54 insertions(+), 19 deletions(-)
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 542046556ec..d63f272092b 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -131,7 +131,8 @@ public:
RETURN_IF_ERROR(status);
spill_stream->set_write_counters(Base::_spill_serialize_block_timer,
Base::_spill_block_count,
Base::_spill_data_size,
- Base::_spill_write_disk_timer);
+ Base::_spill_write_disk_timer,
+ Base::_spill_write_wait_io_timer);
status = to_block(context, keys, values, null_key_data);
RETURN_IF_ERROR(status);
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index c328598ac44..5db80788f41 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -227,9 +227,9 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
!_shared_state->spill_partitions.empty()) {
for (auto& stream :
_shared_state->spill_partitions[0]->spill_streams_) {
- stream->set_read_counters(Base::_spill_read_data_time,
- Base::_spill_deserialize_time,
- Base::_spill_read_bytes);
+ stream->set_read_counters(
+ Base::_spill_read_data_time, Base::_spill_deserialize_time,
+ Base::_spill_read_bytes, Base::_spill_read_wait_io_timer);
vectorized::Block block;
bool eos = false;
while (!eos) {
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 8f859820252..c23e12c3705 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -73,6 +73,10 @@ Status PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateI
"SpillAndPartition",
1);
_spill_read_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(),
"SpillReadDataSize",
TUnit::BYTES,
"SpillAndPartition", 1);
+ _spill_write_wait_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteWaitIOTime",
+ "SpillAndPartition", 1);
+ _spill_read_wait_io_timer = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadWaitIOTime",
+ "SpillAndPartition", 1);
// Build phase
_build_phase_label = ADD_LABEL_COUNTER(profile(), "BuildPhase");
@@ -175,7 +179,8 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
std::numeric_limits<size_t>::max(), _runtime_profile.get()));
RETURN_IF_ERROR(build_spilling_stream->prepare_spill());
build_spilling_stream->set_write_counters(_spill_serialize_block_timer,
_spill_block_count,
- _spill_data_size,
_spill_write_disk_timer);
+ _spill_data_size,
_spill_write_disk_timer,
+ _spill_write_wait_io_timer);
}
auto* spill_io_pool =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(
@@ -225,7 +230,8 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
_runtime_profile.get()));
RETURN_IF_ERROR(spilling_stream->prepare_spill());
spilling_stream->set_write_counters(_spill_serialize_block_timer,
_spill_block_count,
- _spill_data_size,
_spill_write_disk_timer);
+ _spill_data_size,
_spill_write_disk_timer,
+ _spill_write_wait_io_timer);
}
auto* spill_io_pool =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(
@@ -294,7 +300,7 @@ Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in
build_spilling_stream->end_spill(Status::OK());
RETURN_IF_ERROR(build_spilling_stream->spill_eof());
build_spilling_stream->set_read_counters(_spill_read_data_time,
_spill_deserialize_time,
- _spill_read_bytes);
+ _spill_read_bytes,
_spill_read_wait_io_timer);
}
auto& probe_spilling_stream = _probe_spilling_streams[partition_index];
@@ -303,7 +309,7 @@ Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in
probe_spilling_stream->end_spill(Status::OK());
RETURN_IF_ERROR(probe_spilling_stream->spill_eof());
probe_spilling_stream->set_read_counters(_spill_read_data_time,
_spill_deserialize_time,
- _spill_read_bytes);
+ _spill_read_bytes,
_spill_read_wait_io_timer);
}
return Status::OK();
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 7337017fde6..8270817758d 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -112,6 +112,8 @@ private:
RuntimeProfile::Counter* _spill_write_disk_timer = nullptr;
RuntimeProfile::Counter* _spill_data_size = nullptr;
RuntimeProfile::Counter* _spill_block_count = nullptr;
+ RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr;
+ RuntimeProfile::Counter* _spill_read_wait_io_timer = nullptr;
RuntimeProfile::Counter* _build_phase_label = nullptr;
RuntimeProfile::Counter* _build_rows_counter = nullptr;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index dd119ade14b..f38354d5de2 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -40,6 +40,7 @@ Status PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state,
_spill_write_disk_timer = ADD_TIMER_WITH_LEVEL(profile(),
"SpillWriteDiskTime", 1);
_spill_data_size = ADD_COUNTER_WITH_LEVEL(profile(), "SpillWriteDataSize",
TUnit::BYTES, 1);
_spill_block_count = ADD_COUNTER_WITH_LEVEL(profile(),
"SpillWriteBlockCount", TUnit::UNIT, 1);
+ _spill_write_wait_io_timer = ADD_TIMER_WITH_LEVEL(profile(),
"SpillWriteWaitIOTime", 1);
return _partitioner->prepare(state, p._child_x->row_desc());
}
@@ -80,7 +81,8 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
std::numeric_limits<size_t>::max(), _profile));
RETURN_IF_ERROR(spilling_stream->prepare_spill());
spilling_stream->set_write_counters(_spill_serialize_block_timer,
_spill_block_count,
- _spill_data_size,
_spill_write_disk_timer);
+ _spill_data_size,
_spill_write_disk_timer,
+ _spill_write_wait_io_timer);
}
auto* spill_io_pool =
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 4d25acd1b20..e364e225f66 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -80,6 +80,7 @@ protected:
RuntimeProfile::Counter* _spill_write_disk_timer = nullptr;
RuntimeProfile::Counter* _spill_data_size = nullptr;
RuntimeProfile::Counter* _spill_block_count = nullptr;
+ RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr;
};
class PartitionedHashJoinSinkOperatorX
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 9c3ae278ff6..662e195f3e5 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -210,9 +210,9 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
SpillSortSharedState::SORT_BLOCK_SPILL_BATCH_BYTES, profile());
RETURN_IF_ERROR(status);
- _spilling_stream->set_write_counters(Base::_spill_serialize_block_timer,
- Base::_spill_block_count,
Base::_spill_data_size,
- Base::_spill_write_disk_timer);
+ _spilling_stream->set_write_counters(
+ Base::_spill_serialize_block_timer, Base::_spill_block_count,
Base::_spill_data_size,
+ Base::_spill_write_disk_timer, Base::_spill_write_wait_io_timer);
status = _spilling_stream->prepare_spill();
RETURN_IF_ERROR(status);
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index 6ae30a482f7..c021687e1df 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -134,7 +134,8 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
bool eos = false;
tmp_stream->set_write_counters(_spill_serialize_block_timer,
_spill_block_count,
- _spill_data_size,
_spill_write_disk_timer);
+ _spill_data_size,
_spill_write_disk_timer,
+ _spill_write_wait_io_timer);
while (!eos && !state->is_cancelled()) {
merge_sorted_block.clear_column_data();
{
@@ -170,7 +171,7 @@ Status SpillSortLocalState::_create_intermediate_merger(
for (int i = 0; i < num_blocks && !_shared_state->sorted_streams.empty();
++i) {
auto stream = _shared_state->sorted_streams.front();
stream->set_read_counters(Base::_spill_read_data_time,
Base::_spill_deserialize_time,
- Base::_spill_read_bytes);
+ Base::_spill_read_bytes,
Base::_spill_read_wait_io_timer);
_current_merging_streams.emplace_back(stream);
child_block_suppliers.emplace_back(
std::bind(std::mem_fn(&vectorized::SpillStream::read_next_block_sync),
stream.get(),
diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h
index aa2bf7aa5e0..6b1355d0863 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -455,6 +455,10 @@ public:
ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(),
"SpillDeserializeTime", "Spill", 1);
_spill_read_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(),
"SpillReadDataSize",
TUnit::BYTES,
"Spill", 1);
+ _spill_write_wait_io_timer =
+ ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(),
"SpillWriteWaitIOTime", "Spill", 1);
+ _spill_read_wait_io_timer =
+ ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(),
"SpillReadWaitIOTime", "Spill", 1);
return Status::OK();
}
@@ -463,6 +467,8 @@ public:
RuntimeProfile::Counter* _spill_read_data_time;
RuntimeProfile::Counter* _spill_deserialize_time;
RuntimeProfile::Counter* _spill_read_bytes;
+ RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr;
+ RuntimeProfile::Counter* _spill_read_wait_io_timer = nullptr;
};
class DataSinkOperatorXBase;
@@ -770,6 +776,10 @@ public:
TUnit::BYTES, "Spill",
1);
_spill_block_count = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(),
"SpillWriteBlockCount",
TUnit::UNIT,
"Spill", 1);
+ _spill_write_wait_io_timer =
+ ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(),
"SpillWriteWaitIOTime", "Spill", 1);
+ _spill_read_wait_io_timer =
+ ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(),
"SpillReadWaitIOTime", "Spill", 1);
return Status::OK();
}
@@ -779,6 +789,8 @@ public:
RuntimeProfile::Counter* _spill_write_disk_timer = nullptr;
RuntimeProfile::Counter* _spill_data_size = nullptr;
RuntimeProfile::Counter* _spill_block_count = nullptr;
+ RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr;
+ RuntimeProfile::Counter* _spill_read_wait_io_timer = nullptr;
};
/**
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index cf63074318b..6a1ddd59f52 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -321,11 +321,12 @@ bool PipelineXTask::should_revoke_memory(RuntimeState* state, int64_t revocable_
LOG_ONCE(INFO) << "no workload group for query " <<
print_id(state->query_id());
return false;
}
+ const auto min_revocable_mem_bytes = state->min_revocable_mem();
bool is_wg_mem_low_water_mark = false;
bool is_wg_mem_high_water_mark = false;
wg->check_mem_used(&is_wg_mem_low_water_mark, &is_wg_mem_high_water_mark);
if (is_wg_mem_high_water_mark) {
- if (revocable_mem_bytes > 0) {
+ if (revocable_mem_bytes > min_revocable_mem_bytes) {
LOG_EVERY_N(INFO, 5) << "revoke memory, hight water mark";
return true;
}
@@ -346,7 +347,6 @@ bool PipelineXTask::should_revoke_memory(RuntimeState* state, int64_t revocable_
mem_limit_of_op = query_weighted_limit / big_memory_operator_num;
}
- const auto min_revocable_mem_bytes = state->min_revocable_mem();
LOG_EVERY_N(INFO, 5) << "revoke memory, low water mark, revocable_mem_bytes: "
<< PrettyPrinter::print_bytes(revocable_mem_bytes)
<< ", mem_limit_of_op: " << PrettyPrinter::print_bytes(mem_limit_of_op)
diff --git a/be/src/vec/spill/spill_stream.cpp b/be/src/vec/spill/spill_stream.cpp
index f245f8fa309..843a9fc9658 100644
--- a/be/src/vec/spill/spill_stream.cpp
+++ b/be/src/vec/spill/spill_stream.cpp
@@ -95,6 +95,7 @@ void SpillStream::end_spill(const Status& status) {
Status SpillStream::wait_spill() {
if (spill_promise_) {
+ SCOPED_TIMER(write_wait_io_timer_);
auto status = spill_future_.get();
spill_promise_.reset();
return status;
@@ -141,7 +142,10 @@ Status SpillStream::read_next_block_sync(Block* block, bool* eos) {
return status;
}
- status = read_future_.get();
+ {
+ SCOPED_TIMER(read_wait_io_timer_);
+ status = read_future_.get();
+ }
read_promise_.reset();
return status;
}
diff --git a/be/src/vec/spill/spill_stream.h b/be/src/vec/spill/spill_stream.h
index 4d53b439712..afec4734d8a 100644
--- a/be/src/vec/spill/spill_stream.h
+++ b/be/src/vec/spill/spill_stream.h
@@ -64,15 +64,19 @@ public:
void set_write_counters(RuntimeProfile::Counter* serialize_timer,
RuntimeProfile::Counter* write_block_counter,
RuntimeProfile::Counter* write_bytes_counter,
- RuntimeProfile::Counter* write_timer) {
+ RuntimeProfile::Counter* write_timer,
+ RuntimeProfile::Counter* wait_io_timer) {
writer_->set_counters(serialize_timer, write_block_counter,
write_bytes_counter,
write_timer);
+ write_wait_io_timer_ = wait_io_timer;
}
void set_read_counters(RuntimeProfile::Counter* read_timer,
RuntimeProfile::Counter* deserialize_timer,
- RuntimeProfile::Counter* read_bytes) {
+ RuntimeProfile::Counter* read_bytes,
+ RuntimeProfile::Counter* wait_io_timer) {
reader_->set_counters(read_timer, deserialize_timer, read_bytes);
+ read_wait_io_timer_ = wait_io_timer;
}
private:
@@ -100,6 +104,8 @@ private:
SpillReaderUPtr reader_;
RuntimeProfile* profile_ = nullptr;
+ RuntimeProfile::Counter* write_wait_io_timer_ = nullptr;
+ RuntimeProfile::Counter* read_wait_io_timer_ = nullptr;
};
using SpillStreamSPtr = std::shared_ptr<SpillStream>;
} // namespace vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]