This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 74560490956 branch-4.0: [bugfix](memory) should count memory when
cancel query is called #58252 (#58256)
74560490956 is described below
commit 74560490956077693e657bd292f6a92bf03a4115
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Nov 24 13:12:28 2025 +0800
branch-4.0: [bugfix](memory) should count memory when cancel query is
called #58252 (#58256)
Cherry-picked from #58252
Co-authored-by: yiguolei <[email protected]>
---
.../exec/partitioned_aggregation_sink_operator.cpp | 2 +-
.../exec/partitioned_aggregation_source_operator.cpp | 2 +-
.../exec/partitioned_hash_join_probe_operator.cpp | 13 +++++++------
.../exec/partitioned_hash_join_sink_operator.cpp | 4 ++--
be/src/pipeline/exec/spill_sort_sink_operator.cpp | 4 ++--
be/src/runtime/fragment_mgr.cpp | 1 +
be/src/runtime/thread_context.cpp | 15 ---------------
be/src/runtime/thread_context.h | 20 +-------------------
8 files changed, 15 insertions(+), 46 deletions(-)
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 4c6e108a871..2594221e153 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -441,7 +441,7 @@ Status PartitionedAggSinkLocalState::revoke_memory(
status = Status::InternalError(
"fault_inject partitioned_agg_sink "
"revoke_memory canceled");
-
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status);
+ state->get_query_ctx()->cancel(status);
return status;
});
Defer defer {[&]() {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 4869a02dc56..9adff22d52f 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -302,7 +302,7 @@ Status
PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
auto st = Status::InternalError(
"fault_inject partitioned_agg_source "
"merge spill data canceled");
- ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, st);
+ state->get_query_ctx()->cancel(st);
return st;
});
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 40b13e77b59..223a7f24013 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -231,12 +231,12 @@ Status
PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
return Status::OK();
};
- auto exception_catch_func = [query_id, spill_func]() {
+ auto exception_catch_func = [query_id, state, spill_func]() {
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_cancel",
{
auto status = Status::InternalError(
"fault_inject partitioned_hash_join_probe "
"spill_probe_blocks canceled");
- ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id,
status);
+ state->get_query_ctx()->cancel(status);
return status;
});
@@ -347,12 +347,13 @@ Status
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
return status;
};
- auto exception_catch_func = [read_func, query_id]() {
+ auto exception_catch_func = [read_func, state, query_id]() {
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_build_blocks_cancel",
{
auto status = Status::InternalError(
"fault_inject partitioned_hash_join_probe "
"recover_build_blocks canceled");
- ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id,
status);
+
+ state->get_query_ctx()->cancel(status);
return status;
});
@@ -451,12 +452,12 @@ Status
PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim
return st;
};
- auto exception_catch_func = [read_func, query_id]() {
+ auto exception_catch_func = [read_func, state, query_id]() {
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_probe_blocks_cancel",
{
auto status = Status::InternalError(
"fault_inject partitioned_hash_join_probe "
"recover_probe_blocks canceled");
- ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id,
status);
+ state->get_query_ctx()->cancel(status);
return status;
});
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index a2c1b7cefc2..ae3ef2c4d57 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -368,12 +368,12 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
SpillSinkRunnable spill_runnable(
state, nullptr, operator_profile(),
- [this, query_id] {
+ [this, state, query_id] {
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::revoke_memory_cancel",
{
auto status = Status::InternalError(
"fault_inject partitioned_hash_join_sink "
"revoke_memory canceled");
-
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status);
+ state->get_query_ctx()->cancel(status);
return status;
});
SCOPED_TIMER(_spill_build_timer);
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index a2308ce415f..bac215e3f3c 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -264,12 +264,12 @@ Status
SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
return Status::OK();
};
- auto exception_catch_func = [query_id, spill_func]() {
+ auto exception_catch_func = [query_id, state, spill_func]() {
DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_cancel",
{
auto status = Status::InternalError(
"fault_inject spill_sort_sink "
"revoke_memory canceled");
- ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id,
status);
+ state->get_query_ctx()->cancel(status);
return status;
});
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index e7f3ca3426c..0e72f3f4f5d 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -912,6 +912,7 @@ void FragmentMgr::cancel_query(const TUniqueId query_id,
const Status reason) {
return;
}
}
+ SCOPED_ATTACH_TASK(query_ctx->resource_ctx());
query_ctx->cancel(reason);
remove_query_context(query_id);
LOG(INFO) << "Query " << print_id(query_id)
diff --git a/be/src/runtime/thread_context.cpp
b/be/src/runtime/thread_context.cpp
index 9577b87a4f1..7e8354f6ff6 100644
--- a/be/src/runtime/thread_context.cpp
+++ b/be/src/runtime/thread_context.cpp
@@ -122,19 +122,4 @@
AddThreadMemTrackerConsumer::~AddThreadMemTrackerConsumer() {
ThreadLocalHandle::del_thread_local_if_count_is_zero();
}
-AddThreadMemTrackerConsumerByHook::AddThreadMemTrackerConsumerByHook(
- const std::shared_ptr<MemTracker>& mem_tracker)
- : _mem_tracker(mem_tracker) {
- ThreadLocalHandle::create_thread_local_if_not_exits();
- DCHECK(mem_tracker != nullptr);
- use_mem_hook = true;
-
thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(_mem_tracker.get());
-}
-
-AddThreadMemTrackerConsumerByHook::~AddThreadMemTrackerConsumerByHook() {
- thread_context()->thread_mem_tracker_mgr->pop_consumer_tracker();
- use_mem_hook = false;
- ThreadLocalHandle::del_thread_local_if_count_is_zero();
-}
-
} // namespace doris
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 94f50b48d17..54e565cc424 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -80,13 +80,6 @@
#define SCOPED_PEAK_MEM(peak_mem) \
auto VARNAME_LINENUM(scope_peak_mem) = doris::ScopedPeakMem(peak_mem)
-// Count a code segment memory (memory malloc - memory free) to MemTracker.
-// Compared to count `scope_mem`, MemTracker is easier to observe from the
outside and is thread-safe.
-// Usage example: std::unique_ptr<MemTracker> tracker =
std::make_unique<MemTracker>("first_tracker");
-// { SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(_mem_tracker.get());
xxx; xxx; }
-#define SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(mem_tracker) \
- auto VARNAME_LINENUM(add_mem_consumer) =
doris::AddThreadMemTrackerConsumerByHook(mem_tracker)
-
#define SCOPED_SKIP_MEMORY_CHECK() \
auto VARNAME_LINENUM(scope_skip_memory_check) =
doris::ScopeSkipMemoryCheck()
@@ -154,8 +147,6 @@ static std::string NO_THREAD_CONTEXT_MSG =
// Is true after ThreadContext construction.
inline thread_local bool pthread_context_ptr_init = false;
inline thread_local constinit ThreadContext* thread_context_ptr = nullptr;
-// use mem hook to consume thread mem tracker.
-inline thread_local bool use_mem_hook = false;
// The thread context saves some info about a working thread.
// 2 required info:
@@ -383,15 +374,6 @@ private:
bool _need_pop = false;
};
-class AddThreadMemTrackerConsumerByHook {
-public:
- explicit AddThreadMemTrackerConsumerByHook(const
std::shared_ptr<MemTracker>& mem_tracker);
- ~AddThreadMemTrackerConsumerByHook();
-
-private:
- std::shared_ptr<MemTracker> _mem_tracker;
-};
-
class ScopeSkipMemoryCheck {
public:
explicit ScopeSkipMemoryCheck() {
@@ -409,7 +391,7 @@ public:
// must call create_thread_local_if_not_exits() before use thread_context().
#define CONSUME_THREAD_MEM_TRACKER(size)
\
do {
\
- if (size == 0 || doris::use_mem_hook) {
\
+ if (size == 0) {
\
break;
\
}
\
if (doris::pthread_context_ptr_init) {
\
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]