This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 74560490956 branch-4.0: [bugfix](memory) should count memory when 
cancel query is called #58252 (#58256)
74560490956 is described below

commit 74560490956077693e657bd292f6a92bf03a4115
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Nov 24 13:12:28 2025 +0800

    branch-4.0: [bugfix](memory) should count memory when cancel query is 
called #58252 (#58256)
    
    Cherry-picked from #58252
    
    Co-authored-by: yiguolei <[email protected]>
---
 .../exec/partitioned_aggregation_sink_operator.cpp   |  2 +-
 .../exec/partitioned_aggregation_source_operator.cpp |  2 +-
 .../exec/partitioned_hash_join_probe_operator.cpp    | 13 +++++++------
 .../exec/partitioned_hash_join_sink_operator.cpp     |  4 ++--
 be/src/pipeline/exec/spill_sort_sink_operator.cpp    |  4 ++--
 be/src/runtime/fragment_mgr.cpp                      |  1 +
 be/src/runtime/thread_context.cpp                    | 15 ---------------
 be/src/runtime/thread_context.h                      | 20 +-------------------
 8 files changed, 15 insertions(+), 46 deletions(-)

diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 4c6e108a871..2594221e153 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -441,7 +441,7 @@ Status PartitionedAggSinkLocalState::revoke_memory(
                     status = Status::InternalError(
                             "fault_inject partitioned_agg_sink "
                             "revoke_memory canceled");
-                    
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status);
+                    state->get_query_ctx()->cancel(status);
                     return status;
                 });
                 Defer defer {[&]() {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp 
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 4869a02dc56..9adff22d52f 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -302,7 +302,7 @@ Status 
PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
             auto st = Status::InternalError(
                     "fault_inject partitioned_agg_source "
                     "merge spill data canceled");
-            ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, st);
+            state->get_query_ctx()->cancel(st);
             return st;
         });
 
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 40b13e77b59..223a7f24013 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -231,12 +231,12 @@ Status 
PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
         return Status::OK();
     };
 
-    auto exception_catch_func = [query_id, spill_func]() {
+    auto exception_catch_func = [query_id, state, spill_func]() {
         
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_cancel",
 {
             auto status = Status::InternalError(
                     "fault_inject partitioned_hash_join_probe "
                     "spill_probe_blocks canceled");
-            ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, 
status);
+            state->get_query_ctx()->cancel(status);
             return status;
         });
 
@@ -347,12 +347,13 @@ Status 
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
         return status;
     };
 
-    auto exception_catch_func = [read_func, query_id]() {
+    auto exception_catch_func = [read_func, state, query_id]() {
         
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_build_blocks_cancel",
 {
             auto status = Status::InternalError(
                     "fault_inject partitioned_hash_join_probe "
                     "recover_build_blocks canceled");
-            ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, 
status);
+
+            state->get_query_ctx()->cancel(status);
             return status;
         });
 
@@ -451,12 +452,12 @@ Status 
PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim
         return st;
     };
 
-    auto exception_catch_func = [read_func, query_id]() {
+    auto exception_catch_func = [read_func, state, query_id]() {
         
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_probe_blocks_cancel",
 {
             auto status = Status::InternalError(
                     "fault_inject partitioned_hash_join_probe "
                     "recover_probe_blocks canceled");
-            ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, 
status);
+            state->get_query_ctx()->cancel(status);
             return status;
         });
 
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index a2c1b7cefc2..ae3ef2c4d57 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -368,12 +368,12 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
 
     SpillSinkRunnable spill_runnable(
             state, nullptr, operator_profile(),
-            [this, query_id] {
+            [this, state, query_id] {
                 
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::revoke_memory_cancel",
 {
                     auto status = Status::InternalError(
                             "fault_inject partitioned_hash_join_sink "
                             "revoke_memory canceled");
-                    
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status);
+                    state->get_query_ctx()->cancel(status);
                     return status;
                 });
                 SCOPED_TIMER(_spill_build_timer);
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp 
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index a2308ce415f..bac215e3f3c 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -264,12 +264,12 @@ Status 
SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
         return Status::OK();
     };
 
-    auto exception_catch_func = [query_id, spill_func]() {
+    auto exception_catch_func = [query_id, state, spill_func]() {
         DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_cancel", 
{
             auto status = Status::InternalError(
                     "fault_inject spill_sort_sink "
                     "revoke_memory canceled");
-            ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, 
status);
+            state->get_query_ctx()->cancel(status);
             return status;
         });
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index e7f3ca3426c..0e72f3f4f5d 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -912,6 +912,7 @@ void FragmentMgr::cancel_query(const TUniqueId query_id, 
const Status reason) {
             return;
         }
     }
+    SCOPED_ATTACH_TASK(query_ctx->resource_ctx());
     query_ctx->cancel(reason);
     remove_query_context(query_id);
     LOG(INFO) << "Query " << print_id(query_id)
diff --git a/be/src/runtime/thread_context.cpp 
b/be/src/runtime/thread_context.cpp
index 9577b87a4f1..7e8354f6ff6 100644
--- a/be/src/runtime/thread_context.cpp
+++ b/be/src/runtime/thread_context.cpp
@@ -122,19 +122,4 @@ 
AddThreadMemTrackerConsumer::~AddThreadMemTrackerConsumer() {
     ThreadLocalHandle::del_thread_local_if_count_is_zero();
 }
 
-AddThreadMemTrackerConsumerByHook::AddThreadMemTrackerConsumerByHook(
-        const std::shared_ptr<MemTracker>& mem_tracker)
-        : _mem_tracker(mem_tracker) {
-    ThreadLocalHandle::create_thread_local_if_not_exits();
-    DCHECK(mem_tracker != nullptr);
-    use_mem_hook = true;
-    
thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(_mem_tracker.get());
-}
-
-AddThreadMemTrackerConsumerByHook::~AddThreadMemTrackerConsumerByHook() {
-    thread_context()->thread_mem_tracker_mgr->pop_consumer_tracker();
-    use_mem_hook = false;
-    ThreadLocalHandle::del_thread_local_if_count_is_zero();
-}
-
 } // namespace doris
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 94f50b48d17..54e565cc424 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -80,13 +80,6 @@
 #define SCOPED_PEAK_MEM(peak_mem) \
     auto VARNAME_LINENUM(scope_peak_mem) = doris::ScopedPeakMem(peak_mem)
 
-// Count a code segment memory (memory malloc - memory free) to MemTracker.
-// Compared to count `scope_mem`, MemTracker is easier to observe from the 
outside and is thread-safe.
-// Usage example: std::unique_ptr<MemTracker> tracker = 
std::make_unique<MemTracker>("first_tracker");
-//                { SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(_mem_tracker.get()); 
xxx; xxx; }
-#define SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(mem_tracker) \
-    auto VARNAME_LINENUM(add_mem_consumer) = 
doris::AddThreadMemTrackerConsumerByHook(mem_tracker)
-
 #define SCOPED_SKIP_MEMORY_CHECK() \
     auto VARNAME_LINENUM(scope_skip_memory_check) = 
doris::ScopeSkipMemoryCheck()
 
@@ -154,8 +147,6 @@ static std::string NO_THREAD_CONTEXT_MSG =
 // Is true after ThreadContext construction.
 inline thread_local bool pthread_context_ptr_init = false;
 inline thread_local constinit ThreadContext* thread_context_ptr = nullptr;
-// use mem hook to consume thread mem tracker.
-inline thread_local bool use_mem_hook = false;
 
 // The thread context saves some info about a working thread.
 // 2 required info:
@@ -383,15 +374,6 @@ private:
     bool _need_pop = false;
 };
 
-class AddThreadMemTrackerConsumerByHook {
-public:
-    explicit AddThreadMemTrackerConsumerByHook(const 
std::shared_ptr<MemTracker>& mem_tracker);
-    ~AddThreadMemTrackerConsumerByHook();
-
-private:
-    std::shared_ptr<MemTracker> _mem_tracker;
-};
-
 class ScopeSkipMemoryCheck {
 public:
     explicit ScopeSkipMemoryCheck() {
@@ -409,7 +391,7 @@ public:
 // must call create_thread_local_if_not_exits() before use thread_context().
 #define CONSUME_THREAD_MEM_TRACKER(size)                                       
                    \
     do {                                                                       
                    \
-        if (size == 0 || doris::use_mem_hook) {                                
                    \
+        if (size == 0) {                                                       
                    \
             break;                                                             
                    \
         }                                                                      
                    \
         if (doris::pthread_context_ptr_init) {                                 
                    \


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to