This is an automated email from the ASF dual-hosted git repository.

zouxinyi pushed a commit to branch resource_ctx
in repository https://gitbox.apache.org/repos/asf/doris.git

commit ca4c07ed2809157ea7c2ba1a91cd31d739144eb6
Author: Zou Xinyi <[email protected]>
AuthorDate: Mon Jan 6 20:33:55 2025 +0800

    1
---
 be/src/olap/rowid_conversion.h                     |   6 +-
 be/src/runtime/memory/memory_profile.h             |  18 +-
 be/src/runtime/memory/thread_mem_tracker_mgr.cpp   |  34 --
 be/src/runtime/memory/thread_mem_tracker_mgr.h     |  79 +++--
 be/src/runtime/query_context.cpp                   |  26 +-
 be/src/runtime/query_context.h                     |   4 +
 be/src/runtime/thread_context.cpp                  |  62 ++--
 be/src/runtime/thread_context.h                    | 357 ++-------------------
 be/src/runtime/thread_context_impl.h               | 158 +++++++++
 be/src/runtime/workload_management/cpu_context.h   |  71 ++--
 be/src/runtime/workload_management/io_context.h    | 118 +++----
 .../runtime/workload_management/memory_context.h   | 112 ++++---
 .../runtime/workload_management/resource_context.h |  91 +++---
 .../runtime/workload_management/task_controller.h  |  51 +++
 .../workload_management/workload_group_context.h   |  39 +++
 be/src/util/runtime_profile.h                      |   5 +
 be/src/vec/common/allocator.cpp                    |  14 +-
 17 files changed, 614 insertions(+), 631 deletions(-)

diff --git a/be/src/olap/rowid_conversion.h b/be/src/olap/rowid_conversion.h
index 8f9d96a136a..b07d683ddae 100644
--- a/be/src/olap/rowid_conversion.h
+++ b/be/src/olap/rowid_conversion.h
@@ -46,9 +46,9 @@ public:
                         "consuming "
                         "tracker:<{}>, peak used {}, current used {}.",
                         
doris::GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str(),
-                        doris::thread_context()->thread_mem_tracker()->label(),
-                        
doris::thread_context()->thread_mem_tracker()->peak_consumption(),
-                        
doris::thread_context()->thread_mem_tracker()->consumption()));
+                        
doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label(),
+                        
doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->peak_consumption(),
+                        
doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->consumption()));
             }
 
             uint32_t id = _segments_rowid_map.size();
diff --git a/be/src/runtime/memory/memory_profile.h 
b/be/src/runtime/memory/memory_profile.h
index c6aefb72f22..635a66ebf23 100644
--- a/be/src/runtime/memory/memory_profile.h
+++ b/be/src/runtime/memory/memory_profile.h
@@ -33,27 +33,27 @@ public:
     void make_memory_profile(RuntimeProfile* profile) const;
 
     std::string print_memory_overview_profile() const {
-        return return_memory_profile_str(_memory_overview_profile.get());
+        return _memory_overview_profile->pretty_print();
     }
 
     std::string print_global_memory_profile() const {
-        return return_memory_profile_str(_global_memory_profile.get().get());
+        return _global_memory_profile.get()->pretty_print();
     }
 
     std::string print_metadata_memory_profile() const {
-        return return_memory_profile_str(_metadata_memory_profile.get().get());
+        return _metadata_memory_profile.get()->pretty_print();
     }
 
     std::string print_cache_memory_profile() const {
-        return return_memory_profile_str(_cache_memory_profile.get().get());
+        return _cache_memory_profile.get()->pretty_print();
     }
 
     std::string print_top_memory_tasks_profile() const {
-        return 
return_memory_profile_str(_top_memory_tasks_profile.get().get());
+        return _top_memory_tasks_profile.get()->pretty_print();
     }
 
     std::string print_tasks_memory_profile() const {
-        return return_memory_profile_str(_tasks_memory_profile.get().get());
+        return _tasks_memory_profile.get()->pretty_print();
     }
 
     static int64_t query_current_usage();
@@ -67,12 +67,6 @@ public:
     void print_log_process_usage();
 
 private:
-    std::string return_memory_profile_str(const RuntimeProfile* profile) const 
{
-        std::stringstream ss;
-        profile->pretty_print(&ss);
-        return ss.str();
-    }
-
     void init_memory_overview_counter();
 
     std::unique_ptr<RuntimeProfile> _memory_overview_profile;
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp 
b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
index 3b40426f6ef..510b831633a 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
@@ -20,27 +20,9 @@
 #include <gen_cpp/types.pb.h>
 
 #include "runtime/exec_env.h"
-#include "runtime/fragment_mgr.h"
 
 namespace doris {
 
-class AsyncCancelQueryTask : public Runnable {
-    ENABLE_FACTORY_CREATOR(AsyncCancelQueryTask);
-
-public:
-    AsyncCancelQueryTask(TUniqueId query_id, const std::string& exceed_msg)
-            : _query_id(query_id), _exceed_msg(exceed_msg) {}
-    ~AsyncCancelQueryTask() override = default;
-    void run() override {
-        ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
-                _query_id, Status::MemoryLimitExceeded(_exceed_msg));
-    }
-
-private:
-    TUniqueId _query_id;
-    const std::string _exceed_msg;
-};
-
 void ThreadMemTrackerMgr::attach_limiter_tracker(
         const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
     DCHECK(mem_tracker);
@@ -71,20 +53,4 @@ void ThreadMemTrackerMgr::detach_limiter_tracker(
     _limiter_tracker = old_mem_tracker;
 }
 
-void ThreadMemTrackerMgr::cancel_query(const std::string& exceed_msg) {
-    if (is_attach_query() && !_is_query_cancelled) {
-        Status submit_st = 
ExecEnv::GetInstance()->lazy_release_obj_pool()->submit(
-                AsyncCancelQueryTask::create_shared(_query_id, exceed_msg));
-        if (submit_st.ok()) {
-            // Use this flag to avoid the cancel request submit to pool many 
times, because even we cancel the query
-            // successfully, but the application may not use if 
(state.iscancelled) to exist quickly. And it may try to
-            // allocate memory and may failed again and the pool will be full.
-            _is_query_cancelled = true;
-        } else {
-            LOG(WARNING) << "Failed to submit cancel query task to pool, 
query_id "
-                         << print_id(_query_id) << ", error st " << submit_st;
-        }
-    }
-}
-
 } // namespace doris
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h 
b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 9dbf4399492..dbb31c393f7 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -20,7 +20,6 @@
 #include <fmt/format.h>
 #include <gen_cpp/Types_types.h>
 #include <glog/logging.h>
-#include <stdint.h>
 
 #include <algorithm>
 #include <memory>
@@ -35,11 +34,17 @@
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/workload_group/workload_group.h"
 #include "util/stack_util.h"
-#include "util/uid_util.h"
 
 namespace doris {
 
 constexpr size_t SYNC_PROC_RESERVED_INTERVAL_BYTES = (1ULL << 20); // 1M
+static std::string memory_orphan_check_msg =
+        "If you crash here, it means that SCOPED_ATTACH_TASK and "
+        "SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER are not used correctly. 
starting position of "
+        "each thread is expected to use SCOPED_ATTACH_TASK to bind a 
MemTrackerLimiter belonging "
+        "to Query/Load/Compaction/Other Tasks, otherwise memory alloc using 
Doris Allocator in the "
+        "thread will crash. If you want to switch MemTrackerLimiter during 
thread execution, "
+        "please use SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER, do not repeat 
Attach.";
 
 // Memory Hook is counted in the memory tracker of the current thread.
 class ThreadMemTrackerMgr {
@@ -61,6 +66,24 @@ public:
     void detach_limiter_tracker(const std::shared_ptr<MemTrackerLimiter>& 
old_mem_tracker =
                                         
ExecEnv::GetInstance()->orphan_mem_tracker());
 
+    void attach_task(const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
+                     const std::weak_ptr<WorkloadGroup>& wg_wptr) {
+        DCHECK(mem_tracker);
+        // Orphan is thread default tracker.
+        // will only attach_task at the beginning of the thread function, 
there should be no duplicate attach_task.
+        DCHECK(_limiter_tracker->label() == "Orphan")
+                << ", thread mem tracker label: " << _limiter_tracker->label()
+                << ", attach mem tracker label: " << mem_tracker->label();
+        attach_limiter_tracker(mem_tracker);
+        _wg_wptr = wg_wptr;
+        enable_wait_gc();
+    }
+    void detach_task() {
+        detach_limiter_tracker();
+        _wg_wptr.reset();
+        disable_wait_gc();
+    }
+
     // Must be fast enough! Thread update_tracker may be called very 
frequently.
     bool push_consumer_tracker(MemTracker* mem_tracker);
     void pop_consumer_tracker();
@@ -68,31 +91,17 @@ public:
         return _consumer_tracker_stack.empty() ? "" : 
_consumer_tracker_stack.back()->label();
     }
 
-    void set_query_id(const TUniqueId& query_id) { _query_id = query_id; }
-
-    TUniqueId query_id() { return _query_id; }
-
-    void set_wg_wptr(const std::weak_ptr<WorkloadGroup>& wg_wptr) { _wg_wptr = 
wg_wptr; }
-
-    void reset_wg_wptr() { _wg_wptr.reset(); }
-
     // Note that, If call the memory allocation operation in Memory Hook,
     // such as calling LOG/iostream/sstream/stringstream/etc. related methods,
     // must increase the control to avoid entering infinite recursion, 
otherwise it may cause crash or stuck,
     // Returns whether the memory exceeds limit, and will consume mem trcker 
no matter whether the limit is exceeded.
-    void consume(int64_t size, int skip_large_memory_check = 0);
+    void consume(int64_t size);
     void flush_untracked_mem();
 
     doris::Status try_reserve(int64_t size);
 
     void release_reserved();
 
-    bool is_attach_query() { return _query_id != TUniqueId(); }
-
-    bool is_query_cancelled() const { return _is_query_cancelled; }
-
-    void reset_query_cancelled_flag(bool new_val) { _is_query_cancelled = 
new_val; }
-
     std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() {
         CHECK(init());
         return _limiter_tracker;
@@ -101,7 +110,6 @@ public:
     void enable_wait_gc() { _wait_gc = true; }
     void disable_wait_gc() { _wait_gc = false; }
     [[nodiscard]] bool wait_gc() const { return _wait_gc; }
-    void cancel_query(const std::string& exceed_msg);
 
     std::string print_debug_string() {
         fmt::memory_buffer consumer_tracker_buf;
@@ -118,6 +126,17 @@ public:
     int64_t untracked_mem() const { return _untracked_mem; }
     int64_t reserved_mem() const { return _reserved_mem; }
 
+    int skip_memory_check = 0;
+    int skip_large_memory_check = 0;
+
+    void memory_orphan_check() {
+#ifdef USE_MEM_TRACKER
+        DCHECK(doris::k_doris_exit || 
!doris::config::enable_memory_orphan_check ||
+               _limiter_tracker->label() != "Orphan")
+                << doris::memory_orphan_check_msg;
+#endif
+    }
+
 private:
     struct LastAttachSnapshot {
         int64_t reserved_mem = 0;
@@ -136,7 +155,7 @@ private:
     // so `attach_limiter_tracker` may be nested.
     std::vector<LastAttachSnapshot> _last_attach_snapshots_stack;
 
-    std::string _failed_consume_msg = std::string();
+    std::string _failed_consume_msg;
     // If true, the Allocator will wait for the GC to free memory if it finds 
that the memory exceed limit.
     // A thread of query/load will only wait once during execution.
     bool _wait_gc = false;
@@ -147,14 +166,13 @@ private:
 
     // If there is a memory new/delete operation in the consume method, it may 
enter infinite recursion.
     bool _stop_consume = false;
-    TUniqueId _query_id = TUniqueId();
-    bool _is_query_cancelled = false;
 };
 
 inline bool ThreadMemTrackerMgr::init() {
     // 1. Initialize in the thread context when the thread starts
     // 2. ExecEnv not initialized when thread start, initialized in 
limiter_mem_tracker().
-    if (_init) return true;
+    if (_init) { return true;
+}
     if (ExecEnv::GetInstance()->orphan_mem_tracker() != nullptr) {
         _limiter_tracker = ExecEnv::GetInstance()->orphan_mem_tracker();
         _wait_gc = true;
@@ -178,7 +196,8 @@ inline void ThreadMemTrackerMgr::pop_consumer_tracker() {
     _consumer_tracker_stack.pop_back();
 }
 
-inline void ThreadMemTrackerMgr::consume(int64_t size, int 
skip_large_memory_check) {
+inline void ThreadMemTrackerMgr::consume(int64_t size) {
+    memory_orphan_check();
     // `consumer_tracker` not support reserve memory and not require use 
`_untracked_mem` to batch consume,
     // because `consumer_tracker` will not be bound by many threads, so there 
is no performance problem.
     for (auto* tracker : _consumer_tracker_stack) {
@@ -236,22 +255,18 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, 
int skip_large_memory_che
             size > doris::config::stacktrace_in_alloc_large_memory_bytes) {
             _stop_consume = true;
             LOG(WARNING) << fmt::format(
-                    "alloc large memory: {}, {}, this is just a warning, not 
prevent memory alloc, "
+                    "alloc large memory: {}, consume tracker: {}, this is just 
a warning, not prevent memory alloc, "
                     "stacktrace:\n{}",
-                    size,
-                    is_attach_query() ? "in query or load: " + 
print_id(_query_id)
-                                      : "not in query or load",
+                    size, _limiter_tracker->label(),
                     get_stack_trace());
             _stop_consume = false;
         }
         if (doris::config::crash_in_alloc_large_memory_bytes > 0 &&
             size > doris::config::crash_in_alloc_large_memory_bytes) {
             throw Exception(Status::FatalError(
-                    "alloc large memory: {}, {}, crash generate core dumpsto 
help analyze, "
+                    "alloc large memory: {}, consume tracker: {}, crash 
generate core dumpsto help analyze, "
                     "stacktrace:\n{}",
-                    size,
-                    is_attach_query() ? "in query or load: " + 
print_id(_query_id)
-                                      : "not in query or load",
+                    size, _limiter_tracker->label(),
                     get_stack_trace()));
         }
     }
@@ -282,6 +297,7 @@ inline doris::Status 
ThreadMemTrackerMgr::try_reserve(int64_t size) {
     DCHECK(_limiter_tracker);
     DCHECK(size >= 0);
     CHECK(init());
+    memory_orphan_check();
     // if _reserved_mem not equal to 0, repeat reserve,
     // _untracked_mem store bytes that not synchronized to process reserved 
memory.
     flush_untracked_mem();
@@ -317,6 +333,7 @@ inline doris::Status 
ThreadMemTrackerMgr::try_reserve(int64_t size) {
 }
 
 inline void ThreadMemTrackerMgr::release_reserved() {
+    memory_orphan_check();
     if (_reserved_mem != 0) {
         
doris::GlobalMemoryArbitrator::release_process_reserved_memory(_reserved_mem +
                                                                        
_untracked_mem);
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index c777c8100ef..cb501731653 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -35,6 +35,10 @@
 #include "runtime/fragment_mgr.h"
 #include "runtime/runtime_query_statistics_mgr.h"
 #include "runtime/runtime_state.h"
+#include "runtime/workload_management/cpu_context.h"
+#include "runtime/workload_management/io_context.h"
+#include "runtime/workload_management/memory_context.h"
+#include "runtime/workload_management/task_controller.h"
 #include "runtime/thread_context.h"
 #include "runtime/workload_group/workload_group_manager.h"
 #include "util/mem_info.h"
@@ -82,6 +86,7 @@ QueryContext::QueryContext(TUniqueId query_id, ExecEnv* 
exec_env,
           _query_source(query_source) {
     _init_query_mem_tracker();
     SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker);
+    _init_resource_context();
     _query_watcher.start();
     _shared_hash_table_controller.reset(new 
vectorized::SharedHashTableController());
     _execution_dependency = pipeline::Dependency::create_unique(-1, -1, 
"ExecutionDependency");
@@ -128,16 +133,33 @@ void QueryContext::_init_query_mem_tracker() {
         query_mem_tracker = MemTrackerLimiter::create_shared(
                 MemTrackerLimiter::Type::LOAD, fmt::format("Load#Id={}", 
print_id(_query_id)),
                 _bytes_limit);
-    } else { // EXTERNAL
+    } else if (_query_options.query_type == TQueryType::EXTERNAL) {
         query_mem_tracker = MemTrackerLimiter::create_shared(
-                MemTrackerLimiter::Type::LOAD, fmt::format("External#Id={}", 
print_id(_query_id)),
+                MemTrackerLimiter::Type::QUERY, fmt::format("External#Id={}", 
print_id(_query_id)),
                 _bytes_limit);
+    } else {
+        LOG(FATAL) << "__builtin_unreachable";
+        __builtin_unreachable();
     }
     if (_query_options.__isset.is_report_success && 
_query_options.is_report_success) {
         query_mem_tracker->enable_print_log_usage();
     }
 }
 
+void QueryContext::_init_resource_context() {
+    if (_query_options.query_type == TQueryType::SELECT) {
+        resource_context = 
ResourceContext::CreateResourceContext<QueryCPUContext, QueryMemoryContext, 
QueryIOContext, WorkloadGroupContext, QueryTaskController>();
+    } else if (_query_options.query_type == TQueryType::LOAD) {
+        resource_context = 
ResourceContext::CreateResourceContext<LoadCPUContext, LoadMemoryContext, 
LoadIOContext, WorkloadGroupContext, LoadTaskController>();
+    } else if (_query_options.query_type == TQueryType::EXTERNAL) {
+        resource_context = 
ResourceContext::CreateResourceContext<QueryCPUContext, QueryMemoryContext, 
QueryIOContext, WorkloadGroupContext, QueryTaskController>();
+    } else {
+        LOG(FATAL) << "__builtin_unreachable";
+        __builtin_unreachable();
+    }
+    
resource_context->memory_context()->set_memtracker_limiter(query_mem_tracker);
+}
+
 QueryContext::~QueryContext() {
     SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker);
     // query mem tracker consumption is equal to 0, it means that after 
QueryContext is created,
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 621c5ebca90..6ca01045706 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -34,6 +34,7 @@
 #include "runtime/exec_env.h"
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/query_statistics.h"
+#include "runtime/workload_management/resource_context.h"
 #include "runtime/runtime_filter_mgr.h"
 #include "runtime/runtime_predicate.h"
 #include "util/hash_util.hpp"
@@ -235,6 +236,8 @@ public:
     // MemTracker that is shared by all fragment instances running on this 
host.
     std::shared_ptr<MemTrackerLimiter> query_mem_tracker;
 
+    std::shared_ptr<ResourceContext> resource_context;
+
     std::vector<TUniqueId> fragment_instance_ids;
 
     // plan node id -> TFileScanRangeParams
@@ -283,6 +286,7 @@ private:
     std::unique_ptr<ThreadPoolToken> _thread_token {nullptr};
 
     void _init_query_mem_tracker();
+    void _init_resource_context();
 
     std::shared_ptr<vectorized::SharedHashTableController> 
_shared_hash_table_controller;
     std::unordered_map<int, vectorized::RuntimePredicate> _runtime_predicates;
diff --git a/be/src/runtime/thread_context.cpp 
b/be/src/runtime/thread_context.cpp
index c89f532e592..c796f8b6166 100644
--- a/be/src/runtime/thread_context.cpp
+++ b/be/src/runtime/thread_context.cpp
@@ -18,53 +18,47 @@
 #include "runtime/thread_context.h"
 
 #include "common/signal_handler.h"
-#include "runtime/query_context.h"
-#include "runtime/runtime_state.h"
-#include "runtime/workload_group/workload_group_manager.h"
 
 namespace doris {
 class MemTracker;
 
-QueryThreadContext ThreadContext::query_thread_context() {
-    DCHECK(doris::pthread_context_ptr_init);
-    ORPHAN_TRACKER_CHECK();
-    return {_task_id, thread_mem_tracker_mgr->limiter_mem_tracker(), _wg_wptr};
-}
-
-void AttachTask::init(const QueryThreadContext& query_thread_context) {
+AttachTask::AttachTask(const std::shared_ptr<ResourceContext>& rc) {
     ThreadLocalHandle::create_thread_local_if_not_exits();
-    signal::set_signal_task_id(query_thread_context.query_id);
-    thread_context()->attach_task(query_thread_context.query_id,
-                                  query_thread_context.query_mem_tracker,
-                                  query_thread_context.wg_wptr);
-}
-
-AttachTask::AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
-    QueryThreadContext query_thread_context = {TUniqueId(), mem_tracker};
-    init(query_thread_context);
+    signal::set_signal_task_id(rc->task_id());
+    thread_context()->attach_task(rc);
 }
 
-AttachTask::AttachTask(RuntimeState* runtime_state) {
-    signal::set_signal_is_nereids(runtime_state->is_nereids());
-    QueryThreadContext query_thread_context = {runtime_state->query_id(),
-                                               
runtime_state->query_mem_tracker(),
-                                               
runtime_state->get_query_ctx()->workload_group()};
-    init(query_thread_context);
+AttachTask::~AttachTask() {
+    thread_context()->detach_task();
+    ThreadLocalHandle::del_thread_local_if_count_is_zero();
 }
 
-AttachTask::AttachTask(const QueryThreadContext& query_thread_context) {
-    init(query_thread_context);
+SwitchThreadMemTrackerLimiter::SwitchThreadMemTrackerLimiter(
+            const std::shared_ptr<doris::MemTrackerLimiter>& mem_tracker) {
+    DCHECK(mem_tracker);
+    doris::ThreadLocalHandle::create_thread_local_if_not_exits();
+    if (mem_tracker != 
thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) {
+        _old_mem_tracker = 
thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
+        
thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker);
+    }
 }
 
-AttachTask::AttachTask(QueryContext* query_ctx) {
-    QueryThreadContext query_thread_context = {query_ctx->query_id(), 
query_ctx->query_mem_tracker,
-                                               query_ctx->workload_group()};
-    init(query_thread_context);
+SwitchThreadMemTrackerLimiter::SwitchThreadMemTrackerLimiter(ResourceContext* 
rc) {
+        doris::ThreadLocalHandle::create_thread_local_if_not_exits();
+    DCHECK(rc->memory_context()->memtracker_limiter());
+    if (rc->memory_context()->memtracker_limiter() !=
+        thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) {
+        _old_mem_tracker = 
thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
+        thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(
+                rc->memory_context()->memtracker_limiter());
+    }
 }
 
-AttachTask::~AttachTask() {
-    thread_context()->detach_task();
-    ThreadLocalHandle::del_thread_local_if_count_is_zero();
+SwitchThreadMemTrackerLimiter::~SwitchThreadMemTrackerLimiter() {
+    if (_old_mem_tracker != nullptr) {
+        
thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker);
+    }
+    doris::ThreadLocalHandle::del_thread_local_if_count_is_zero();
 }
 
 AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer(MemTracker* 
mem_tracker) {
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 9ba7949ec5a..386671aac08 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -17,21 +17,14 @@
 
 #pragma once
 
-#include <bthread/bthread.h>
-#include <bthread/types.h>
-#include <gen_cpp/Types_types.h>
-#include <stdint.h>
-
 #include <memory>
-#include <ostream>
 #include <string>
-#include <thread>
 
-#include "common/exception.h"
 #include "common/logging.h"
-#include "gutil/macros.h"
+#include "runtime/thread_context_impl.h"
 #include "runtime/exec_env.h"
 #include "runtime/memory/mem_tracker_limiter.h"
+#include "runtime/workload_management/resource_context.h"
 #include "runtime/memory/thread_mem_tracker_mgr.h"
 #include "util/defer_op.h" // IWYU pragma: keep
 
@@ -58,16 +51,7 @@
     auto VARNAME_LINENUM(add_mem_consumer) = 
doris::AddThreadMemTrackerConsumer(mem_tracker)
 
 #define DEFER_RELEASE_RESERVED() \
-    Defer VARNAME_LINENUM(defer) {[&]() { 
doris::thread_context()->release_reserved_memory(); }};
-
-#define ORPHAN_TRACKER_CHECK()                                                 
 \
-    DCHECK(doris::k_doris_exit || !doris::config::enable_memory_orphan_check 
|| \
-           doris::thread_context()->thread_mem_tracker()->label() != "Orphan") 
 \
-            << doris::memory_orphan_check_msg
-
-#define MEMORY_ORPHAN_CHECK()                                                 \
-    DCHECK(doris::k_doris_exit || !doris::config::enable_memory_orphan_check) \
-            << doris::memory_orphan_check_msg;
+    Defer VARNAME_LINENUM(defer) {[&]() { 
doris::thread_context()->thread_mem_tracker_mgr->release_reserved(); }};
 #else
 // thread context need to be initialized, required by Allocator and elsewhere.
 #define SCOPED_ATTACH_TASK(arg1, ...) \
@@ -77,8 +61,6 @@
 #define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) \
     auto VARNAME_LINENUM(scoped_tls_cmt) = doris::ScopedInitThreadContext()
 #define DEFER_RELEASE_RESERVED() (void)0
-#define ORPHAN_TRACKER_CHECK() (void)0
-#define MEMORY_ORPHAN_CHECK() (void)0
 #endif
 
 #if defined(USE_MEM_TRACKER) && !defined(BE_TEST)
@@ -111,9 +93,9 @@
 #define SKIP_LARGE_MEMORY_CHECK(...)                                       \
     do {                                                                   \
         doris::ThreadLocalHandle::create_thread_local_if_not_exits();      \
-        doris::thread_context()->skip_large_memory_check++;                \
+        
doris::thread_context()->thread_mem_tracker_mgr->skip_large_memory_check++;     
           \
         DEFER({                                                            \
-            doris::thread_context()->skip_large_memory_check--;            \
+            
doris::thread_context()->thread_mem_tracker_mgr->skip_large_memory_check--;     
       \
             doris::ThreadLocalHandle::del_thread_local_if_count_is_zero(); \
         });                                                                \
         __VA_ARGS__;                                                       \
@@ -123,7 +105,7 @@
     std::shared_ptr<IOThrottle> iot = nullptr;                      \
     auto* t_ctx = doris::thread_context(true);                      \
     if (t_ctx) {                                                    \
-        iot = t_ctx->get_local_scan_io_throttle(data_dir);          \
+        iot = 
t_ctx->resource_ctx->workload_group_context()->workload_group()->get_local_scan_io_throttle(data_dir);
 \
     }                                                               \
     if (iot) {                                                      \
         iot->acquire(-1);                                           \
@@ -132,7 +114,7 @@
         [&]() {                                                     \
             if (iot) {                                              \
                 iot->update_next_io_time(*bytes_read);              \
-                t_ctx->update_local_scan_io(data_dir, *bytes_read); \
+                    iot = 
t_ctx->resource_ctx->workload_group_context()->workload_group()->update_local_scan_io(data_dir,
 *bytes_read); \
             }                                                       \
         }                                                           \
     }
@@ -141,7 +123,7 @@
     std::shared_ptr<IOThrottle> iot = nullptr;             \
     auto* t_ctx = doris::thread_context(true);             \
     if (t_ctx) {                                           \
-        iot = t_ctx->get_remote_scan_io_throttle();        \
+            iot = 
t_ctx->resource_ctx->workload_group_context()->workload_group()->get_remote_scan_io_throttle(data_dir);
 \
     }                                                      \
     if (iot) {                                             \
         iot->acquire(-1);                                  \
@@ -150,274 +132,21 @@
         [&]() {                                            \
             if (iot) {                                     \
                 iot->update_next_io_time(*bytes_read);     \
-                t_ctx->update_remote_scan_io(*bytes_read); \
+                    iot = 
t_ctx->resource_ctx->workload_group_context()->workload_group()->update_remote_scan_io(*bytes_read);
 \
             }                                              \
         }                                                  \
     }
 
 namespace doris {
 
-class ThreadContext;
 class MemTracker;
 class RuntimeState;
 class QueryThreadContext;
 class WorkloadGroup;
 
-extern bthread_key_t btls_key;
-
-// Is true after ThreadContext construction.
-inline thread_local bool pthread_context_ptr_init = false;
-inline thread_local constinit ThreadContext* thread_context_ptr = nullptr;
 // use mem hook to consume thread mem tracker.
 inline thread_local bool use_mem_hook = false;
 
-static std::string memory_orphan_check_msg =
-        "If you crash here, it means that SCOPED_ATTACH_TASK and "
-        "SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER are not used correctly. 
starting position of "
-        "each thread is expected to use SCOPED_ATTACH_TASK to bind a 
MemTrackerLimiter belonging "
-        "to Query/Load/Compaction/Other Tasks, otherwise memory alloc using 
Doris Allocator in the "
-        "thread will crash. If you want to switch MemTrackerLimiter during 
thread execution, "
-        "please use SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER, do not repeat 
Attach.";
-
-// The thread context saves some info about a working thread.
-// 2 required info:
-//   1. thread_id:   Current thread id, Auto generated.
-//   2. type(abolished):        The type is a enum value indicating which type 
of task current thread is running.
-//                   For example: QUERY, LOAD, COMPACTION, ...
-//   3. task id:     A unique id to identify this task. maybe query id, load 
job id, etc.
-//   4. ThreadMemTrackerMgr
-//
-// There may be other optional info to be added later.
-class ThreadContext {
-public:
-    ThreadContext() { thread_mem_tracker_mgr = 
std::make_unique<ThreadMemTrackerMgr>(); }
-
-    ~ThreadContext() = default;
-
-    void attach_task(const TUniqueId& task_id,
-                     const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
-                     const std::weak_ptr<WorkloadGroup>& wg_wptr) {
-        // will only attach_task at the beginning of the thread function, 
there should be no duplicate attach_task.
-        DCHECK(mem_tracker);
-        // Orphan is thread default tracker.
-        DCHECK(thread_mem_tracker()->label() == "Orphan")
-                << ", thread mem tracker label: " << 
thread_mem_tracker()->label()
-                << ", attach mem tracker label: " << mem_tracker->label();
-        _task_id = task_id;
-        _wg_wptr = wg_wptr;
-        thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker);
-        thread_mem_tracker_mgr->set_query_id(_task_id);
-        thread_mem_tracker_mgr->set_wg_wptr(_wg_wptr);
-        thread_mem_tracker_mgr->enable_wait_gc();
-        thread_mem_tracker_mgr->reset_query_cancelled_flag(false);
-    }
-
-    void detach_task() {
-        _task_id = TUniqueId();
-        _wg_wptr.reset();
-        thread_mem_tracker_mgr->detach_limiter_tracker();
-        thread_mem_tracker_mgr->set_query_id(TUniqueId());
-        thread_mem_tracker_mgr->reset_wg_wptr();
-        thread_mem_tracker_mgr->disable_wait_gc();
-    }
-
-    [[nodiscard]] const TUniqueId& task_id() const { return _task_id; }
-
-    static std::string get_thread_id() {
-        std::stringstream ss;
-        ss << std::this_thread::get_id();
-        return ss.str();
-    }
-    // Note that if set global Memory Hook, After thread_mem_tracker_mgr is 
initialized,
-    // the current thread Hook starts to consume/release mem_tracker.
-    // the use of shared_ptr will cause a crash. The guess is that there is an
-    // intermediate state during the copy construction of shared_ptr. 
Shared_ptr is not equal
-    // to nullptr, but the object it points to is not initialized. At this 
time, when the memory
-    // is released somewhere, the hook is triggered to cause the crash.
-    std::unique_ptr<ThreadMemTrackerMgr> thread_mem_tracker_mgr;
-    [[nodiscard]] std::shared_ptr<MemTrackerLimiter> thread_mem_tracker() 
const {
-        return thread_mem_tracker_mgr->limiter_mem_tracker();
-    }
-
-    QueryThreadContext query_thread_context();
-
-    void consume_memory(const int64_t size) const {
-#ifdef USE_MEM_TRACKER
-        DCHECK(doris::k_doris_exit || 
!doris::config::enable_memory_orphan_check ||
-               thread_mem_tracker()->label() != "Orphan")
-                << doris::memory_orphan_check_msg;
-#endif
-        thread_mem_tracker_mgr->consume(size, skip_large_memory_check);
-    }
-
-    doris::Status try_reserve_memory(const int64_t size) const {
-#ifdef USE_MEM_TRACKER
-        DCHECK(doris::k_doris_exit || 
!doris::config::enable_memory_orphan_check ||
-               thread_mem_tracker()->label() != "Orphan")
-                << doris::memory_orphan_check_msg;
-#endif
-        return thread_mem_tracker_mgr->try_reserve(size);
-    }
-
-    void release_reserved_memory() const {
-#ifdef USE_MEM_TRACKER
-        DCHECK(doris::k_doris_exit || 
!doris::config::enable_memory_orphan_check ||
-               thread_mem_tracker()->label() != "Orphan")
-                << doris::memory_orphan_check_msg;
-#endif
-        thread_mem_tracker_mgr->release_reserved();
-    }
-
-    std::weak_ptr<WorkloadGroup> workload_group() { return _wg_wptr; }
-
-    std::shared_ptr<IOThrottle> get_local_scan_io_throttle(const std::string& 
data_dir) {
-        if (std::shared_ptr<WorkloadGroup> wg_ptr = _wg_wptr.lock()) {
-            return wg_ptr->get_local_scan_io_throttle(data_dir);
-        }
-        return nullptr;
-    }
-
-    std::shared_ptr<IOThrottle> get_remote_scan_io_throttle() {
-        if (std::shared_ptr<WorkloadGroup> wg_ptr = _wg_wptr.lock()) {
-            return wg_ptr->get_remote_scan_io_throttle();
-        }
-        return nullptr;
-    }
-
-    void update_local_scan_io(std::string path, size_t bytes_read) {
-        if (std::shared_ptr<WorkloadGroup> wg_ptr = _wg_wptr.lock()) {
-            wg_ptr->update_local_scan_io(path, bytes_read);
-        }
-    }
-
-    void update_remote_scan_io(size_t bytes_read) {
-        if (std::shared_ptr<WorkloadGroup> wg_ptr = _wg_wptr.lock()) {
-            wg_ptr->update_remote_scan_io(bytes_read);
-        }
-    }
-
-    int thread_local_handle_count = 0;
-    int skip_memory_check = 0;
-    int skip_large_memory_check = 0;
-
-private:
-    TUniqueId _task_id;
-    std::weak_ptr<WorkloadGroup> _wg_wptr;
-};
-
-class ThreadLocalHandle {
-public:
-    static void create_thread_local_if_not_exits() {
-        if (bthread_self() == 0) {
-            if (!pthread_context_ptr_init) {
-                thread_context_ptr = new ThreadContext();
-                pthread_context_ptr_init = true;
-            }
-            DCHECK(thread_context_ptr != nullptr);
-            thread_context_ptr->thread_local_handle_count++;
-        } else {
-            // Avoid calling bthread_getspecific frequently to get bthread 
local.
-            // Very frequent bthread_getspecific will slow, but 
create_thread_local_if_not_exits is not expected to be much.
-            // Cache the pointer of bthread local in pthead local.
-            auto* bthread_context = 
static_cast<ThreadContext*>(bthread_getspecific(btls_key));
-            if (bthread_context == nullptr) {
-                // If bthread_context == nullptr:
-                // 1. First call to bthread_getspecific (and before any 
bthread_setspecific) returns NULL
-                // 2. There are not enough reusable btls in btls pool.
-                // else if bthread_context != nullptr:
-                // 1. A new bthread starts, but get a reuses btls.
-                bthread_context = new ThreadContext;
-                // The brpc server should respond as quickly as possible.
-                bthread_context->thread_mem_tracker_mgr->disable_wait_gc();
-                // set the data so that next time bthread_getspecific in the 
thread returns the data.
-                CHECK(0 == bthread_setspecific(btls_key, bthread_context) || 
doris::k_doris_exit);
-            }
-            DCHECK(bthread_context != nullptr);
-            bthread_context->thread_local_handle_count++;
-        }
-    }
-
-    // `create_thread_local_if_not_exits` and 
`del_thread_local_if_count_is_zero` should be used in pairs,
-    // `del_thread_local_if_count_is_zero` should only be called if 
`create_thread_local_if_not_exits` returns true
-    static void del_thread_local_if_count_is_zero() {
-        if (pthread_context_ptr_init) {
-            // in pthread
-            thread_context_ptr->thread_local_handle_count--;
-            if (thread_context_ptr->thread_local_handle_count == 0) {
-                pthread_context_ptr_init = false;
-                delete doris::thread_context_ptr;
-                thread_context_ptr = nullptr;
-            }
-        } else if (bthread_self() != 0) {
-            // in bthread
-            auto* bthread_context = 
static_cast<ThreadContext*>(bthread_getspecific(btls_key));
-            DCHECK(bthread_context != nullptr);
-            bthread_context->thread_local_handle_count--;
-        } else {
-            throw Exception(Status::FatalError("__builtin_unreachable"));
-        }
-    }
-};
-
-// must call create_thread_local_if_not_exits() before use thread_context().
-static ThreadContext* thread_context(bool allow_return_null = false) {
-    if (pthread_context_ptr_init) {
-        // in pthread
-        DCHECK(bthread_self() == 0);
-        DCHECK(thread_context_ptr != nullptr);
-        return thread_context_ptr;
-    }
-    if (bthread_self() != 0) {
-        // in bthread
-        // bthread switching pthread may be very frequent, remember not to use 
lock or other time-consuming operations.
-        auto* bthread_context = 
static_cast<ThreadContext*>(bthread_getspecific(btls_key));
-        DCHECK(bthread_context != nullptr && 
bthread_context->thread_local_handle_count > 0);
-        return bthread_context;
-    }
-    if (allow_return_null) {
-        return nullptr;
-    }
-    // It means that use thread_context() but this thread not attached a 
query/load using SCOPED_ATTACH_TASK macro.
-    throw Exception(
-            Status::FatalError("__builtin_unreachable, {}", 
doris::memory_orphan_check_msg));
-}
-
-// belong to one query object member, not be shared by multiple queries.
-class QueryThreadContext {
-public:
-    QueryThreadContext() = default;
-    QueryThreadContext(const TUniqueId& query_id,
-                       const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
-                       const std::weak_ptr<WorkloadGroup>& wg_wptr)
-            : query_id(query_id), query_mem_tracker(mem_tracker), 
wg_wptr(wg_wptr) {}
-    // If use WorkloadGroup and can get WorkloadGroup ptr, must as a parameter.
-    QueryThreadContext(const TUniqueId& query_id,
-                       const std::shared_ptr<MemTrackerLimiter>& mem_tracker)
-            : query_id(query_id), query_mem_tracker(mem_tracker) {}
-
-    // Not thread safe, generally be called in class constructor, shared_ptr 
use_count may be
-    // wrong when called by multiple threads, cause crash after object be 
destroyed prematurely.
-    void init_unlocked() {
-#ifndef BE_TEST
-        ORPHAN_TRACKER_CHECK();
-        query_id = doris::thread_context()->task_id();
-        query_mem_tracker = 
doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
-        wg_wptr = doris::thread_context()->workload_group();
-#else
-        query_id = TUniqueId();
-        query_mem_tracker = 
doris::ExecEnv::GetInstance()->orphan_mem_tracker();
-#endif
-    }
-
-    std::shared_ptr<MemTrackerLimiter> get_memory_tracker() { return 
query_mem_tracker; }
-
-    WorkloadGroupPtr get_workload_group_ptr() { return wg_wptr.lock(); }
-
-    TUniqueId query_id;
-    std::shared_ptr<MemTrackerLimiter> query_mem_tracker;
-    std::weak_ptr<WorkloadGroup> wg_wptr;
-};
-
 class ScopedPeakMem {
 public:
     explicit ScopedPeakMem(int64* peak_mem) : _peak_mem(peak_mem), 
_mem_tracker("ScopedPeakMem") {
@@ -446,52 +175,17 @@ public:
 
 class AttachTask {
 public:
-    // not query or load, initialize with memory tracker, empty query id and 
default normal workload group.
-    explicit AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker);
-
-    // is query or load, initialize with memory tracker, query id and workload 
group wptr.
-    explicit AttachTask(RuntimeState* runtime_state);
-
-    explicit AttachTask(QueryContext* query_ctx);
-
-    explicit AttachTask(const QueryThreadContext& query_thread_context);
-
-    void init(const QueryThreadContext& query_thread_context);
+    explicit AttachTask(const std::shared_ptr<ResourceContext>& rc);
 
     ~AttachTask();
 };
 
 class SwitchThreadMemTrackerLimiter {
 public:
-    explicit SwitchThreadMemTrackerLimiter(
-            const std::shared_ptr<doris::MemTrackerLimiter>& mem_tracker) {
-        DCHECK(mem_tracker);
-        doris::ThreadLocalHandle::create_thread_local_if_not_exits();
-        if (mem_tracker != 
thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) {
-            _old_mem_tracker = 
thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
-            
thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker);
-        }
-    }
-
-    explicit SwitchThreadMemTrackerLimiter(const doris::QueryThreadContext& 
query_thread_context) {
-        doris::ThreadLocalHandle::create_thread_local_if_not_exits();
-        DCHECK(thread_context()->task_id() ==
-               query_thread_context.query_id); // workload group alse not 
change
-        DCHECK(query_thread_context.query_mem_tracker);
-        if (query_thread_context.query_mem_tracker !=
-            thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) {
-            _old_mem_tracker = 
thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
-            thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(
-                    query_thread_context.query_mem_tracker);
-        }
-    }
+    explicit SwitchThreadMemTrackerLimiter(const 
std::shared_ptr<doris::MemTrackerLimiter>& mem_tracker);
+    explicit SwitchThreadMemTrackerLimiter(ResourceContext* rc);
 
-    ~SwitchThreadMemTrackerLimiter() {
-        if (_old_mem_tracker != nullptr) {
-            
thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker);
-        }
-        doris::ThreadLocalHandle::del_thread_local_if_count_is_zero();
-    }
+    ~SwitchThreadMemTrackerLimiter();
 
 private:
     std::shared_ptr<doris::MemTrackerLimiter> _old_mem_tracker {nullptr};
@@ -526,11 +220,11 @@ class ScopeSkipMemoryCheck {
 public:
     explicit ScopeSkipMemoryCheck() {
         ThreadLocalHandle::create_thread_local_if_not_exits();
-        doris::thread_context()->skip_memory_check++;
+        doris::thread_context()->thread_mem_tracker_mgr->skip_memory_check++;
     }
 
     ~ScopeSkipMemoryCheck() {
-        doris::thread_context()->skip_memory_check--;
+        doris::thread_context()->thread_mem_tracker_mgr->skip_memory_check--;
         ThreadLocalHandle::del_thread_local_if_count_is_zero();
     }
 };
@@ -540,14 +234,18 @@ public:
 // used to fix the tracking accuracy of caches.
 #define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker)                          
              \
     do {                                                                       
              \
-        ORPHAN_TRACKER_CHECK();                                                
              \
+        DCHECK(doris::k_doris_exit || 
!doris::config::enable_memory_orphan_check || \
+           
doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label() 
!= "Orphan")  \
+            << doris::memory_orphan_check_msg;                                 
                             \
         
doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->transfer_to(
 \
                 size, tracker);                                                
              \
     } while (0)
 
 #define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker)                        
                \
     do {                                                                       
                \
-        ORPHAN_TRACKER_CHECK();                                                
                \
+        DCHECK(doris::k_doris_exit || 
!doris::config::enable_memory_orphan_check || \
+           
doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label() 
!= "Orphan")  \
+            << doris::memory_orphan_check_msg;                                 
                               \
         tracker->transfer_to(                                                  
                \
                 size, 
doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()); \
     } while (0)
@@ -556,20 +254,20 @@ public:
 #define CONSUME_THREAD_MEM_TRACKER_BY_HOOK(size)           \
     do {                                                   \
         if (doris::use_mem_hook) {                         \
-            doris::thread_context()->consume_memory(size); \
+            doris::thread_context()->thread_mem_tracker_mgr->consume(size); \
         }                                                  \
     } while (0)
 #define RELEASE_THREAD_MEM_TRACKER_BY_HOOK(size) 
CONSUME_THREAD_MEM_TRACKER_BY_HOOK(-size)
 #define CONSUME_THREAD_MEM_TRACKER_BY_HOOK_WITH_FN(size_fn, ...)           \
     do {                                                                   \
         if (doris::use_mem_hook) {                                         \
-            doris::thread_context()->consume_memory(size_fn(__VA_ARGS__)); \
+            
doris::thread_context()->thread_mem_tracker_mgr->consume(size_fn(__VA_ARGS__)); 
\
         }                                                                  \
     } while (0)
 #define RELEASE_THREAD_MEM_TRACKER_BY_HOOK_WITH_FN(size_fn, ...)            \
     do {                                                                    \
         if (doris::use_mem_hook) {                                          \
-            doris::thread_context()->consume_memory(-size_fn(__VA_ARGS__)); \
+            
doris::thread_context()->thread_mem_tracker_mgr->consume(-size_fn(__VA_ARGS__));
 \
         }                                                                   \
     } while (0)
 
@@ -582,12 +280,13 @@ public:
         }                                                                      
                \
         if (doris::pthread_context_ptr_init) {                                 
                \
             DCHECK(bthread_self() == 0);                                       
                \
-            doris::thread_context_ptr->consume_memory(size);                   
                \
+            doris::thread_context_ptr->thread_mem_tracker_mgr->consume(size);  
                                 \
         } else if (bthread_self() != 0) {                                      
                \
             
static_cast<doris::ThreadContext*>(bthread_getspecific(doris::btls_key))        
   \
-                    ->consume_memory(size);                                    
                \
+                    ->thread_mem_tracker_mgr->consume(size);                   
                                 \
         } else if (doris::ExecEnv::ready()) {                                  
                \
-            MEMORY_ORPHAN_CHECK();                                             
                \
+            DCHECK(doris::k_doris_exit || 
!doris::config::enable_memory_orphan_check)          \
+                << doris::memory_orphan_check_msg;                             
                \
             
doris::ExecEnv::GetInstance()->orphan_mem_tracker()->consume_no_update_peak(size);
 \
         }                                                                      
                \
     } while (0)
diff --git a/be/src/runtime/thread_context_impl.h 
b/be/src/runtime/thread_context_impl.h
new file mode 100644
index 00000000000..4cd7a4baad9
--- /dev/null
+++ b/be/src/runtime/thread_context_impl.h
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <bthread/bthread.h>
+#include <bthread/types.h>
+
+#include <string>
+#include <thread>
+
+#include "common/exception.h"
+#include "runtime/workload_management/resource_context.h"
+#include "runtime/memory/thread_mem_tracker_mgr.h"
+
+namespace doris {
+
+class ThreadContext;
+
+extern bthread_key_t btls_key;
+
+// Is true after ThreadContext construction.
+inline thread_local bool pthread_context_ptr_init = false;
+inline thread_local constinit ThreadContext* thread_context_ptr = nullptr;
+
+// The thread context saves some info about a working thread.
+// 2 required info:
+//   1. thread_id:   Current thread id, Auto generated.
+//   2. type(abolished):        The type is a enum value indicating which type 
of task current thread is running.
+//                   For example: QUERY, LOAD, COMPACTION, ...
+//   3. task id:     A unique id to identify this task. maybe query id, load 
job id, etc.
+//   4. ThreadMemTrackerMgr
+//
+// There may be other optional info to be added later.
+class ThreadContext {
+public:
+    ThreadContext() { thread_mem_tracker_mgr = 
std::make_unique<ThreadMemTrackerMgr>(); }
+
+    ~ThreadContext() = default;
+
+    void attach_task(const std::shared_ptr<ResourceContext>& rc) {
+        resource_ctx = rc;
+        
thread_mem_tracker_mgr->attach_task(rc->memory_context()->memtracker_limiter(), 
rc->workload_group_context()->workload_group());
+    }
+
+    void detach_task() {
+        resource_ctx.reset();
+        thread_mem_tracker_mgr->detach_task();
+    }
+
+    static std::string get_thread_id() {
+        std::stringstream ss;
+        ss << std::this_thread::get_id();
+        return ss.str();
+    }
+    // Note that if set global Memory Hook, After thread_mem_tracker_mgr is 
initialized,
+    // the current thread Hook starts to consume/release mem_tracker.
+    // the use of shared_ptr will cause a crash. The guess is that there is an
+    // intermediate state during the copy construction of shared_ptr. 
Shared_ptr is not equal
+    // to nullptr, but the object it points to is not initialized. At this 
time, when the memory
+    // is released somewhere, the hook is triggered to cause the crash.
+    std::unique_ptr<ThreadMemTrackerMgr> thread_mem_tracker_mgr;
+    std::shared_ptr<ResourceContext> resource_ctx;
+    int thread_local_handle_count = 0;
+};
+
+class ThreadLocalHandle {
+public:
+    static void create_thread_local_if_not_exits() {
+        if (bthread_self() == 0) {
+            if (!pthread_context_ptr_init) {
+                thread_context_ptr = new ThreadContext();
+                pthread_context_ptr_init = true;
+            }
+            DCHECK(thread_context_ptr != nullptr);
+            thread_context_ptr->thread_local_handle_count++;
+        } else {
+            // Avoid calling bthread_getspecific frequently to get bthread 
local.
+            // Very frequent bthread_getspecific will slow, but 
create_thread_local_if_not_exits is not expected to be much.
+            // Cache the pointer of bthread local in pthead local.
+            auto* bthread_context = 
static_cast<ThreadContext*>(bthread_getspecific(btls_key));
+            if (bthread_context == nullptr) {
+                // If bthread_context == nullptr:
+                // 1. First call to bthread_getspecific (and before any 
bthread_setspecific) returns NULL
+                // 2. There are not enough reusable btls in btls pool.
+                // else if bthread_context != nullptr:
+                // 1. A new bthread starts, but get a reuses btls.
+                bthread_context = new ThreadContext;
+                // The brpc server should respond as quickly as possible.
+                bthread_context->thread_mem_tracker_mgr->disable_wait_gc();
+                // set the data so that next time bthread_getspecific in the 
thread returns the data.
+                CHECK(0 == bthread_setspecific(btls_key, bthread_context) || 
doris::k_doris_exit);
+            }
+            DCHECK(bthread_context != nullptr);
+            bthread_context->thread_local_handle_count++;
+        }
+    }
+
+    // `create_thread_local_if_not_exits` and 
`del_thread_local_if_count_is_zero` should be used in pairs,
+    // `del_thread_local_if_count_is_zero` should only be called if 
`create_thread_local_if_not_exits` returns true
+    static void del_thread_local_if_count_is_zero() {
+        if (pthread_context_ptr_init) {
+            // in pthread
+            thread_context_ptr->thread_local_handle_count--;
+            if (thread_context_ptr->thread_local_handle_count == 0) {
+                pthread_context_ptr_init = false;
+                delete doris::thread_context_ptr;
+                thread_context_ptr = nullptr;
+            }
+        } else if (bthread_self() != 0) {
+            // in bthread
+            auto* bthread_context = 
static_cast<ThreadContext*>(bthread_getspecific(btls_key));
+            DCHECK(bthread_context != nullptr);
+            bthread_context->thread_local_handle_count--;
+        } else {
+            throw Exception(Status::FatalError("__builtin_unreachable"));
+        }
+    }
+};
+
+// must call create_thread_local_if_not_exits() before use thread_context().
+static ThreadContext* thread_context(bool allow_return_null = false) {
+    if (pthread_context_ptr_init) {
+        // in pthread
+        DCHECK(bthread_self() == 0);
+        DCHECK(thread_context_ptr != nullptr);
+        return thread_context_ptr;
+    }
+    if (bthread_self() != 0) {
+        // in bthread
+        // bthread switching pthread may be very frequent, remember not to use 
lock or other time-consuming operations.
+        auto* bthread_context = 
static_cast<ThreadContext*>(bthread_getspecific(btls_key));
+        DCHECK(bthread_context != nullptr && 
bthread_context->thread_local_handle_count > 0);
+        return bthread_context;
+    }
+    if (allow_return_null) {
+        return nullptr;
+    }
+    // It means that use thread_context() but this thread not attached a 
query/load using SCOPED_ATTACH_TASK macro.
+    throw Exception(
+            Status::FatalError("__builtin_unreachable, {}", 
doris::memory_orphan_check_msg));
+}
+
+} // namespace doris
diff --git a/be/src/runtime/workload_management/cpu_context.h 
b/be/src/runtime/workload_management/cpu_context.h
index ba6681074eb..41574a5c3b1 100644
--- a/be/src/runtime/workload_management/cpu_context.h
+++ b/be/src/runtime/workload_management/cpu_context.h
@@ -17,16 +17,8 @@
 
 #pragma once
 
-#include <stddef.h>
-#include <stdint.h>
-
-#include <atomic>
-#include <memory>
-#include <queue>
-#include <shared_mutex>
-#include <string>
-
-#include "common/status.h"
+#include "common/factory_creator.h"
+#include "util/runtime_profile.h"
 
 namespace doris {
 
@@ -34,28 +26,55 @@ class CPUContext : public 
std::enable_shared_from_this<CPUContext> {
     ENABLE_FACTORY_CREATOR(CPUContext);
 
 public:
-    // Used to collect cpu execution stats.
-    // The stats is not thread safe.
-    // For example, you should use a seperate object for every scanner and do 
merge and reset
-    class CPUStats {
-    public:
-        // Should add some cpu stats relared method here.
-        void reset();
-        void merge(CPUStats& stats);
-        std::string debug_string();
-    };
+    /*
+    * --------------------------------
+    * |          Property            |
+    * --------------------------------
+    * 1. operate them thread-safe.
+    * 2. all tasks are unified.
+    * 3. should not be operated frequently, use local variables to update 
Counter.
+    */
+
+    RuntimeProfile::Counter* cpu_cost_ms_counter_;
+
+    RuntimeProfile* profile() { return profile_.get(); }
+    std::string debug_string() { return profile_->pretty_print(); }
+
+    /*
+    * --------------------------------
+    * |           Action             |
+    * --------------------------------
+    */
 
-public:
-    CPUContext() {}
-    virtual ~CPUContext() = default;
     // Bind current thread to cgroup, only some load thread should do this.
     void bind_workload_group() {
-        // Call workload group method to bind current thread to cgroup
+        // TODO: Call workload group method to bind current thread to cgroup
     }
-    CPUStats* cpu_stats() { return &stats_; }
+
+protected:
+    CPUContext() { init_profile(); }
+    virtual ~CPUContext() = default;
 
 private:
-    CPUStats stats_;
+    void init_profile() {
+        profile_ = std::make_unique<RuntimeProfile>("MemoryContext");
+        cpu_cost_ms_counter_ = ADD_COUNTER(profile_, "RevokeWaitTimeMs", 
TUnit::TIME_MS);
+    }
+
+    // Used to collect memory execution stats.
+    std::unique_ptr<RuntimeProfile> profile_;
+};
+
+class QueryCPUContext : public CPUContext {
+    QueryCPUContext() = default;
+};
+
+class LoadCPUContext : public CPUContext {
+    LoadCPUContext() = default;
+};
+
+class CompactionCPUContext : public CPUContext {
+    CompactionCPUContext() = default;
 };
 
 } // namespace doris
diff --git a/be/src/runtime/workload_management/io_context.h 
b/be/src/runtime/workload_management/io_context.h
index 9d34b1811db..7ffca1eca96 100644
--- a/be/src/runtime/workload_management/io_context.h
+++ b/be/src/runtime/workload_management/io_context.h
@@ -17,16 +17,9 @@
 
 #pragma once
 
-#include <stddef.h>
-#include <stdint.h>
-
-#include <atomic>
-#include <memory>
-#include <queue>
-#include <shared_mutex>
-#include <string>
-
-#include "common/status.h"
+#include "common/factory_creator.h"
+#include "runtime/workload_management/io_throttle.h"
+#include "util/runtime_profile.h"
 
 namespace doris {
 
@@ -34,57 +27,70 @@ class IOContext : public 
std::enable_shared_from_this<IOContext> {
     ENABLE_FACTORY_CREATOR(IOContext);
 
 public:
-    // Used to collect io execution stats.
-    class IOStats {
-    public:
-        IOStats() = default;
-        virtual ~IOStats() = default;
-        void merge(IOStats stats);
-        void reset();
-        std::string debug_string();
-        int64_t scan_rows() { return scan_rows_; }
-        int64_t scan_bytes() { return scan_bytes_; }
-        int64_t scan_bytes_from_local_storage() { return 
scan_bytes_from_local_storage_; }
-        int64_t scan_bytes_from_remote_storage() { return 
scan_bytes_from_remote_storage_; }
-        int64_t returned_rows() { return returned_rows_; }
-        int64_t shuffle_send_bytes() { return shuffle_send_bytes_; }
-        int64_t shuffle_send_rows() { return shuffle_send_rows_; }
-
-        int64_t incr_scan_rows(int64_t delta) { return scan_rows_ + delta; }
-        int64_t incr_scan_bytes(int64_t delta) { return scan_bytes_ + delta; }
-        int64_t incr_scan_bytes_from_local_storage(int64_t delta) {
-            return scan_bytes_from_local_storage_ + delta;
-        }
-        int64_t incr_scan_bytes_from_remote_storage(int64_t delta) {
-            return scan_bytes_from_remote_storage_ + delta;
-        }
-        int64_t incr_returned_rows(int64_t delta) { return returned_rows_ + 
delta; }
-        int64_t incr_shuffle_send_bytes(int64_t delta) { return 
shuffle_send_bytes_ + delta; }
-        int64_t incr_shuffle_send_rows(int64_t delta) { return 
shuffle_send_rows_ + delta; }
-        std::string debug_string();
-
-    private:
-        int64_t scan_rows_ = 0;
-        int64_t scan_bytes_ = 0;
-        int64_t scan_bytes_from_local_storage_ = 0;
-        int64_t scan_bytes_from_remote_storage_ = 0;
-        // number rows returned by query.
-        // only set once by result sink when closing.
-        int64_t returned_rows_ = 0;
-        int64_t shuffle_send_bytes_ = 0;
-        int64_t shuffle_send_rows_ = 0;
-    };
+    /*
+    * --------------------------------
+    * |          Property            |
+    * --------------------------------
+    * 1. operate them thread-safe.
+    * 2. all tasks are unified.
+    * 3. should not be operated frequently, use local variables to update 
Counter.
+    */
+
+    RuntimeProfile::Counter* scan_rows_counter_;
+    RuntimeProfile::Counter* scan_bytes_counter_;
+    RuntimeProfile::Counter* scan_bytes_from_local_storage_counter_;
+    RuntimeProfile::Counter* scan_bytes_from_remote_storage_counter_;
+    // number rows returned by query.
+    // only set once by result sink when closing.
+    RuntimeProfile::Counter* returned_rows_counter_;
+    RuntimeProfile::Counter* shuffle_send_bytes_counter_;
+    RuntimeProfile::Counter* shuffle_send_rows_counter_;
+    RuntimeProfile* profile() { return profile_.get(); }
+    std::string debug_string() { return profile_->pretty_print(); }
+
+    /*
+    * --------------------------------
+    * |           Action             |
+    * --------------------------------
+    */
 
-public:
-    IOContext() {}
-    virtual ~IOContext() = default;
     IOThrottle* io_throttle() {
-        // get io throttle from workload group
+        // TODO: get io throttle from workload group
+        return nullptr;
     }
-    IOStats* stats() { return &stats_; }
+
+protected:
+    IOContext() { init_profile(); }
+    virtual ~IOContext() = default;
+
 
 private:
-    IOStats stats_;
+    void init_profile() {
+        profile_ = std::make_unique<RuntimeProfile>("MemoryContext");
+        scan_rows_counter_ = ADD_COUNTER(profile_, "ScanRows", TUnit::UNIT);
+        scan_bytes_counter_ = ADD_COUNTER(profile_, "ScanBytes", TUnit::BYTES);
+        scan_bytes_from_local_storage_counter_ = ADD_COUNTER(profile_, 
"ScanBytesFromLocalStorage", TUnit::BYTES);
+        scan_bytes_from_remote_storage_counter_ = ADD_COUNTER(profile_, 
"ScanBytesFromRemoteStorage", TUnit::BYTES);
+        returned_rows_counter_ = ADD_COUNTER(profile_, "ReturnedRows", 
TUnit::UNIT);
+        shuffle_send_bytes_counter_ = ADD_COUNTER(profile_, 
"ShuffleSendBytes", TUnit::BYTES);
+        shuffle_send_rows_counter_ = ADD_COUNTER(profile_, 
"ShuffleSendRowsCounter_", TUnit::UNIT);
+    }
+
+    // Used to collect memory execution stats.
+    std::unique_ptr<RuntimeProfile> profile_;
 };
 
+class QueryIOContext : public IOContext {
+    QueryIOContext() = default;
+};
+
+class LoadIOContext : public IOContext {
+    LoadIOContext() = default;
+};
+
+class CompactionIOContext : public IOContext {
+    CompactionIOContext() = default;
+};
+
+
 } // namespace doris
diff --git a/be/src/runtime/workload_management/memory_context.h 
b/be/src/runtime/workload_management/memory_context.h
index 3e3a067e840..979fa8f1966 100644
--- a/be/src/runtime/workload_management/memory_context.h
+++ b/be/src/runtime/workload_management/memory_context.h
@@ -17,60 +17,55 @@
 
 #pragma once
 
-#include <stddef.h>
-#include <stdint.h>
+#include <cstdint>
 
-#include <atomic>
-#include <memory>
-#include <queue>
-#include <shared_mutex>
 #include <string>
 
+#include "common/factory_creator.h"
 #include "common/status.h"
+#include "util/runtime_profile.h"
 
 namespace doris {
 
+class MemTrackerLimiter;
+
 class MemoryContext : public std::enable_shared_from_this<MemoryContext> {
     ENABLE_FACTORY_CREATOR(MemoryContext);
 
 public:
-    // Used to collect memory execution stats.
-    // The stats class is not thread safe, should not do concurrent 
modifications.
-    class MemoryStats {
-    public:
-        MemoryStats() = default;
-        virtual ~MemoryStats() = default;
-        void merge(MemoryStats& stats);
-        void reset();
-        std::string debug_string();
-        int64_t revoke_attempts() { return revoke_attempts_; }
-        int64_t revoke_wait_time_ms() { return revoke_wait_time_ms_; }
-        int64_t revoked_bytes() { return revoked_bytes_; }
-        int64_t max_peak_memory_bytes() { return max_peak_memory_bytes_; }
-        int64_t current_used_memory_bytes() { return 
current_used_memory_bytes_; }
-
-    private:
-        // Maximum memory peak for all backends.
-        // only set once by result sink when closing.
-        int64_t max_peak_memory_bytes_ = 0;
-        int64_t current_used_memory_bytes_ = 0;
-        // The total number of times that the revoke method is called.
-        int64_t revoke_attempts_ = 0;
-        // The time that waiting for revoke finished.
-        int64_t revoke_wait_time_ms_ = 0;
-        // The revoked bytes
-        int64_t revoked_bytes_ = 0;
-    };
-
-public:
-    MemoryContext(std::shared_ptr<MemtrackerLimiter> memtracker)
-            : memtracker_limiter_(memtracker) {}
-
-    virtual ~MemoryContext() = default;
-
-    MemtrackerLimiter* memtracker_limiter() { return 
memtracker_limiter_.get(); }
-
-    MemoryStats* stats() { return &stats_; }
+    /*
+    * --------------------------------
+    * |          Property            |
+    * --------------------------------
+    * 1. operate them thread-safe.
+    * 2. all tasks are unified.
+    * 3. should not be operated frequently, use local variables to update 
Counter.
+    */
+
+    RuntimeProfile::Counter* current_memory_bytes_counter_;
+    RuntimeProfile::Counter* peak_memory_bytes_counter_;
+    // Maximum memory peak for all backends.
+    // only set once by result sink when closing.
+    RuntimeProfile::Counter* max_peak_memory_bytes_counter_;
+    // The total number of times that the revoke method is called.
+    RuntimeProfile::Counter* revoke_attempts_counter_;
+    // The time that waiting for revoke finished.
+    RuntimeProfile::Counter* revoke_wait_time_ms_counter_;
+    // The revoked bytes
+    RuntimeProfile::Counter* revoked_bytes_counter_;
+
+    RuntimeProfile* profile() { return profile_.get(); }
+    std::shared_ptr<MemTrackerLimiter> memtracker_limiter() { return 
memtracker_limiter_; }
+    void set_memtracker_limiter(const std::shared_ptr<MemTrackerLimiter>& 
memtracker_limiter) {
+        memtracker_limiter_ = memtracker_limiter;
+    }
+    std::string debug_string() { return profile_->pretty_print(); }
+
+    /*
+    * --------------------------------
+    * |           Action             |
+    * --------------------------------
+    */
 
     // Following method is related with spill disk.
     // Compute the number of bytes could be released.
@@ -85,9 +80,36 @@ public:
 
     virtual Status leave_arbitration(Status reason) { return Status::OK(); }
 
+protected:
+    MemoryContext() { init_profile(); }
+    virtual ~MemoryContext() = default;
+
 private:
-    MemoryStats stats_;
-    std::shared_ptr<MemtrackerLimiter> memtracker_limiter_;
+    void init_profile() {
+        profile_ = std::make_unique<RuntimeProfile>("MemoryContext");
+        current_memory_bytes_counter_ = ADD_COUNTER(profile_, 
"CurrentMemoryBytes", TUnit::BYTES);
+        peak_memory_bytes_counter_ = ADD_COUNTER(profile_, "PeakMemoryBytes", 
TUnit::BYTES);
+        max_peak_memory_bytes_counter_ = ADD_COUNTER(profile_, 
"MaxPeakMemoryBytes", TUnit::BYTES);
+        revoke_attempts_counter_ = ADD_COUNTER(profile_, "RevokeAttempts", 
TUnit::UNIT);
+        revoke_wait_time_ms_counter_ = ADD_COUNTER(profile_, 
"RevokeWaitTimeMs", TUnit::TIME_MS);
+        revoked_bytes_counter_ = ADD_COUNTER(profile_, "RevokedBytes", 
TUnit::BYTES);
+    }
+
+    // Used to collect memory execution stats.
+    std::unique_ptr<RuntimeProfile> profile_;
+    std::shared_ptr<MemTrackerLimiter> memtracker_limiter_;
+};
+
+class QueryMemoryContext : public MemoryContext {
+    QueryMemoryContext() = default;
+};
+
+class LoadMemoryContext : public MemoryContext {
+    LoadMemoryContext() = default;
+};
+
+class CompactionMemoryContext : public MemoryContext {
+    CompactionMemoryContext() = default;
 };
 
 } // namespace doris
diff --git a/be/src/runtime/workload_management/resource_context.h 
b/be/src/runtime/workload_management/resource_context.h
index 29b21f7e719..613d3975e02 100644
--- a/be/src/runtime/workload_management/resource_context.h
+++ b/be/src/runtime/workload_management/resource_context.h
@@ -17,45 +17,19 @@
 
 #pragma once
 
-#include <stddef.h>
-#include <stdint.h>
-
-#include <atomic>
 #include <memory>
-#include <queue>
-#include <shared_mutex>
-#include <string>
-
-#include "common/status.h"
+#include "common/multi_version.h"
+#include "runtime/workload_group/workload_group.h"
+#include "runtime/workload_management/cpu_context.h"
+#include "runtime/workload_management/memory_context.h"
+#include "runtime/workload_management/workload_group_context.h"
+#include "runtime/workload_management/task_controller.h"
+#include "runtime/workload_management/io_context.h"
+#include "util/runtime_profile.h"
+#include "common/factory_creator.h"
 
 namespace doris {
 
-// Any task that allow cancel should implement this class.
-class TaskController {
-    ENABLE_FACTORY_CREATOR(TaskController);
-
-public:
-    virtual Status cancel(Status cancel_reason) { return Status::OK(); }
-    virtual Status running_time(int64_t* running_time_msecs) {
-        *running_time_msecs = 0;
-        return Status::OK();
-    }
-};
-
-class WorkloadGroupContext {
-    ENABLE_FACTORY_CREATOR(WorkloadGroupContext);
-
-public:
-    WorkloadGroupContext() = default;
-    virtual ~WorkloadGroupContext() = default;
-
-    WorkloadGroupPtr workload_group() { return _workload_group; }
-    void set_workload_group(WorkloadGroupPtr wg) { _workload_group = wg; }
-
-private:
-    WorkloadGroupPtr _workload_group = nullptr;
-};
-
 // Every task should have its own resource context. And BE may adjust the 
resource
 // context during running.
 // ResourceContext contains many contexts or controller, the task could 
implements their
@@ -64,14 +38,16 @@ class ResourceContext : public 
std::enable_shared_from_this<ResourceContext> {
     ENABLE_FACTORY_CREATOR(ResourceContext);
 
 public:
-    ResourceContext() {
-        // These all default values, it may be reset.
-        cpu_context_ = std::make_shared<CPUContext>();
-        memory_context_ = std::make_shared<MemoryContext>();
-        io_context_ = std::make_shared<IOContext>();
-        reclaimer_ = std::make_shared<ResourceReclaimer>();
+    ResourceContext(const std::shared_ptr<CPUContext>& cpu_context, const 
std::shared_ptr<MemoryContext>& memory_context, const 
std::shared_ptr<IOContext>& io_context, const 
std::shared_ptr<WorkloadGroupContext>& workload_group_context, const 
std::shared_ptr<TaskController>& task_controller):
+            cpu_context_(cpu_context), memory_context_(memory_context), 
io_context_(io_context), workload_group_context_(workload_group_context), 
task_controller_(task_controller) {
+        refresh_resource_profile();
+    }
+    ~ResourceContext() = default;
+
+    template<typename TCPUContext, typename TMemoryContext, typename 
TIOContext, typename TWorkloadGroupContext, typename TTaskController>
+    static std::shared_ptr<ResourceContext> CreateResourceContext() {
+        return std::make_shared<ResourceContext>(TCPUContext::create_shared(), 
TMemoryContext::create_shared(), TIOContext::create_shared(), 
TWorkloadGroupContext::create_shared(), TTaskController::create_shared());
     }
-    virtual ~ResourceContext() = default;
 
     // Only return the raw pointer to the caller, so that the caller should 
not save it to other variables.
     CPUContext* cpu_context() { return cpu_context_.get(); }
@@ -80,16 +56,23 @@ public:
     WorkloadGroupContext* workload_group_context() { return 
workload_group_context_.get(); }
     TaskController* task_controller() { return task_controller_.get(); }
 
-    void set_cpu_context(std::shared_ptr<CPUContext> cpu_context) { 
cpu_context_ = cpu_context; }
-    void set_memory_context(std::shared_ptr<MemoryContext> memory_context) {
-        memory_context_ = memory_context;
-    }
-    void set_io_context(std::shared_ptr<IOContext> io_context) { io_context_ = 
io_context; }
-    void set_workload_group_context(std::shared_ptr<WorkloadGroupContext> 
wg_context) {
-        workload_group_context_ = wg_context;
-    }
-    void set_task_controller(std::shared_ptr<TaskController> task_controller) {
-        task_controller_ = task_controller;
+    const TUniqueId& task_id() const { return task_id_; }
+    RuntimeProfile* profile() { return 
const_cast<RuntimeProfile*>(resource_profile_.get().get()); }
+    std::string debug_string() { return 
resource_profile_.get()->pretty_print(); }
+    void refresh_resource_profile() {
+        std::unique_ptr<RuntimeProfile> resource_profile = 
std::make_unique<RuntimeProfile>("ResourceContext");
+
+        RuntimeProfile* cpu_profile =
+                
resource_profile->create_child(cpu_context_->profile()->name(), true, false);
+        cpu_profile->merge(cpu_context_->profile());
+        RuntimeProfile* memory_profile =
+                
resource_profile->create_child(memory_context_->profile()->name(), true, false);
+        memory_profile->merge(memory_context_->profile());
+        RuntimeProfile* io_profile =
+                resource_profile->create_child(io_context_->profile()->name(), 
true, false);
+        io_profile->merge(io_context_->profile());
+
+        resource_profile_.set(std::move(resource_profile));
     }
 
 private:
@@ -99,6 +82,10 @@ private:
     std::shared_ptr<IOContext> io_context_ = nullptr;
     std::shared_ptr<WorkloadGroupContext> workload_group_context_ = nullptr;
     std::shared_ptr<TaskController> task_controller_ = nullptr;
+
+    TUniqueId task_id_;
+    MultiVersion<RuntimeProfile> resource_profile_;
+
 };
 
 } // namespace doris
diff --git a/be/src/runtime/workload_management/task_controller.h 
b/be/src/runtime/workload_management/task_controller.h
new file mode 100644
index 00000000000..26247eb11aa
--- /dev/null
+++ b/be/src/runtime/workload_management/task_controller.h
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "common/factory_creator.h"
+#include "common/status.h"
+
+namespace doris {
+
+class TaskController {
+    ENABLE_FACTORY_CREATOR(TaskController);
+
+public:
+    TaskController() = default;
+    virtual ~TaskController() = default;
+
+    virtual Status cancel(Status cancel_reason) { return Status::OK(); }
+    virtual Status running_time(int64_t* running_time_msecs) {
+        *running_time_msecs = 0;
+        return Status::OK();
+    }
+};
+
+class QueryTaskController : public TaskController {
+    QueryTaskController() = default;
+};
+
+class LoadTaskController : public TaskController {
+    LoadTaskController() = default;
+};
+
+class CompactionTaskController : public TaskController {
+    CompactionTaskController() = default;
+};
+
+}
diff --git a/be/src/runtime/workload_management/workload_group_context.h 
b/be/src/runtime/workload_management/workload_group_context.h
new file mode 100644
index 00000000000..fd2fcedbbec
--- /dev/null
+++ b/be/src/runtime/workload_management/workload_group_context.h
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "runtime/workload_group/workload_group.h"
+#include "common/factory_creator.h"
+
+namespace doris {
+
+class WorkloadGroupContext {
+    ENABLE_FACTORY_CREATOR(WorkloadGroupContext);
+
+public:
+    WorkloadGroupContext() = default;
+    virtual ~WorkloadGroupContext() = default;
+
+    WorkloadGroupPtr workload_group() { return _workload_group; }
+    void set_workload_group(WorkloadGroupPtr wg) { _workload_group = wg; }
+
+private:
+    WorkloadGroupPtr _workload_group = nullptr;
+};
+
+}
diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h
index 7130acbd2f9..6f38c81e931 100644
--- a/be/src/util/runtime_profile.h
+++ b/be/src/util/runtime_profile.h
@@ -433,6 +433,11 @@ public:
     // Prints the counters in a name: value format.
     // Does not hold locks when it makes any function calls.
     void pretty_print(std::ostream* s, const std::string& prefix = "") const;
+    std::string pretty_print() const {
+        std::stringstream ss;
+        pretty_print(&ss);
+        return ss.str();
+    };
 
     // Serializes profile to thrift.
     // Does not hold locks when it makes any function calls.
diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp
index e407dbbaab4..161374f5274 100644
--- a/be/src/vec/common/allocator.cpp
+++ b/be/src/vec/common/allocator.cpp
@@ -86,9 +86,9 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::sys_mem
         err_msg += fmt::format(
                 "Allocator sys memory check failed: Cannot alloc:{}, consuming 
"
                 "tracker:<{}>, peak used {}, current used {}, exec node:<{}>, 
{}.",
-                size, doris::thread_context()->thread_mem_tracker()->label(),
-                
doris::thread_context()->thread_mem_tracker()->peak_consumption(),
-                doris::thread_context()->thread_mem_tracker()->consumption(),
+                size, 
doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label(),
+                
doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->peak_consumption(),
+                
doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->consumption(),
                 
doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker_label(),
                 
doris::GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str());
 
@@ -175,10 +175,10 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::memory_
     if (doris::thread_context()->skip_memory_check != 0) {
         return;
     }
-    auto st = doris::thread_context()->thread_mem_tracker()->check_limit(size);
+    auto st = 
doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->check_limit(size);
     if (!st) {
         auto err_msg = fmt::format("Allocator mem tracker check failed, {}", 
st.to_string());
-        
doris::thread_context()->thread_mem_tracker()->print_log_usage(err_msg);
+        
doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->print_log_usage(err_msg);
         // If the external catch, throw bad::alloc first, let the query 
actively cancel. Otherwise asynchronous cancel.
         if 
(doris::thread_context()->thread_mem_tracker_mgr->is_attach_query()) {
             doris::thread_context()->thread_mem_tracker_mgr->disable_wait_gc();
@@ -239,7 +239,7 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::add_add
         return;
     }
 #endif
-    doris::thread_context()->thread_mem_tracker()->add_address_sanitizers(buf, 
size);
+    
doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->add_address_sanitizers(buf,
 size);
 }
 
 template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename 
MemoryAllocator>
@@ -250,7 +250,7 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator>::remove_
         return;
     }
 #endif
-    
doris::thread_context()->thread_mem_tracker()->remove_address_sanitizers(buf, 
size);
+    
doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->remove_address_sanitizers(buf,
 size);
 }
 
 template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename 
MemoryAllocator>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to