This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 580dee6cfe6 [refactor]refine the logic of SpillRunnable (#43446)
580dee6cfe6 is described below

commit 580dee6cfe6647a5e3e0a52fedb505a6ea4bc740
Author: Jerry Hu <[email protected]>
AuthorDate: Fri Nov 8 15:59:00 2024 +0800

    [refactor]refine the logic of SpillRunnable (#43446)
---
 be/src/pipeline/exec/operator.h                    |   4 -
 .../exec/partitioned_aggregation_sink_operator.cpp |  10 +-
 .../partitioned_aggregation_source_operator.cpp    |  10 +-
 .../exec/partitioned_hash_join_probe_operator.cpp  |  30 +---
 .../exec/partitioned_hash_join_sink_operator.cpp   |  36 ++--
 .../exec/partitioned_hash_join_sink_operator.h     |   7 +-
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  |  12 +-
 .../pipeline/exec/spill_sort_source_operator.cpp   |  10 +-
 be/src/pipeline/exec/spill_utils.h                 | 181 +++++++++++++--------
 be/src/runtime/memory/mem_tracker_limiter.cpp      |   2 +-
 be/src/runtime/memory/thread_mem_tracker_mgr.cpp   |   1 +
 11 files changed, 158 insertions(+), 145 deletions(-)

diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 75abc8fe624..76a53d9741d 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -389,8 +389,6 @@ public:
         }
     }
 
-    std::atomic_int _spilling_task_count {0};
-
     // Total time of spill, including spill task scheduling time,
     // serialize block time, write disk file time,
     // and read disk file time, deserialize block time etc.
@@ -784,8 +782,6 @@ public:
         COUNTER_SET(_spill_min_rows_of_partition, min_rows);
     }
 
-    std::atomic_int _spilling_task_count {0};
-
     std::vector<int64_t> _rows_in_partitions;
 
     // Total time of spill, including spill task scheduling time,
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index cd4eba2abfe..de2f3b29d36 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -304,13 +304,9 @@ Status PartitionedAggSinkLocalState::revoke_memory(
 
     state->get_query_ctx()->increase_revoking_tasks_count();
 
-    MonotonicStopWatch submit_timer;
-    submit_timer.start();
-    _spilling_task_count = 1;
-    auto spill_runnable = std::make_shared<SpillRunnable>(
-            state, spill_context, _spilling_task_count, _profile, submit_timer,
-            _shared_state->shared_from_this(), Base::_spill_dependency, true, 
true,
-            [this, &parent, state, query_id, size_to_revoke, spill_context] {
+    auto spill_runnable = std::make_shared<SpillSinkRunnable>(
+            state, spill_context, _spill_dependency, _profile, 
_shared_state->shared_from_this(),
+            [this, &parent, state, query_id, size_to_revoke] {
                 Status status;
                 
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::revoke_memory_cancel", {
                     status = Status::InternalError(
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 04042c5841a..027e726e358 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -319,13 +319,9 @@ Status 
PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
     });
     _spill_dependency->block();
 
-    MonotonicStopWatch submit_timer;
-    submit_timer.start();
-    _spilling_task_count = 1;
     return 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
-            std::make_shared<SpillRunnable>(state, nullptr, 
_spilling_task_count,
-                                            _runtime_profile.get(), 
submit_timer,
-                                            _shared_state->shared_from_this(), 
_spill_dependency,
-                                            false, false, 
exception_catch_func));
+            std::make_shared<SpillRecoverRunnable>(state, _spill_dependency, 
_runtime_profile.get(),
+                                                   
_shared_state->shared_from_this(),
+                                                   exception_catch_func));
 }
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index f1ca6f8d7de..bdba90aac37 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -243,7 +243,7 @@ Status 
PartitionedHashJoinProbeLocalState::spill_probe_blocks(
         return Status::OK();
     };
 
-    auto exception_catch_func = [query_id, spill_func, spill_context]() {
+    auto exception_catch_func = [query_id, spill_func]() {
         
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_cancel",
 {
             auto status = Status::InternalError(
                     "fault_inject partitioned_hash_join_probe "
@@ -265,14 +265,9 @@ Status 
PartitionedHashJoinProbeLocalState::spill_probe_blocks(
                 "fault_inject partitioned_hash_join_probe spill_probe_blocks 
submit_func failed");
     });
 
-    MonotonicStopWatch submit_timer;
-    submit_timer.start();
-
-    _spilling_task_count = 1;
-    auto spill_runnable = std::make_shared<SpillRunnable>(
-            state, spill_context, _spilling_task_count, 
_runtime_profile.get(), submit_timer,
-            _shared_state->shared_from_this(), _spill_dependency, false, true,
-            exception_catch_func);
+    auto spill_runnable = std::make_shared<SpillNonSinkRunnable>(
+            state, spill_context, _spill_dependency, _runtime_profile.get(),
+            _shared_state->shared_from_this(), exception_catch_func);
     return spill_io_pool->submit(std::move(spill_runnable));
 }
 
@@ -398,13 +393,8 @@ Status 
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
                                 "recovery_build_blocks submit_func failed");
                     });
 
-    MonotonicStopWatch submit_timer;
-    submit_timer.start();
-
-    _spilling_task_count = 1;
-    auto spill_runnable = std::make_shared<SpillRunnable>(
-            state, nullptr, _spilling_task_count, _runtime_profile.get(), 
submit_timer,
-            _shared_state->shared_from_this(), _spill_dependency, false, false,
+    auto spill_runnable = std::make_shared<SpillRecoverRunnable>(
+            state, _spill_dependency, _runtime_profile.get(), 
_shared_state->shared_from_this(),
             exception_catch_func);
     VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << 
_parent->node_id()
                << ", task id: " << state->task_id() << ", partition: " << 
partition_index
@@ -511,12 +501,8 @@ Status 
PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim
                                 "fault_inject partitioned_hash_join_probe "
                                 "recovery_probe_blocks submit_func failed");
                     });
-    MonotonicStopWatch submit_timer;
-    submit_timer.start();
-    _spilling_task_count = 1;
-    return spill_io_pool->submit(std::make_shared<SpillRunnable>(
-            state, nullptr, _spilling_task_count, _runtime_profile.get(), 
submit_timer,
-            _shared_state->shared_from_this(), _spill_dependency, false, false,
+    return spill_io_pool->submit(std::make_shared<SpillRecoverRunnable>(
+            state, _spill_dependency, _runtime_profile.get(), 
_shared_state->shared_from_this(),
             exception_catch_func));
 }
 
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index baa99d3fe14..a8c2b495365 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -271,12 +271,9 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
         return status;
     };
 
-    MonotonicStopWatch submit_timer;
-    submit_timer.start();
-    _spilling_task_count = 1;
-    auto spill_runnable = std::make_shared<SpillRunnable>(
-            state, spill_context, _spilling_task_count, _profile, submit_timer,
-            _shared_state->shared_from_this(), _spill_dependency, true, true, 
exception_catch_func);
+    auto spill_runnable = std::make_shared<SpillSinkRunnable>(
+            state, spill_context, _spill_dependency, _profile, 
_shared_state->shared_from_this(),
+            exception_catch_func);
 
     auto* thread_pool = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
 
@@ -313,6 +310,10 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
     DCHECK(spill_io_pool != nullptr);
 
     auto spill_fin_cb = [this, state, query_id, spill_context]() {
+        if (_spilling_task_count.fetch_sub(1) != 1) {
+            return Status::OK();
+        }
+
         Status status;
         if (_child_eos) {
             VLOG_DEBUG << "query:" << print_id(this->state()->query_id()) << 
", hash join sink "
@@ -324,12 +325,14 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
                                   COUNTER_UPDATE(_in_mem_rows_counter, 
block->rows());
                               }
                           });
-            auto st = _finish_spilling();
-            if (status.ok()) {
-                status = st;
-            }
+            status = _finish_spilling();
             _dependency->set_ready_to_read();
         }
+
+        if (spill_context) {
+            spill_context->on_task_finished();
+        }
+        _spill_dependency->set_ready();
         return status;
     };
     for (size_t i = 0; i != _shared_state->partitioned_build_blocks.size(); 
++i) {
@@ -355,10 +358,9 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
         // For every stream, the task counter is increased +1
         // so that when a stream finished, it should desc -1
         state->get_query_ctx()->increase_revoking_tasks_count();
-        auto spill_runnable = std::make_shared<SpillRunnable>(
-                state, spill_context, _spilling_task_count, _profile, 
submit_timer,
-                _shared_state->shared_from_this(), _spill_dependency, true, 
true,
-                [this, query_id, spilling_stream, i, spill_context] {
+        auto spill_runnable = std::make_shared<SpillSinkRunnable>(
+                state, nullptr, nullptr, _profile, 
_shared_state->shared_from_this(),
+                [this, query_id, spilling_stream, i] {
                     DBUG_EXECUTE_IF(
                             
"fault_inject::partitioned_hash_join_sink::revoke_memory_cancel", {
                                 auto status = Status::InternalError(
@@ -371,8 +373,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
                     SCOPED_TIMER(_spill_build_timer);
 
                     auto status = [&]() {
-                        RETURN_IF_CATCH_EXCEPTION(
-                                return _spill_to_disk(i, spilling_stream, 
spill_context));
+                        RETURN_IF_CATCH_EXCEPTION(return _spill_to_disk(i, 
spilling_stream));
                     }();
 
                     _state->get_query_ctx()->decrease_revoking_tasks_count();
@@ -470,8 +471,7 @@ Status 
PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state,
 }
 
 Status PartitionedHashJoinSinkLocalState::_spill_to_disk(
-        uint32_t partition_index, const vectorized::SpillStreamSPtr& 
spilling_stream,
-        const std::shared_ptr<SpillContext>& spill_context) {
+        uint32_t partition_index, const vectorized::SpillStreamSPtr& 
spilling_stream) {
     auto& partitioned_block = 
_shared_state->partitioned_build_blocks[partition_index];
 
     Status status = _shared_state->_spill_status.status();
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 87412b53910..58b19004f33 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -19,6 +19,8 @@
 
 #include <stdint.h>
 
+#include <atomic>
+
 #include "common/status.h"
 #include "operator.h"
 #include "pipeline/exec/hashjoin_build_sink.h"
@@ -54,8 +56,7 @@ protected:
             : 
PipelineXSpillSinkLocalState<PartitionedHashJoinSharedState>(parent, state) {}
 
     Status _spill_to_disk(uint32_t partition_index,
-                          const vectorized::SpillStreamSPtr& spilling_stream,
-                          const std::shared_ptr<SpillContext>& spill_context);
+                          const vectorized::SpillStreamSPtr& spilling_stream);
 
     Status _partition_block(RuntimeState* state, vectorized::Block* in_block, 
size_t begin,
                             size_t end);
@@ -68,7 +69,7 @@ protected:
     friend class PartitionedHashJoinSinkOperatorX;
 
     std::atomic<bool> _spilling_finished {false};
-    vectorized::Block _pending_block;
+    std::atomic_int32_t _spilling_task_count {0};
 
     bool _child_eos {false};
 
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 00a60a4c747..83c7ccfc1a3 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -281,7 +281,7 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* 
state,
         return Status::OK();
     };
 
-    auto exception_catch_func = [query_id, spill_context, spill_func]() {
+    auto exception_catch_func = [query_id, spill_func]() {
         DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_cancel", 
{
             auto status = Status::InternalError(
                     "fault_inject spill_sort_sink "
@@ -303,14 +303,10 @@ Status 
SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
     if (status.ok()) {
         state->get_query_ctx()->increase_revoking_tasks_count();
 
-        MonotonicStopWatch submit_timer;
-        submit_timer.start();
-        _spilling_task_count = 1;
         status = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
-                std::make_shared<SpillRunnable>(
-                        state, spill_context, _spilling_task_count, _profile, 
submit_timer,
-                        _shared_state->shared_from_this(), _spill_dependency, 
true, true,
-                        exception_catch_func));
+                std::make_shared<SpillSinkRunnable>(state, spill_context, 
_spill_dependency,
+                                                    _profile, 
_shared_state->shared_from_this(),
+                                                    exception_catch_func));
     }
     if (!status.ok()) {
         if (!_eos) {
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index 2308c49d893..b0b5ebbcbd7 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -173,14 +173,10 @@ Status 
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
                 "merge_sort_spill_data submit_func failed");
     });
 
-    MonotonicStopWatch submit_timer;
-    submit_timer.start();
-    _spilling_task_count = 1;
     return 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
-            std::make_shared<SpillRunnable>(state, nullptr, 
_spilling_task_count,
-                                            _runtime_profile.get(), 
submit_timer,
-                                            _shared_state->shared_from_this(), 
_spill_dependency,
-                                            false, false, 
exception_catch_func));
+            std::make_shared<SpillRecoverRunnable>(state, _spill_dependency, 
_runtime_profile.get(),
+                                                   
_shared_state->shared_from_this(),
+                                                   exception_catch_func));
 }
 
 Status SpillSortLocalState::_create_intermediate_merger(
diff --git a/be/src/pipeline/exec/spill_utils.h b/be/src/pipeline/exec/spill_utils.h
index 687779badbb..bf877382129 100644
--- a/be/src/pipeline/exec/spill_utils.h
+++ b/be/src/pipeline/exec/spill_utils.h
@@ -78,21 +78,17 @@ private:
 };
 
 class SpillRunnable : public Runnable {
-public:
+protected:
     SpillRunnable(RuntimeState* state, std::shared_ptr<SpillContext> 
spill_context,
-                  std::atomic_int& spilling_task_count, RuntimeProfile* 
profile,
-                  MonotonicStopWatch submit_timer,
-                  const std::shared_ptr<BasicSpillSharedState>& shared_state,
-                  std::shared_ptr<Dependency> spill_dependency, bool is_sink, 
bool is_write,
+                  std::shared_ptr<Dependency> spill_dependency, 
RuntimeProfile* profile,
+                  const std::shared_ptr<BasicSpillSharedState>& shared_state, 
bool is_write,
                   std::function<Status()> spill_exec_func,
                   std::function<Status()> spill_fin_cb = {})
-            : _is_sink(is_sink),
-              _is_write(is_write),
-              _state(state),
+            : _state(state),
+              _profile(profile),
               _spill_context(std::move(spill_context)),
-              _spilling_task_count(spilling_task_count),
               _spill_dependency(std::move(spill_dependency)),
-              _submit_timer(submit_timer),
+              _is_write_task(is_write),
               _task_context_holder(state->get_task_execution_context()),
               _shared_state_holder(shared_state),
               _spill_exec_func(std::move(spill_exec_func)),
@@ -100,63 +96,44 @@ public:
         _exec_timer = profile->get_counter("ExecTime");
         _spill_total_timer = profile->get_counter("SpillTotalTime");
 
-        _spill_write_timer = profile->get_counter("SpillWriteTime");
-        _spill_write_wait_in_queue_timer = 
profile->get_counter("SpillWriteTaskWaitInQueueTime");
-        _write_wait_in_queue_task_count = 
profile->get_counter("SpillWriteTaskWaitInQueueCount");
-        _writing_task_count = profile->get_counter("SpillWriteTaskCount");
-
-        _spill_revover_timer = profile->get_counter("SpillRecoverTime");
-        _spill_read_wait_in_queue_timer = 
profile->get_counter("SpillReadTaskWaitInQueueTime");
-        _read_wait_in_queue_task_count = 
profile->get_counter("SpillReadTaskWaitInQueueCount");
-        _reading_task_count = profile->get_counter("SpillReadTaskCount");
         if (is_write) {
+            _spill_write_wait_in_queue_timer =
+                    profile->get_counter("SpillWriteTaskWaitInQueueTime");
+            _write_wait_in_queue_task_count =
+                    profile->get_counter("SpillWriteTaskWaitInQueueCount");
+            _writing_task_count = profile->get_counter("SpillWriteTaskCount");
             COUNTER_UPDATE(_write_wait_in_queue_task_count, 1);
-        } else {
-            COUNTER_UPDATE(_read_wait_in_queue_task_count, 1);
         }
+
+        _submit_timer.start();
     }
 
+public:
     ~SpillRunnable() override = default;
 
     void run() override {
+        const auto submit_elapsed_time = _submit_timer.elapsed_time();
         // Should lock task context before scope task, because the _state maybe
         // destroyed when run is called.
         auto task_context_holder = _task_context_holder.lock();
         if (!task_context_holder) {
             return;
         }
+        SCOPED_ATTACH_TASK(_state);
 
-        auto submit_elapsed_time = _submit_timer.elapsed_time();
-        if (_is_write) {
-            _spill_write_wait_in_queue_timer->update(submit_elapsed_time);
-        } else {
-            _spill_read_wait_in_queue_timer->update(submit_elapsed_time);
-        }
         _exec_timer->update(submit_elapsed_time);
         _spill_total_timer->update(submit_elapsed_time);
 
         SCOPED_TIMER(_exec_timer);
         SCOPED_TIMER(_spill_total_timer);
 
-        std::shared_ptr<ScopedTimer<MonotonicStopWatch>> write_or_read_timer;
-        if (_is_write) {
-            write_or_read_timer =
-                    
std::make_shared<ScopedTimer<MonotonicStopWatch>>(_spill_write_timer);
-            COUNTER_UPDATE(_write_wait_in_queue_task_count, -1);
-            COUNTER_UPDATE(_writing_task_count, 1);
-        } else {
-            write_or_read_timer =
-                    
std::make_shared<ScopedTimer<MonotonicStopWatch>>(_spill_revover_timer);
-            COUNTER_UPDATE(_read_wait_in_queue_task_count, -1);
-            COUNTER_UPDATE(_reading_task_count, 1);
-        }
-        SCOPED_ATTACH_TASK(_state);
+        auto* spill_timer = _get_spill_timer();
+        DCHECK(spill_timer != nullptr);
+        SCOPED_TIMER(spill_timer);
+
+        _on_task_started(submit_elapsed_time);
+
         Defer defer([&] {
-            if (_is_write) {
-                COUNTER_UPDATE(_writing_task_count, -1);
-            } else {
-                COUNTER_UPDATE(_reading_task_count, -1);
-            }
             {
                 std::function<Status()> tmp;
                 std::swap(tmp, _spill_exec_func);
@@ -177,51 +154,119 @@ public:
         }
         shared_state_holder->_spill_status.update(_spill_exec_func());
 
-        auto num = _spilling_task_count.fetch_sub(1);
-        DCHECK_GE(_spilling_task_count, 0);
+        _on_task_finished();
+        if (_spill_fin_cb) {
+            shared_state_holder->_spill_status.update(_spill_fin_cb());
+        }
 
-        if (num == 1) {
-            if (_spill_fin_cb) {
-                shared_state_holder->_spill_status.update(_spill_fin_cb());
-            }
-            if (_spill_context) {
-                if (_is_sink) {
-                    _spill_context->on_task_finished();
-                } else {
-                    _spill_context->on_non_sink_task_finished();
-                }
-            }
+        if (_spill_dependency) {
             _spill_dependency->set_ready();
         }
     }
 
-private:
-    bool _is_sink;
-    bool _is_write;
+protected:
+    virtual void _on_task_finished() {
+        if (_spill_context) {
+            _spill_context->on_task_finished();
+        }
+    }
+
+    virtual RuntimeProfile::Counter* _get_spill_timer() {
+        return _profile->get_counter("SpillWriteTime");
+    }
+
+    virtual void _on_task_started(uint64_t submit_elapsed_time) {
+        if (_is_write_task) {
+            COUNTER_UPDATE(_spill_write_wait_in_queue_timer, 
submit_elapsed_time);
+            COUNTER_UPDATE(_write_wait_in_queue_task_count, -1);
+            COUNTER_UPDATE(_writing_task_count, 1);
+        }
+    }
+
     RuntimeState* _state;
+    RuntimeProfile* _profile;
     std::shared_ptr<SpillContext> _spill_context;
-    std::atomic_int& _spilling_task_count;
     std::shared_ptr<Dependency> _spill_dependency;
 
+    bool _is_write_task;
+
+private:
     MonotonicStopWatch _submit_timer;
 
     RuntimeProfile::Counter* _exec_timer = nullptr;
     RuntimeProfile::Counter* _spill_total_timer;
 
-    RuntimeProfile::Counter* _spill_write_timer;
     RuntimeProfile::Counter* _spill_write_wait_in_queue_timer = nullptr;
     RuntimeProfile::Counter* _write_wait_in_queue_task_count = nullptr;
     RuntimeProfile::Counter* _writing_task_count = nullptr;
 
-    RuntimeProfile::Counter* _spill_revover_timer;
-    RuntimeProfile::Counter* _spill_read_wait_in_queue_timer = nullptr;
-    RuntimeProfile::Counter* _read_wait_in_queue_task_count = nullptr;
-    RuntimeProfile::Counter* _reading_task_count = nullptr;
-
     std::weak_ptr<TaskExecutionContext> _task_context_holder;
     std::weak_ptr<BasicSpillSharedState> _shared_state_holder;
     std::function<Status()> _spill_exec_func;
     std::function<Status()> _spill_fin_cb;
 };
 
+class SpillSinkRunnable : public SpillRunnable {
+public:
+    SpillSinkRunnable(RuntimeState* state, std::shared_ptr<SpillContext> 
spill_context,
+                      std::shared_ptr<Dependency> spill_dependency, 
RuntimeProfile* profile,
+                      const std::shared_ptr<BasicSpillSharedState>& 
shared_state,
+                      std::function<Status()> spill_exec_func,
+                      std::function<Status()> spill_fin_cb = {})
+            : SpillRunnable(state, spill_context, spill_dependency, profile, 
shared_state, true,
+                            spill_exec_func, spill_fin_cb) {}
+};
+
+class SpillNonSinkRunnable : public SpillRunnable {
+public:
+    SpillNonSinkRunnable(RuntimeState* state, std::shared_ptr<SpillContext> 
spill_context,
+                         std::shared_ptr<Dependency> spill_dependency, 
RuntimeProfile* profile,
+                         const std::shared_ptr<BasicSpillSharedState>& 
shared_state,
+                         std::function<Status()> spill_exec_func,
+                         std::function<Status()> spill_fin_cb = {})
+            : SpillRunnable(state, spill_context, spill_dependency, profile, 
shared_state, true,
+                            spill_exec_func, spill_fin_cb) {}
+
+protected:
+    void _on_task_finished() override {
+        if (_spill_context) {
+            _spill_context->on_non_sink_task_finished();
+        }
+    }
+};
+
+class SpillRecoverRunnable : public SpillRunnable {
+public:
+    SpillRecoverRunnable(RuntimeState* state, std::shared_ptr<Dependency> 
spill_dependency,
+                         RuntimeProfile* profile,
+                         const std::shared_ptr<BasicSpillSharedState>& 
shared_state,
+                         std::function<Status()> spill_exec_func,
+                         std::function<Status()> spill_fin_cb = {})
+            : SpillRunnable(state, nullptr, spill_dependency, profile, 
shared_state, false,
+                            spill_exec_func, spill_fin_cb) {
+        _spill_revover_timer = profile->get_counter("SpillRecoverTime");
+        _spill_read_wait_in_queue_timer = 
profile->get_counter("SpillReadTaskWaitInQueueTime");
+        _read_wait_in_queue_task_count = 
profile->get_counter("SpillReadTaskWaitInQueueCount");
+        _reading_task_count = profile->get_counter("SpillReadTaskCount");
+
+        COUNTER_UPDATE(_read_wait_in_queue_task_count, 1);
+    }
+
+protected:
+    RuntimeProfile::Counter* _get_spill_timer() override {
+        return _profile->get_counter("SpillRecoverTime");
+    }
+
+    void _on_task_started(uint64_t submit_elapsed_time) override {
+        COUNTER_UPDATE(_spill_read_wait_in_queue_timer, submit_elapsed_time);
+        COUNTER_UPDATE(_read_wait_in_queue_task_count, -1);
+        COUNTER_UPDATE(_reading_task_count, 1);
+    }
+
+private:
+    RuntimeProfile::Counter* _spill_revover_timer;
+    RuntimeProfile::Counter* _spill_read_wait_in_queue_timer = nullptr;
+    RuntimeProfile::Counter* _read_wait_in_queue_task_count = nullptr;
+    RuntimeProfile::Counter* _reading_task_count = nullptr;
+};
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp
index c0555c2fcf3..068c3427b84 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -136,7 +136,7 @@ MemTrackerLimiter::~MemTrackerLimiter() {
                    << ", mem tracker label: " << _label
                    << ", peak consumption: " << peak_consumption() << 
print_address_sanitizers();
     }
-    DCHECK(reserved_consumption() == 0);
+    DCHECK_EQ(reserved_consumption(), 0);
     memory_memtrackerlimiter_cnt << -1;
 }
 
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
index d0365645285..3b40426f6ef 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
@@ -51,6 +51,7 @@ void ThreadMemTrackerMgr::attach_limiter_tracker(
         // _untracked_mem temporary store bytes that not synchronized to 
process reserved memory,
         // but bytes have been subtracted from thread _reserved_mem.
         
doris::GlobalMemoryArbitrator::release_process_reserved_memory(_untracked_mem);
+        _limiter_tracker->release_reserved(_untracked_mem);
         _reserved_mem = 0;
         _untracked_mem = 0;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to