This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new 580dee6cfe6 [refactor]refine the logic of SpillRunnable (#43446)
580dee6cfe6 is described below
commit 580dee6cfe6647a5e3e0a52fedb505a6ea4bc740
Author: Jerry Hu <[email protected]>
AuthorDate: Fri Nov 8 15:59:00 2024 +0800
[refactor]refine the logic of SpillRunnable (#43446)
---
be/src/pipeline/exec/operator.h | 4 -
.../exec/partitioned_aggregation_sink_operator.cpp | 10 +-
.../partitioned_aggregation_source_operator.cpp | 10 +-
.../exec/partitioned_hash_join_probe_operator.cpp | 30 +---
.../exec/partitioned_hash_join_sink_operator.cpp | 36 ++--
.../exec/partitioned_hash_join_sink_operator.h | 7 +-
be/src/pipeline/exec/spill_sort_sink_operator.cpp | 12 +-
.../pipeline/exec/spill_sort_source_operator.cpp | 10 +-
be/src/pipeline/exec/spill_utils.h | 181 +++++++++++++--------
be/src/runtime/memory/mem_tracker_limiter.cpp | 2 +-
be/src/runtime/memory/thread_mem_tracker_mgr.cpp | 1 +
11 files changed, 158 insertions(+), 145 deletions(-)
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 75abc8fe624..76a53d9741d 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -389,8 +389,6 @@ public:
}
}
- std::atomic_int _spilling_task_count {0};
-
// Total time of spill, including spill task scheduling time,
// serialize block time, write disk file time,
// and read disk file time, deserialize block time etc.
@@ -784,8 +782,6 @@ public:
COUNTER_SET(_spill_min_rows_of_partition, min_rows);
}
- std::atomic_int _spilling_task_count {0};
-
std::vector<int64_t> _rows_in_partitions;
// Total time of spill, including spill task scheduling time,
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index cd4eba2abfe..de2f3b29d36 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -304,13 +304,9 @@ Status PartitionedAggSinkLocalState::revoke_memory(
state->get_query_ctx()->increase_revoking_tasks_count();
- MonotonicStopWatch submit_timer;
- submit_timer.start();
- _spilling_task_count = 1;
- auto spill_runnable = std::make_shared<SpillRunnable>(
- state, spill_context, _spilling_task_count, _profile, submit_timer,
- _shared_state->shared_from_this(), Base::_spill_dependency, true,
true,
- [this, &parent, state, query_id, size_to_revoke, spill_context] {
+ auto spill_runnable = std::make_shared<SpillSinkRunnable>(
+ state, spill_context, _spill_dependency, _profile,
_shared_state->shared_from_this(),
+ [this, &parent, state, query_id, size_to_revoke] {
Status status;
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::revoke_memory_cancel", {
status = Status::InternalError(
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 04042c5841a..027e726e358 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -319,13 +319,9 @@ Status
PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
});
_spill_dependency->block();
- MonotonicStopWatch submit_timer;
- submit_timer.start();
- _spilling_task_count = 1;
return
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
- std::make_shared<SpillRunnable>(state, nullptr,
_spilling_task_count,
- _runtime_profile.get(),
submit_timer,
- _shared_state->shared_from_this(),
_spill_dependency,
- false, false,
exception_catch_func));
+ std::make_shared<SpillRecoverRunnable>(state, _spill_dependency,
_runtime_profile.get(),
+
_shared_state->shared_from_this(),
+ exception_catch_func));
}
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index f1ca6f8d7de..bdba90aac37 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -243,7 +243,7 @@ Status
PartitionedHashJoinProbeLocalState::spill_probe_blocks(
return Status::OK();
};
- auto exception_catch_func = [query_id, spill_func, spill_context]() {
+ auto exception_catch_func = [query_id, spill_func]() {
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_cancel",
{
auto status = Status::InternalError(
"fault_inject partitioned_hash_join_probe "
@@ -265,14 +265,9 @@ Status
PartitionedHashJoinProbeLocalState::spill_probe_blocks(
"fault_inject partitioned_hash_join_probe spill_probe_blocks
submit_func failed");
});
- MonotonicStopWatch submit_timer;
- submit_timer.start();
-
- _spilling_task_count = 1;
- auto spill_runnable = std::make_shared<SpillRunnable>(
- state, spill_context, _spilling_task_count,
_runtime_profile.get(), submit_timer,
- _shared_state->shared_from_this(), _spill_dependency, false, true,
- exception_catch_func);
+ auto spill_runnable = std::make_shared<SpillNonSinkRunnable>(
+ state, spill_context, _spill_dependency, _runtime_profile.get(),
+ _shared_state->shared_from_this(), exception_catch_func);
return spill_io_pool->submit(std::move(spill_runnable));
}
@@ -398,13 +393,8 @@ Status
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
"recovery_build_blocks submit_func failed");
});
- MonotonicStopWatch submit_timer;
- submit_timer.start();
-
- _spilling_task_count = 1;
- auto spill_runnable = std::make_shared<SpillRunnable>(
- state, nullptr, _spilling_task_count, _runtime_profile.get(),
submit_timer,
- _shared_state->shared_from_this(), _spill_dependency, false, false,
+ auto spill_runnable = std::make_shared<SpillRecoverRunnable>(
+ state, _spill_dependency, _runtime_profile.get(),
_shared_state->shared_from_this(),
exception_catch_func);
VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " <<
_parent->node_id()
<< ", task id: " << state->task_id() << ", partition: " <<
partition_index
@@ -511,12 +501,8 @@ Status
PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim
"fault_inject partitioned_hash_join_probe "
"recovery_probe_blocks submit_func failed");
});
- MonotonicStopWatch submit_timer;
- submit_timer.start();
- _spilling_task_count = 1;
- return spill_io_pool->submit(std::make_shared<SpillRunnable>(
- state, nullptr, _spilling_task_count, _runtime_profile.get(),
submit_timer,
- _shared_state->shared_from_this(), _spill_dependency, false, false,
+ return spill_io_pool->submit(std::make_shared<SpillRecoverRunnable>(
+ state, _spill_dependency, _runtime_profile.get(),
_shared_state->shared_from_this(),
exception_catch_func));
}
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index baa99d3fe14..a8c2b495365 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -271,12 +271,9 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
return status;
};
- MonotonicStopWatch submit_timer;
- submit_timer.start();
- _spilling_task_count = 1;
- auto spill_runnable = std::make_shared<SpillRunnable>(
- state, spill_context, _spilling_task_count, _profile, submit_timer,
- _shared_state->shared_from_this(), _spill_dependency, true, true,
exception_catch_func);
+ auto spill_runnable = std::make_shared<SpillSinkRunnable>(
+ state, spill_context, _spill_dependency, _profile,
_shared_state->shared_from_this(),
+ exception_catch_func);
auto* thread_pool =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
@@ -313,6 +310,10 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
DCHECK(spill_io_pool != nullptr);
auto spill_fin_cb = [this, state, query_id, spill_context]() {
+ if (_spilling_task_count.fetch_sub(1) != 1) {
+ return Status::OK();
+ }
+
Status status;
if (_child_eos) {
VLOG_DEBUG << "query:" << print_id(this->state()->query_id()) <<
", hash join sink "
@@ -324,12 +325,14 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
COUNTER_UPDATE(_in_mem_rows_counter,
block->rows());
}
});
- auto st = _finish_spilling();
- if (status.ok()) {
- status = st;
- }
+ status = _finish_spilling();
_dependency->set_ready_to_read();
}
+
+ if (spill_context) {
+ spill_context->on_task_finished();
+ }
+ _spill_dependency->set_ready();
return status;
};
for (size_t i = 0; i != _shared_state->partitioned_build_blocks.size();
++i) {
@@ -355,10 +358,9 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
// For every stream, the task counter is increased +1
// so that when a stream finished, it should desc -1
state->get_query_ctx()->increase_revoking_tasks_count();
- auto spill_runnable = std::make_shared<SpillRunnable>(
- state, spill_context, _spilling_task_count, _profile,
submit_timer,
- _shared_state->shared_from_this(), _spill_dependency, true,
true,
- [this, query_id, spilling_stream, i, spill_context] {
+ auto spill_runnable = std::make_shared<SpillSinkRunnable>(
+ state, nullptr, nullptr, _profile,
_shared_state->shared_from_this(),
+ [this, query_id, spilling_stream, i] {
DBUG_EXECUTE_IF(
"fault_inject::partitioned_hash_join_sink::revoke_memory_cancel", {
auto status = Status::InternalError(
@@ -371,8 +373,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
SCOPED_TIMER(_spill_build_timer);
auto status = [&]() {
- RETURN_IF_CATCH_EXCEPTION(
- return _spill_to_disk(i, spilling_stream,
spill_context));
+ RETURN_IF_CATCH_EXCEPTION(return _spill_to_disk(i,
spilling_stream));
}();
_state->get_query_ctx()->decrease_revoking_tasks_count();
@@ -470,8 +471,7 @@ Status
PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state,
}
Status PartitionedHashJoinSinkLocalState::_spill_to_disk(
- uint32_t partition_index, const vectorized::SpillStreamSPtr&
spilling_stream,
- const std::shared_ptr<SpillContext>& spill_context) {
+ uint32_t partition_index, const vectorized::SpillStreamSPtr&
spilling_stream) {
auto& partitioned_block =
_shared_state->partitioned_build_blocks[partition_index];
Status status = _shared_state->_spill_status.status();
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 87412b53910..58b19004f33 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -19,6 +19,8 @@
#include <stdint.h>
+#include <atomic>
+
#include "common/status.h"
#include "operator.h"
#include "pipeline/exec/hashjoin_build_sink.h"
@@ -54,8 +56,7 @@ protected:
:
PipelineXSpillSinkLocalState<PartitionedHashJoinSharedState>(parent, state) {}
Status _spill_to_disk(uint32_t partition_index,
- const vectorized::SpillStreamSPtr& spilling_stream,
- const std::shared_ptr<SpillContext>& spill_context);
+ const vectorized::SpillStreamSPtr& spilling_stream);
Status _partition_block(RuntimeState* state, vectorized::Block* in_block,
size_t begin,
size_t end);
@@ -68,7 +69,7 @@ protected:
friend class PartitionedHashJoinSinkOperatorX;
std::atomic<bool> _spilling_finished {false};
- vectorized::Block _pending_block;
+ std::atomic_int32_t _spilling_task_count {0};
bool _child_eos {false};
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 00a60a4c747..83c7ccfc1a3 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -281,7 +281,7 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState*
state,
return Status::OK();
};
- auto exception_catch_func = [query_id, spill_context, spill_func]() {
+ auto exception_catch_func = [query_id, spill_func]() {
DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_cancel",
{
auto status = Status::InternalError(
"fault_inject spill_sort_sink "
@@ -303,14 +303,10 @@ Status
SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
if (status.ok()) {
state->get_query_ctx()->increase_revoking_tasks_count();
- MonotonicStopWatch submit_timer;
- submit_timer.start();
- _spilling_task_count = 1;
status =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
- std::make_shared<SpillRunnable>(
- state, spill_context, _spilling_task_count, _profile,
submit_timer,
- _shared_state->shared_from_this(), _spill_dependency,
true, true,
- exception_catch_func));
+ std::make_shared<SpillSinkRunnable>(state, spill_context,
_spill_dependency,
+ _profile,
_shared_state->shared_from_this(),
+ exception_catch_func));
}
if (!status.ok()) {
if (!_eos) {
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp
b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index 2308c49d893..b0b5ebbcbd7 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -173,14 +173,10 @@ Status
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
"merge_sort_spill_data submit_func failed");
});
- MonotonicStopWatch submit_timer;
- submit_timer.start();
- _spilling_task_count = 1;
return
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
- std::make_shared<SpillRunnable>(state, nullptr,
_spilling_task_count,
- _runtime_profile.get(),
submit_timer,
- _shared_state->shared_from_this(),
_spill_dependency,
- false, false,
exception_catch_func));
+ std::make_shared<SpillRecoverRunnable>(state, _spill_dependency,
_runtime_profile.get(),
+
_shared_state->shared_from_this(),
+ exception_catch_func));
}
Status SpillSortLocalState::_create_intermediate_merger(
diff --git a/be/src/pipeline/exec/spill_utils.h
b/be/src/pipeline/exec/spill_utils.h
index 687779badbb..bf877382129 100644
--- a/be/src/pipeline/exec/spill_utils.h
+++ b/be/src/pipeline/exec/spill_utils.h
@@ -78,21 +78,17 @@ private:
};
class SpillRunnable : public Runnable {
-public:
+protected:
SpillRunnable(RuntimeState* state, std::shared_ptr<SpillContext>
spill_context,
- std::atomic_int& spilling_task_count, RuntimeProfile*
profile,
- MonotonicStopWatch submit_timer,
- const std::shared_ptr<BasicSpillSharedState>& shared_state,
- std::shared_ptr<Dependency> spill_dependency, bool is_sink,
bool is_write,
+ std::shared_ptr<Dependency> spill_dependency,
RuntimeProfile* profile,
+ const std::shared_ptr<BasicSpillSharedState>& shared_state,
bool is_write,
std::function<Status()> spill_exec_func,
std::function<Status()> spill_fin_cb = {})
- : _is_sink(is_sink),
- _is_write(is_write),
- _state(state),
+ : _state(state),
+ _profile(profile),
_spill_context(std::move(spill_context)),
- _spilling_task_count(spilling_task_count),
_spill_dependency(std::move(spill_dependency)),
- _submit_timer(submit_timer),
+ _is_write_task(is_write),
_task_context_holder(state->get_task_execution_context()),
_shared_state_holder(shared_state),
_spill_exec_func(std::move(spill_exec_func)),
@@ -100,63 +96,44 @@ public:
_exec_timer = profile->get_counter("ExecTime");
_spill_total_timer = profile->get_counter("SpillTotalTime");
- _spill_write_timer = profile->get_counter("SpillWriteTime");
- _spill_write_wait_in_queue_timer =
profile->get_counter("SpillWriteTaskWaitInQueueTime");
- _write_wait_in_queue_task_count =
profile->get_counter("SpillWriteTaskWaitInQueueCount");
- _writing_task_count = profile->get_counter("SpillWriteTaskCount");
-
- _spill_revover_timer = profile->get_counter("SpillRecoverTime");
- _spill_read_wait_in_queue_timer =
profile->get_counter("SpillReadTaskWaitInQueueTime");
- _read_wait_in_queue_task_count =
profile->get_counter("SpillReadTaskWaitInQueueCount");
- _reading_task_count = profile->get_counter("SpillReadTaskCount");
if (is_write) {
+ _spill_write_wait_in_queue_timer =
+ profile->get_counter("SpillWriteTaskWaitInQueueTime");
+ _write_wait_in_queue_task_count =
+ profile->get_counter("SpillWriteTaskWaitInQueueCount");
+ _writing_task_count = profile->get_counter("SpillWriteTaskCount");
COUNTER_UPDATE(_write_wait_in_queue_task_count, 1);
- } else {
- COUNTER_UPDATE(_read_wait_in_queue_task_count, 1);
}
+
+ _submit_timer.start();
}
+public:
~SpillRunnable() override = default;
void run() override {
+ const auto submit_elapsed_time = _submit_timer.elapsed_time();
// Should lock task context before scope task, because the _state maybe
// destroyed when run is called.
auto task_context_holder = _task_context_holder.lock();
if (!task_context_holder) {
return;
}
+ SCOPED_ATTACH_TASK(_state);
- auto submit_elapsed_time = _submit_timer.elapsed_time();
- if (_is_write) {
- _spill_write_wait_in_queue_timer->update(submit_elapsed_time);
- } else {
- _spill_read_wait_in_queue_timer->update(submit_elapsed_time);
- }
_exec_timer->update(submit_elapsed_time);
_spill_total_timer->update(submit_elapsed_time);
SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_spill_total_timer);
- std::shared_ptr<ScopedTimer<MonotonicStopWatch>> write_or_read_timer;
- if (_is_write) {
- write_or_read_timer =
-
std::make_shared<ScopedTimer<MonotonicStopWatch>>(_spill_write_timer);
- COUNTER_UPDATE(_write_wait_in_queue_task_count, -1);
- COUNTER_UPDATE(_writing_task_count, 1);
- } else {
- write_or_read_timer =
-
std::make_shared<ScopedTimer<MonotonicStopWatch>>(_spill_revover_timer);
- COUNTER_UPDATE(_read_wait_in_queue_task_count, -1);
- COUNTER_UPDATE(_reading_task_count, 1);
- }
- SCOPED_ATTACH_TASK(_state);
+ auto* spill_timer = _get_spill_timer();
+ DCHECK(spill_timer != nullptr);
+ SCOPED_TIMER(spill_timer);
+
+ _on_task_started(submit_elapsed_time);
+
Defer defer([&] {
- if (_is_write) {
- COUNTER_UPDATE(_writing_task_count, -1);
- } else {
- COUNTER_UPDATE(_reading_task_count, -1);
- }
{
std::function<Status()> tmp;
std::swap(tmp, _spill_exec_func);
@@ -177,51 +154,119 @@ public:
}
shared_state_holder->_spill_status.update(_spill_exec_func());
- auto num = _spilling_task_count.fetch_sub(1);
- DCHECK_GE(_spilling_task_count, 0);
+ _on_task_finished();
+ if (_spill_fin_cb) {
+ shared_state_holder->_spill_status.update(_spill_fin_cb());
+ }
- if (num == 1) {
- if (_spill_fin_cb) {
- shared_state_holder->_spill_status.update(_spill_fin_cb());
- }
- if (_spill_context) {
- if (_is_sink) {
- _spill_context->on_task_finished();
- } else {
- _spill_context->on_non_sink_task_finished();
- }
- }
+ if (_spill_dependency) {
_spill_dependency->set_ready();
}
}
-private:
- bool _is_sink;
- bool _is_write;
+protected:
+ virtual void _on_task_finished() {
+ if (_spill_context) {
+ _spill_context->on_task_finished();
+ }
+ }
+
+ virtual RuntimeProfile::Counter* _get_spill_timer() {
+ return _profile->get_counter("SpillWriteTime");
+ }
+
+ virtual void _on_task_started(uint64_t submit_elapsed_time) {
+ if (_is_write_task) {
+ COUNTER_UPDATE(_spill_write_wait_in_queue_timer,
submit_elapsed_time);
+ COUNTER_UPDATE(_write_wait_in_queue_task_count, -1);
+ COUNTER_UPDATE(_writing_task_count, 1);
+ }
+ }
+
RuntimeState* _state;
+ RuntimeProfile* _profile;
std::shared_ptr<SpillContext> _spill_context;
- std::atomic_int& _spilling_task_count;
std::shared_ptr<Dependency> _spill_dependency;
+ bool _is_write_task;
+
+private:
MonotonicStopWatch _submit_timer;
RuntimeProfile::Counter* _exec_timer = nullptr;
RuntimeProfile::Counter* _spill_total_timer;
- RuntimeProfile::Counter* _spill_write_timer;
RuntimeProfile::Counter* _spill_write_wait_in_queue_timer = nullptr;
RuntimeProfile::Counter* _write_wait_in_queue_task_count = nullptr;
RuntimeProfile::Counter* _writing_task_count = nullptr;
- RuntimeProfile::Counter* _spill_revover_timer;
- RuntimeProfile::Counter* _spill_read_wait_in_queue_timer = nullptr;
- RuntimeProfile::Counter* _read_wait_in_queue_task_count = nullptr;
- RuntimeProfile::Counter* _reading_task_count = nullptr;
-
std::weak_ptr<TaskExecutionContext> _task_context_holder;
std::weak_ptr<BasicSpillSharedState> _shared_state_holder;
std::function<Status()> _spill_exec_func;
std::function<Status()> _spill_fin_cb;
};
+class SpillSinkRunnable : public SpillRunnable {
+public:
+ SpillSinkRunnable(RuntimeState* state, std::shared_ptr<SpillContext>
spill_context,
+ std::shared_ptr<Dependency> spill_dependency,
RuntimeProfile* profile,
+ const std::shared_ptr<BasicSpillSharedState>&
shared_state,
+ std::function<Status()> spill_exec_func,
+ std::function<Status()> spill_fin_cb = {})
+ : SpillRunnable(state, spill_context, spill_dependency, profile,
shared_state, true,
+ spill_exec_func, spill_fin_cb) {}
+};
+
+class SpillNonSinkRunnable : public SpillRunnable {
+public:
+ SpillNonSinkRunnable(RuntimeState* state, std::shared_ptr<SpillContext>
spill_context,
+ std::shared_ptr<Dependency> spill_dependency,
RuntimeProfile* profile,
+ const std::shared_ptr<BasicSpillSharedState>&
shared_state,
+ std::function<Status()> spill_exec_func,
+ std::function<Status()> spill_fin_cb = {})
+ : SpillRunnable(state, spill_context, spill_dependency, profile,
shared_state, true,
+ spill_exec_func, spill_fin_cb) {}
+
+protected:
+ void _on_task_finished() override {
+ if (_spill_context) {
+ _spill_context->on_non_sink_task_finished();
+ }
+ }
+};
+
+class SpillRecoverRunnable : public SpillRunnable {
+public:
+ SpillRecoverRunnable(RuntimeState* state, std::shared_ptr<Dependency>
spill_dependency,
+ RuntimeProfile* profile,
+ const std::shared_ptr<BasicSpillSharedState>&
shared_state,
+ std::function<Status()> spill_exec_func,
+ std::function<Status()> spill_fin_cb = {})
+ : SpillRunnable(state, nullptr, spill_dependency, profile,
shared_state, false,
+ spill_exec_func, spill_fin_cb) {
+ _spill_revover_timer = profile->get_counter("SpillRecoverTime");
+ _spill_read_wait_in_queue_timer =
profile->get_counter("SpillReadTaskWaitInQueueTime");
+ _read_wait_in_queue_task_count =
profile->get_counter("SpillReadTaskWaitInQueueCount");
+ _reading_task_count = profile->get_counter("SpillReadTaskCount");
+
+ COUNTER_UPDATE(_read_wait_in_queue_task_count, 1);
+ }
+
+protected:
+ RuntimeProfile::Counter* _get_spill_timer() override {
+ return _profile->get_counter("SpillRecoverTime");
+ }
+
+ void _on_task_started(uint64_t submit_elapsed_time) override {
+ COUNTER_UPDATE(_spill_read_wait_in_queue_timer, submit_elapsed_time);
+ COUNTER_UPDATE(_read_wait_in_queue_task_count, -1);
+ COUNTER_UPDATE(_reading_task_count, 1);
+ }
+
+private:
+ RuntimeProfile::Counter* _spill_revover_timer;
+ RuntimeProfile::Counter* _spill_read_wait_in_queue_timer = nullptr;
+ RuntimeProfile::Counter* _read_wait_in_queue_task_count = nullptr;
+ RuntimeProfile::Counter* _reading_task_count = nullptr;
+};
} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index c0555c2fcf3..068c3427b84 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -136,7 +136,7 @@ MemTrackerLimiter::~MemTrackerLimiter() {
<< ", mem tracker label: " << _label
<< ", peak consumption: " << peak_consumption() <<
print_address_sanitizers();
}
- DCHECK(reserved_consumption() == 0);
+ DCHECK_EQ(reserved_consumption(), 0);
memory_memtrackerlimiter_cnt << -1;
}
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
index d0365645285..3b40426f6ef 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
@@ -51,6 +51,7 @@ void ThreadMemTrackerMgr::attach_limiter_tracker(
// _untracked_mem temporary store bytes that not synchronized to
process reserved memory,
// but bytes have been subtracted from thread _reserved_mem.
doris::GlobalMemoryArbitrator::release_process_reserved_memory(_untracked_mem);
+ _limiter_tracker->release_reserved(_untracked_mem);
_reserved_mem = 0;
_untracked_mem = 0;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]