This is an automated email from the ASF dual-hosted git repository. zouxinyi pushed a commit to branch resource_ctx in repository https://gitbox.apache.org/repos/asf/doris.git
commit ca4c07ed2809157ea7c2ba1a91cd31d739144eb6 Author: Zou Xinyi <[email protected]> AuthorDate: Mon Jan 6 20:33:55 2025 +0800 1 --- be/src/olap/rowid_conversion.h | 6 +- be/src/runtime/memory/memory_profile.h | 18 +- be/src/runtime/memory/thread_mem_tracker_mgr.cpp | 34 -- be/src/runtime/memory/thread_mem_tracker_mgr.h | 79 +++-- be/src/runtime/query_context.cpp | 26 +- be/src/runtime/query_context.h | 4 + be/src/runtime/thread_context.cpp | 62 ++-- be/src/runtime/thread_context.h | 357 ++------------------- be/src/runtime/thread_context_impl.h | 158 +++++++++ be/src/runtime/workload_management/cpu_context.h | 71 ++-- be/src/runtime/workload_management/io_context.h | 118 +++---- .../runtime/workload_management/memory_context.h | 112 ++++--- .../runtime/workload_management/resource_context.h | 91 +++--- .../runtime/workload_management/task_controller.h | 51 +++ .../workload_management/workload_group_context.h | 39 +++ be/src/util/runtime_profile.h | 5 + be/src/vec/common/allocator.cpp | 14 +- 17 files changed, 614 insertions(+), 631 deletions(-) diff --git a/be/src/olap/rowid_conversion.h b/be/src/olap/rowid_conversion.h index 8f9d96a136a..b07d683ddae 100644 --- a/be/src/olap/rowid_conversion.h +++ b/be/src/olap/rowid_conversion.h @@ -46,9 +46,9 @@ public: "consuming " "tracker:<{}>, peak used {}, current used {}.", doris::GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str(), - doris::thread_context()->thread_mem_tracker()->label(), - doris::thread_context()->thread_mem_tracker()->peak_consumption(), - doris::thread_context()->thread_mem_tracker()->consumption())); + doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label(), + doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->peak_consumption(), + doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->consumption())); } uint32_t id = _segments_rowid_map.size(); diff --git a/be/src/runtime/memory/memory_profile.h 
b/be/src/runtime/memory/memory_profile.h index c6aefb72f22..635a66ebf23 100644 --- a/be/src/runtime/memory/memory_profile.h +++ b/be/src/runtime/memory/memory_profile.h @@ -33,27 +33,27 @@ public: void make_memory_profile(RuntimeProfile* profile) const; std::string print_memory_overview_profile() const { - return return_memory_profile_str(_memory_overview_profile.get()); + return _memory_overview_profile->pretty_print(); } std::string print_global_memory_profile() const { - return return_memory_profile_str(_global_memory_profile.get().get()); + return _global_memory_profile.get()->pretty_print(); } std::string print_metadata_memory_profile() const { - return return_memory_profile_str(_metadata_memory_profile.get().get()); + return _metadata_memory_profile.get()->pretty_print(); } std::string print_cache_memory_profile() const { - return return_memory_profile_str(_cache_memory_profile.get().get()); + return _cache_memory_profile.get()->pretty_print(); } std::string print_top_memory_tasks_profile() const { - return return_memory_profile_str(_top_memory_tasks_profile.get().get()); + return _top_memory_tasks_profile.get()->pretty_print(); } std::string print_tasks_memory_profile() const { - return return_memory_profile_str(_tasks_memory_profile.get().get()); + return _tasks_memory_profile.get()->pretty_print(); } static int64_t query_current_usage(); @@ -67,12 +67,6 @@ public: void print_log_process_usage(); private: - std::string return_memory_profile_str(const RuntimeProfile* profile) const { - std::stringstream ss; - profile->pretty_print(&ss); - return ss.str(); - } - void init_memory_overview_counter(); std::unique_ptr<RuntimeProfile> _memory_overview_profile; diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp index 3b40426f6ef..510b831633a 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp @@ -20,27 +20,9 @@ #include <gen_cpp/types.pb.h> 
#include "runtime/exec_env.h" -#include "runtime/fragment_mgr.h" namespace doris { -class AsyncCancelQueryTask : public Runnable { - ENABLE_FACTORY_CREATOR(AsyncCancelQueryTask); - -public: - AsyncCancelQueryTask(TUniqueId query_id, const std::string& exceed_msg) - : _query_id(query_id), _exceed_msg(exceed_msg) {} - ~AsyncCancelQueryTask() override = default; - void run() override { - ExecEnv::GetInstance()->fragment_mgr()->cancel_query( - _query_id, Status::MemoryLimitExceeded(_exceed_msg)); - } - -private: - TUniqueId _query_id; - const std::string _exceed_msg; -}; - void ThreadMemTrackerMgr::attach_limiter_tracker( const std::shared_ptr<MemTrackerLimiter>& mem_tracker) { DCHECK(mem_tracker); @@ -71,20 +53,4 @@ void ThreadMemTrackerMgr::detach_limiter_tracker( _limiter_tracker = old_mem_tracker; } -void ThreadMemTrackerMgr::cancel_query(const std::string& exceed_msg) { - if (is_attach_query() && !_is_query_cancelled) { - Status submit_st = ExecEnv::GetInstance()->lazy_release_obj_pool()->submit( - AsyncCancelQueryTask::create_shared(_query_id, exceed_msg)); - if (submit_st.ok()) { - // Use this flag to avoid the cancel request submit to pool many times, because even we cancel the query - // successfully, but the application may not use if (state.iscancelled) to exist quickly. And it may try to - // allocate memory and may failed again and the pool will be full. 
- _is_query_cancelled = true; - } else { - LOG(WARNING) << "Failed to submit cancel query task to pool, query_id " - << print_id(_query_id) << ", error st " << submit_st; - } - } -} - } // namespace doris diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 9dbf4399492..dbb31c393f7 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -20,7 +20,6 @@ #include <fmt/format.h> #include <gen_cpp/Types_types.h> #include <glog/logging.h> -#include <stdint.h> #include <algorithm> #include <memory> @@ -35,11 +34,17 @@ #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/workload_group/workload_group.h" #include "util/stack_util.h" -#include "util/uid_util.h" namespace doris { constexpr size_t SYNC_PROC_RESERVED_INTERVAL_BYTES = (1ULL << 20); // 1M +static std::string memory_orphan_check_msg = + "If you crash here, it means that SCOPED_ATTACH_TASK and " + "SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER are not used correctly. starting position of " + "each thread is expected to use SCOPED_ATTACH_TASK to bind a MemTrackerLimiter belonging " + "to Query/Load/Compaction/Other Tasks, otherwise memory alloc using Doris Allocator in the " + "thread will crash. If you want to switch MemTrackerLimiter during thread execution, " + "please use SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER, do not repeat Attach."; // Memory Hook is counted in the memory tracker of the current thread. class ThreadMemTrackerMgr { @@ -61,6 +66,24 @@ public: void detach_limiter_tracker(const std::shared_ptr<MemTrackerLimiter>& old_mem_tracker = ExecEnv::GetInstance()->orphan_mem_tracker()); + void attach_task(const std::shared_ptr<MemTrackerLimiter>& mem_tracker, + const std::weak_ptr<WorkloadGroup>& wg_wptr) { + DCHECK(mem_tracker); + // Orphan is thread default tracker. + // will only attach_task at the beginning of the thread function, there should be no duplicate attach_task. 
+ DCHECK(_limiter_tracker->label() == "Orphan") + << ", thread mem tracker label: " << _limiter_tracker->label() + << ", attach mem tracker label: " << mem_tracker->label(); + attach_limiter_tracker(mem_tracker); + _wg_wptr = wg_wptr; + enable_wait_gc(); + } + void detach_task() { + detach_limiter_tracker(); + _wg_wptr.reset(); + disable_wait_gc(); + } + // Must be fast enough! Thread update_tracker may be called very frequently. bool push_consumer_tracker(MemTracker* mem_tracker); void pop_consumer_tracker(); @@ -68,31 +91,17 @@ public: return _consumer_tracker_stack.empty() ? "" : _consumer_tracker_stack.back()->label(); } - void set_query_id(const TUniqueId& query_id) { _query_id = query_id; } - - TUniqueId query_id() { return _query_id; } - - void set_wg_wptr(const std::weak_ptr<WorkloadGroup>& wg_wptr) { _wg_wptr = wg_wptr; } - - void reset_wg_wptr() { _wg_wptr.reset(); } - // Note that, If call the memory allocation operation in Memory Hook, // such as calling LOG/iostream/sstream/stringstream/etc. related methods, // must increase the control to avoid entering infinite recursion, otherwise it may cause crash or stuck, // Returns whether the memory exceeds limit, and will consume mem trcker no matter whether the limit is exceeded. 
- void consume(int64_t size, int skip_large_memory_check = 0); + void consume(int64_t size); void flush_untracked_mem(); doris::Status try_reserve(int64_t size); void release_reserved(); - bool is_attach_query() { return _query_id != TUniqueId(); } - - bool is_query_cancelled() const { return _is_query_cancelled; } - - void reset_query_cancelled_flag(bool new_val) { _is_query_cancelled = new_val; } - std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() { CHECK(init()); return _limiter_tracker; @@ -101,7 +110,6 @@ public: void enable_wait_gc() { _wait_gc = true; } void disable_wait_gc() { _wait_gc = false; } [[nodiscard]] bool wait_gc() const { return _wait_gc; } - void cancel_query(const std::string& exceed_msg); std::string print_debug_string() { fmt::memory_buffer consumer_tracker_buf; @@ -118,6 +126,17 @@ public: int64_t untracked_mem() const { return _untracked_mem; } int64_t reserved_mem() const { return _reserved_mem; } + int skip_memory_check = 0; + int skip_large_memory_check = 0; + + void memory_orphan_check() { +#ifdef USE_MEM_TRACKER + DCHECK(doris::k_doris_exit || !doris::config::enable_memory_orphan_check || + _limiter_tracker->label() != "Orphan") + << doris::memory_orphan_check_msg; +#endif + } + private: struct LastAttachSnapshot { int64_t reserved_mem = 0; @@ -136,7 +155,7 @@ private: // so `attach_limiter_tracker` may be nested. std::vector<LastAttachSnapshot> _last_attach_snapshots_stack; - std::string _failed_consume_msg = std::string(); + std::string _failed_consume_msg; // If true, the Allocator will wait for the GC to free memory if it finds that the memory exceed limit. // A thread of query/load will only wait once during execution. bool _wait_gc = false; @@ -147,14 +166,13 @@ private: // If there is a memory new/delete operation in the consume method, it may enter infinite recursion. bool _stop_consume = false; - TUniqueId _query_id = TUniqueId(); - bool _is_query_cancelled = false; }; inline bool ThreadMemTrackerMgr::init() { // 1. 
Initialize in the thread context when the thread starts // 2. ExecEnv not initialized when thread start, initialized in limiter_mem_tracker(). - if (_init) return true; + if (_init) { return true; +} if (ExecEnv::GetInstance()->orphan_mem_tracker() != nullptr) { _limiter_tracker = ExecEnv::GetInstance()->orphan_mem_tracker(); _wait_gc = true; @@ -178,7 +196,8 @@ inline void ThreadMemTrackerMgr::pop_consumer_tracker() { _consumer_tracker_stack.pop_back(); } -inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_check) { +inline void ThreadMemTrackerMgr::consume(int64_t size) { + memory_orphan_check(); // `consumer_tracker` not support reserve memory and not require use `_untracked_mem` to batch consume, // because `consumer_tracker` will not be bound by many threads, so there is no performance problem. for (auto* tracker : _consumer_tracker_stack) { @@ -236,22 +255,18 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_che size > doris::config::stacktrace_in_alloc_large_memory_bytes) { _stop_consume = true; LOG(WARNING) << fmt::format( - "alloc large memory: {}, {}, this is just a warning, not prevent memory alloc, " + "alloc large memory: {}, consume tracker: {}, this is just a warning, not prevent memory alloc, " "stacktrace:\n{}", - size, - is_attach_query() ? "in query or load: " + print_id(_query_id) - : "not in query or load", + size, _limiter_tracker->label(), get_stack_trace()); _stop_consume = false; } if (doris::config::crash_in_alloc_large_memory_bytes > 0 && size > doris::config::crash_in_alloc_large_memory_bytes) { throw Exception(Status::FatalError( - "alloc large memory: {}, {}, crash generate core dumpsto help analyze, " + "alloc large memory: {}, consume tracker: {}, crash generate core dumpsto help analyze, " "stacktrace:\n{}", - size, - is_attach_query() ? 
"in query or load: " + print_id(_query_id) - : "not in query or load", + size, _limiter_tracker->label(), get_stack_trace())); } } @@ -282,6 +297,7 @@ inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size) { DCHECK(_limiter_tracker); DCHECK(size >= 0); CHECK(init()); + memory_orphan_check(); // if _reserved_mem not equal to 0, repeat reserve, // _untracked_mem store bytes that not synchronized to process reserved memory. flush_untracked_mem(); @@ -317,6 +333,7 @@ inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size) { } inline void ThreadMemTrackerMgr::release_reserved() { + memory_orphan_check(); if (_reserved_mem != 0) { doris::GlobalMemoryArbitrator::release_process_reserved_memory(_reserved_mem + _untracked_mem); diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index c777c8100ef..cb501731653 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -35,6 +35,10 @@ #include "runtime/fragment_mgr.h" #include "runtime/runtime_query_statistics_mgr.h" #include "runtime/runtime_state.h" +#include "runtime/workload_management/cpu_context.h" +#include "runtime/workload_management/io_context.h" +#include "runtime/workload_management/memory_context.h" +#include "runtime/workload_management/task_controller.h" #include "runtime/thread_context.h" #include "runtime/workload_group/workload_group_manager.h" #include "util/mem_info.h" @@ -82,6 +86,7 @@ QueryContext::QueryContext(TUniqueId query_id, ExecEnv* exec_env, _query_source(query_source) { _init_query_mem_tracker(); SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker); + _init_resource_context(); _query_watcher.start(); _shared_hash_table_controller.reset(new vectorized::SharedHashTableController()); _execution_dependency = pipeline::Dependency::create_unique(-1, -1, "ExecutionDependency"); @@ -128,16 +133,33 @@ void QueryContext::_init_query_mem_tracker() { query_mem_tracker = MemTrackerLimiter::create_shared( 
MemTrackerLimiter::Type::LOAD, fmt::format("Load#Id={}", print_id(_query_id)), _bytes_limit); - } else { // EXTERNAL + } else if (_query_options.query_type == TQueryType::EXTERNAL) { query_mem_tracker = MemTrackerLimiter::create_shared( - MemTrackerLimiter::Type::LOAD, fmt::format("External#Id={}", print_id(_query_id)), + MemTrackerLimiter::Type::QUERY, fmt::format("External#Id={}", print_id(_query_id)), _bytes_limit); + } else { + LOG(FATAL) << "__builtin_unreachable"; + __builtin_unreachable(); } if (_query_options.__isset.is_report_success && _query_options.is_report_success) { query_mem_tracker->enable_print_log_usage(); } } +void QueryContext::_init_resource_context() { + if (_query_options.query_type == TQueryType::SELECT) { + resource_context = ResourceContext::CreateResourceContext<QueryCPUContext, QueryMemoryContext, QueryIOContext, WorkloadGroupContext, QueryTaskController>(); + } else if (_query_options.query_type == TQueryType::LOAD) { + resource_context = ResourceContext::CreateResourceContext<LoadCPUContext, LoadMemoryContext, LoadIOContext, WorkloadGroupContext, LoadTaskController>(); + } else if (_query_options.query_type == TQueryType::EXTERNAL) { + resource_context = ResourceContext::CreateResourceContext<QueryCPUContext, QueryMemoryContext, QueryIOContext, WorkloadGroupContext, QueryTaskController>(); + } else { + LOG(FATAL) << "__builtin_unreachable"; + __builtin_unreachable(); + } + resource_context->memory_context()->set_memtracker_limiter(query_mem_tracker); +} + QueryContext::~QueryContext() { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker); // query mem tracker consumption is equal to 0, it means that after QueryContext is created, diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 621c5ebca90..6ca01045706 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -34,6 +34,7 @@ #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker_limiter.h" #include 
"runtime/query_statistics.h" +#include "runtime/workload_management/resource_context.h" #include "runtime/runtime_filter_mgr.h" #include "runtime/runtime_predicate.h" #include "util/hash_util.hpp" @@ -235,6 +236,8 @@ public: // MemTracker that is shared by all fragment instances running on this host. std::shared_ptr<MemTrackerLimiter> query_mem_tracker; + std::shared_ptr<ResourceContext> resource_context; + std::vector<TUniqueId> fragment_instance_ids; // plan node id -> TFileScanRangeParams @@ -283,6 +286,7 @@ private: std::unique_ptr<ThreadPoolToken> _thread_token {nullptr}; void _init_query_mem_tracker(); + void _init_resource_context(); std::shared_ptr<vectorized::SharedHashTableController> _shared_hash_table_controller; std::unordered_map<int, vectorized::RuntimePredicate> _runtime_predicates; diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp index c89f532e592..c796f8b6166 100644 --- a/be/src/runtime/thread_context.cpp +++ b/be/src/runtime/thread_context.cpp @@ -18,53 +18,47 @@ #include "runtime/thread_context.h" #include "common/signal_handler.h" -#include "runtime/query_context.h" -#include "runtime/runtime_state.h" -#include "runtime/workload_group/workload_group_manager.h" namespace doris { class MemTracker; -QueryThreadContext ThreadContext::query_thread_context() { - DCHECK(doris::pthread_context_ptr_init); - ORPHAN_TRACKER_CHECK(); - return {_task_id, thread_mem_tracker_mgr->limiter_mem_tracker(), _wg_wptr}; -} - -void AttachTask::init(const QueryThreadContext& query_thread_context) { +AttachTask::AttachTask(const std::shared_ptr<ResourceContext>& rc) { ThreadLocalHandle::create_thread_local_if_not_exits(); - signal::set_signal_task_id(query_thread_context.query_id); - thread_context()->attach_task(query_thread_context.query_id, - query_thread_context.query_mem_tracker, - query_thread_context.wg_wptr); -} - -AttachTask::AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker) { - QueryThreadContext 
query_thread_context = {TUniqueId(), mem_tracker}; - init(query_thread_context); + signal::set_signal_task_id(rc->task_id()); + thread_context()->attach_task(rc); } -AttachTask::AttachTask(RuntimeState* runtime_state) { - signal::set_signal_is_nereids(runtime_state->is_nereids()); - QueryThreadContext query_thread_context = {runtime_state->query_id(), - runtime_state->query_mem_tracker(), - runtime_state->get_query_ctx()->workload_group()}; - init(query_thread_context); +AttachTask::~AttachTask() { + thread_context()->detach_task(); + ThreadLocalHandle::del_thread_local_if_count_is_zero(); } -AttachTask::AttachTask(const QueryThreadContext& query_thread_context) { - init(query_thread_context); +SwitchThreadMemTrackerLimiter::SwitchThreadMemTrackerLimiter( + const std::shared_ptr<doris::MemTrackerLimiter>& mem_tracker) { + DCHECK(mem_tracker); + doris::ThreadLocalHandle::create_thread_local_if_not_exits(); + if (mem_tracker != thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) { + _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); + thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker); + } } -AttachTask::AttachTask(QueryContext* query_ctx) { - QueryThreadContext query_thread_context = {query_ctx->query_id(), query_ctx->query_mem_tracker, - query_ctx->workload_group()}; - init(query_thread_context); +SwitchThreadMemTrackerLimiter::SwitchThreadMemTrackerLimiter(ResourceContext* rc) { + doris::ThreadLocalHandle::create_thread_local_if_not_exits(); + DCHECK(rc->memory_context()->memtracker_limiter()); + if (rc->memory_context()->memtracker_limiter() != + thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) { + _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); + thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker( + rc->memory_context()->memtracker_limiter()); + } } -AttachTask::~AttachTask() { - thread_context()->detach_task(); - 
ThreadLocalHandle::del_thread_local_if_count_is_zero(); +SwitchThreadMemTrackerLimiter::~SwitchThreadMemTrackerLimiter() { + if (_old_mem_tracker != nullptr) { + thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker); + } + doris::ThreadLocalHandle::del_thread_local_if_count_is_zero(); } AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer(MemTracker* mem_tracker) { diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 9ba7949ec5a..386671aac08 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -17,21 +17,14 @@ #pragma once -#include <bthread/bthread.h> -#include <bthread/types.h> -#include <gen_cpp/Types_types.h> -#include <stdint.h> - #include <memory> -#include <ostream> #include <string> -#include <thread> -#include "common/exception.h" #include "common/logging.h" -#include "gutil/macros.h" +#include "runtime/thread_context_impl.h" #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker_limiter.h" +#include "runtime/workload_management/resource_context.h" #include "runtime/memory/thread_mem_tracker_mgr.h" #include "util/defer_op.h" // IWYU pragma: keep @@ -58,16 +51,7 @@ auto VARNAME_LINENUM(add_mem_consumer) = doris::AddThreadMemTrackerConsumer(mem_tracker) #define DEFER_RELEASE_RESERVED() \ - Defer VARNAME_LINENUM(defer) {[&]() { doris::thread_context()->release_reserved_memory(); }}; - -#define ORPHAN_TRACKER_CHECK() \ - DCHECK(doris::k_doris_exit || !doris::config::enable_memory_orphan_check || \ - doris::thread_context()->thread_mem_tracker()->label() != "Orphan") \ - << doris::memory_orphan_check_msg - -#define MEMORY_ORPHAN_CHECK() \ - DCHECK(doris::k_doris_exit || !doris::config::enable_memory_orphan_check) \ - << doris::memory_orphan_check_msg; + Defer VARNAME_LINENUM(defer) {[&]() { doris::thread_context()->thread_mem_tracker_mgr->release_reserved(); }}; #else // thread context need to be initialized, required by Allocator and elsewhere. 
#define SCOPED_ATTACH_TASK(arg1, ...) \ @@ -77,8 +61,6 @@ #define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) \ auto VARNAME_LINENUM(scoped_tls_cmt) = doris::ScopedInitThreadContext() #define DEFER_RELEASE_RESERVED() (void)0 -#define ORPHAN_TRACKER_CHECK() (void)0 -#define MEMORY_ORPHAN_CHECK() (void)0 #endif #if defined(USE_MEM_TRACKER) && !defined(BE_TEST) @@ -111,9 +93,9 @@ #define SKIP_LARGE_MEMORY_CHECK(...) \ do { \ doris::ThreadLocalHandle::create_thread_local_if_not_exits(); \ - doris::thread_context()->skip_large_memory_check++; \ + doris::thread_context()->thread_mem_tracker_mgr->skip_large_memory_check++; \ DEFER({ \ - doris::thread_context()->skip_large_memory_check--; \ + doris::thread_context()->thread_mem_tracker_mgr->skip_large_memory_check--; \ doris::ThreadLocalHandle::del_thread_local_if_count_is_zero(); \ }); \ __VA_ARGS__; \ @@ -123,7 +105,7 @@ std::shared_ptr<IOThrottle> iot = nullptr; \ auto* t_ctx = doris::thread_context(true); \ if (t_ctx) { \ - iot = t_ctx->get_local_scan_io_throttle(data_dir); \ + iot = t_ctx->resource_ctx->workload_group_context()->workload_group()->get_local_scan_io_throttle(data_dir); \ } \ if (iot) { \ iot->acquire(-1); \ @@ -132,7 +114,7 @@ [&]() { \ if (iot) { \ iot->update_next_io_time(*bytes_read); \ - t_ctx->update_local_scan_io(data_dir, *bytes_read); \ + iot = t_ctx->resource_ctx->workload_group_context()->workload_group()->update_local_scan_io(data_dir, *bytes_read); \ } \ } \ } @@ -141,7 +123,7 @@ std::shared_ptr<IOThrottle> iot = nullptr; \ auto* t_ctx = doris::thread_context(true); \ if (t_ctx) { \ - iot = t_ctx->get_remote_scan_io_throttle(); \ + iot = t_ctx->resource_ctx->workload_group_context()->workload_group()->get_remote_scan_io_throttle(data_dir); \ } \ if (iot) { \ iot->acquire(-1); \ @@ -150,274 +132,21 @@ [&]() { \ if (iot) { \ iot->update_next_io_time(*bytes_read); \ - t_ctx->update_remote_scan_io(*bytes_read); \ + iot = 
t_ctx->resource_ctx->workload_group_context()->workload_group()->update_remote_scan_io(*bytes_read); \ } \ } \ } namespace doris { -class ThreadContext; class MemTracker; class RuntimeState; class QueryThreadContext; class WorkloadGroup; -extern bthread_key_t btls_key; - -// Is true after ThreadContext construction. -inline thread_local bool pthread_context_ptr_init = false; -inline thread_local constinit ThreadContext* thread_context_ptr = nullptr; // use mem hook to consume thread mem tracker. inline thread_local bool use_mem_hook = false; -static std::string memory_orphan_check_msg = - "If you crash here, it means that SCOPED_ATTACH_TASK and " - "SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER are not used correctly. starting position of " - "each thread is expected to use SCOPED_ATTACH_TASK to bind a MemTrackerLimiter belonging " - "to Query/Load/Compaction/Other Tasks, otherwise memory alloc using Doris Allocator in the " - "thread will crash. If you want to switch MemTrackerLimiter during thread execution, " - "please use SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER, do not repeat Attach."; - -// The thread context saves some info about a working thread. -// 2 required info: -// 1. thread_id: Current thread id, Auto generated. -// 2. type(abolished): The type is a enum value indicating which type of task current thread is running. -// For example: QUERY, LOAD, COMPACTION, ... -// 3. task id: A unique id to identify this task. maybe query id, load job id, etc. -// 4. ThreadMemTrackerMgr -// -// There may be other optional info to be added later. -class ThreadContext { -public: - ThreadContext() { thread_mem_tracker_mgr = std::make_unique<ThreadMemTrackerMgr>(); } - - ~ThreadContext() = default; - - void attach_task(const TUniqueId& task_id, - const std::shared_ptr<MemTrackerLimiter>& mem_tracker, - const std::weak_ptr<WorkloadGroup>& wg_wptr) { - // will only attach_task at the beginning of the thread function, there should be no duplicate attach_task. 
- DCHECK(mem_tracker); - // Orphan is thread default tracker. - DCHECK(thread_mem_tracker()->label() == "Orphan") - << ", thread mem tracker label: " << thread_mem_tracker()->label() - << ", attach mem tracker label: " << mem_tracker->label(); - _task_id = task_id; - _wg_wptr = wg_wptr; - thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker); - thread_mem_tracker_mgr->set_query_id(_task_id); - thread_mem_tracker_mgr->set_wg_wptr(_wg_wptr); - thread_mem_tracker_mgr->enable_wait_gc(); - thread_mem_tracker_mgr->reset_query_cancelled_flag(false); - } - - void detach_task() { - _task_id = TUniqueId(); - _wg_wptr.reset(); - thread_mem_tracker_mgr->detach_limiter_tracker(); - thread_mem_tracker_mgr->set_query_id(TUniqueId()); - thread_mem_tracker_mgr->reset_wg_wptr(); - thread_mem_tracker_mgr->disable_wait_gc(); - } - - [[nodiscard]] const TUniqueId& task_id() const { return _task_id; } - - static std::string get_thread_id() { - std::stringstream ss; - ss << std::this_thread::get_id(); - return ss.str(); - } - // Note that if set global Memory Hook, After thread_mem_tracker_mgr is initialized, - // the current thread Hook starts to consume/release mem_tracker. - // the use of shared_ptr will cause a crash. The guess is that there is an - // intermediate state during the copy construction of shared_ptr. Shared_ptr is not equal - // to nullptr, but the object it points to is not initialized. At this time, when the memory - // is released somewhere, the hook is triggered to cause the crash. 
- std::unique_ptr<ThreadMemTrackerMgr> thread_mem_tracker_mgr; - [[nodiscard]] std::shared_ptr<MemTrackerLimiter> thread_mem_tracker() const { - return thread_mem_tracker_mgr->limiter_mem_tracker(); - } - - QueryThreadContext query_thread_context(); - - void consume_memory(const int64_t size) const { -#ifdef USE_MEM_TRACKER - DCHECK(doris::k_doris_exit || !doris::config::enable_memory_orphan_check || - thread_mem_tracker()->label() != "Orphan") - << doris::memory_orphan_check_msg; -#endif - thread_mem_tracker_mgr->consume(size, skip_large_memory_check); - } - - doris::Status try_reserve_memory(const int64_t size) const { -#ifdef USE_MEM_TRACKER - DCHECK(doris::k_doris_exit || !doris::config::enable_memory_orphan_check || - thread_mem_tracker()->label() != "Orphan") - << doris::memory_orphan_check_msg; -#endif - return thread_mem_tracker_mgr->try_reserve(size); - } - - void release_reserved_memory() const { -#ifdef USE_MEM_TRACKER - DCHECK(doris::k_doris_exit || !doris::config::enable_memory_orphan_check || - thread_mem_tracker()->label() != "Orphan") - << doris::memory_orphan_check_msg; -#endif - thread_mem_tracker_mgr->release_reserved(); - } - - std::weak_ptr<WorkloadGroup> workload_group() { return _wg_wptr; } - - std::shared_ptr<IOThrottle> get_local_scan_io_throttle(const std::string& data_dir) { - if (std::shared_ptr<WorkloadGroup> wg_ptr = _wg_wptr.lock()) { - return wg_ptr->get_local_scan_io_throttle(data_dir); - } - return nullptr; - } - - std::shared_ptr<IOThrottle> get_remote_scan_io_throttle() { - if (std::shared_ptr<WorkloadGroup> wg_ptr = _wg_wptr.lock()) { - return wg_ptr->get_remote_scan_io_throttle(); - } - return nullptr; - } - - void update_local_scan_io(std::string path, size_t bytes_read) { - if (std::shared_ptr<WorkloadGroup> wg_ptr = _wg_wptr.lock()) { - wg_ptr->update_local_scan_io(path, bytes_read); - } - } - - void update_remote_scan_io(size_t bytes_read) { - if (std::shared_ptr<WorkloadGroup> wg_ptr = _wg_wptr.lock()) { - 
wg_ptr->update_remote_scan_io(bytes_read); - } - } - - int thread_local_handle_count = 0; - int skip_memory_check = 0; - int skip_large_memory_check = 0; - -private: - TUniqueId _task_id; - std::weak_ptr<WorkloadGroup> _wg_wptr; -}; - -class ThreadLocalHandle { -public: - static void create_thread_local_if_not_exits() { - if (bthread_self() == 0) { - if (!pthread_context_ptr_init) { - thread_context_ptr = new ThreadContext(); - pthread_context_ptr_init = true; - } - DCHECK(thread_context_ptr != nullptr); - thread_context_ptr->thread_local_handle_count++; - } else { - // Avoid calling bthread_getspecific frequently to get bthread local. - // Very frequent bthread_getspecific will slow, but create_thread_local_if_not_exits is not expected to be much. - // Cache the pointer of bthread local in pthead local. - auto* bthread_context = static_cast<ThreadContext*>(bthread_getspecific(btls_key)); - if (bthread_context == nullptr) { - // If bthread_context == nullptr: - // 1. First call to bthread_getspecific (and before any bthread_setspecific) returns NULL - // 2. There are not enough reusable btls in btls pool. - // else if bthread_context != nullptr: - // 1. A new bthread starts, but get a reuses btls. - bthread_context = new ThreadContext; - // The brpc server should respond as quickly as possible. - bthread_context->thread_mem_tracker_mgr->disable_wait_gc(); - // set the data so that next time bthread_getspecific in the thread returns the data. 
- CHECK(0 == bthread_setspecific(btls_key, bthread_context) || doris::k_doris_exit); - } - DCHECK(bthread_context != nullptr); - bthread_context->thread_local_handle_count++; - } - } - - // `create_thread_local_if_not_exits` and `del_thread_local_if_count_is_zero` should be used in pairs, - // `del_thread_local_if_count_is_zero` should only be called if `create_thread_local_if_not_exits` returns true - static void del_thread_local_if_count_is_zero() { - if (pthread_context_ptr_init) { - // in pthread - thread_context_ptr->thread_local_handle_count--; - if (thread_context_ptr->thread_local_handle_count == 0) { - pthread_context_ptr_init = false; - delete doris::thread_context_ptr; - thread_context_ptr = nullptr; - } - } else if (bthread_self() != 0) { - // in bthread - auto* bthread_context = static_cast<ThreadContext*>(bthread_getspecific(btls_key)); - DCHECK(bthread_context != nullptr); - bthread_context->thread_local_handle_count--; - } else { - throw Exception(Status::FatalError("__builtin_unreachable")); - } - } -}; - -// must call create_thread_local_if_not_exits() before use thread_context(). -static ThreadContext* thread_context(bool allow_return_null = false) { - if (pthread_context_ptr_init) { - // in pthread - DCHECK(bthread_self() == 0); - DCHECK(thread_context_ptr != nullptr); - return thread_context_ptr; - } - if (bthread_self() != 0) { - // in bthread - // bthread switching pthread may be very frequent, remember not to use lock or other time-consuming operations. - auto* bthread_context = static_cast<ThreadContext*>(bthread_getspecific(btls_key)); - DCHECK(bthread_context != nullptr && bthread_context->thread_local_handle_count > 0); - return bthread_context; - } - if (allow_return_null) { - return nullptr; - } - // It means that use thread_context() but this thread not attached a query/load using SCOPED_ATTACH_TASK macro. 
- throw Exception( - Status::FatalError("__builtin_unreachable, {}", doris::memory_orphan_check_msg)); -} - -// belong to one query object member, not be shared by multiple queries. -class QueryThreadContext { -public: - QueryThreadContext() = default; - QueryThreadContext(const TUniqueId& query_id, - const std::shared_ptr<MemTrackerLimiter>& mem_tracker, - const std::weak_ptr<WorkloadGroup>& wg_wptr) - : query_id(query_id), query_mem_tracker(mem_tracker), wg_wptr(wg_wptr) {} - // If use WorkloadGroup and can get WorkloadGroup ptr, must as a parameter. - QueryThreadContext(const TUniqueId& query_id, - const std::shared_ptr<MemTrackerLimiter>& mem_tracker) - : query_id(query_id), query_mem_tracker(mem_tracker) {} - - // Not thread safe, generally be called in class constructor, shared_ptr use_count may be - // wrong when called by multiple threads, cause crash after object be destroyed prematurely. - void init_unlocked() { -#ifndef BE_TEST - ORPHAN_TRACKER_CHECK(); - query_id = doris::thread_context()->task_id(); - query_mem_tracker = doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); - wg_wptr = doris::thread_context()->workload_group(); -#else - query_id = TUniqueId(); - query_mem_tracker = doris::ExecEnv::GetInstance()->orphan_mem_tracker(); -#endif - } - - std::shared_ptr<MemTrackerLimiter> get_memory_tracker() { return query_mem_tracker; } - - WorkloadGroupPtr get_workload_group_ptr() { return wg_wptr.lock(); } - - TUniqueId query_id; - std::shared_ptr<MemTrackerLimiter> query_mem_tracker; - std::weak_ptr<WorkloadGroup> wg_wptr; -}; - class ScopedPeakMem { public: explicit ScopedPeakMem(int64* peak_mem) : _peak_mem(peak_mem), _mem_tracker("ScopedPeakMem") { @@ -446,52 +175,17 @@ public: class AttachTask { public: - // not query or load, initialize with memory tracker, empty query id and default normal workload group. 
- explicit AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker); - - // is query or load, initialize with memory tracker, query id and workload group wptr. - explicit AttachTask(RuntimeState* runtime_state); - - explicit AttachTask(QueryContext* query_ctx); - - explicit AttachTask(const QueryThreadContext& query_thread_context); - - void init(const QueryThreadContext& query_thread_context); + explicit AttachTask(const std::shared_ptr<ResourceContext>& rc); ~AttachTask(); }; class SwitchThreadMemTrackerLimiter { public: - explicit SwitchThreadMemTrackerLimiter( - const std::shared_ptr<doris::MemTrackerLimiter>& mem_tracker) { - DCHECK(mem_tracker); - doris::ThreadLocalHandle::create_thread_local_if_not_exits(); - if (mem_tracker != thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) { - _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); - thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker); - } - } - - explicit SwitchThreadMemTrackerLimiter(const doris::QueryThreadContext& query_thread_context) { - doris::ThreadLocalHandle::create_thread_local_if_not_exits(); - DCHECK(thread_context()->task_id() == - query_thread_context.query_id); // workload group alse not change - DCHECK(query_thread_context.query_mem_tracker); - if (query_thread_context.query_mem_tracker != - thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) { - _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker(); - thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker( - query_thread_context.query_mem_tracker); - } - } + explicit SwitchThreadMemTrackerLimiter(const std::shared_ptr<doris::MemTrackerLimiter>& mem_tracker); + explicit SwitchThreadMemTrackerLimiter(ResourceContext* rc); - ~SwitchThreadMemTrackerLimiter() { - if (_old_mem_tracker != nullptr) { - thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker); - } - 
doris::ThreadLocalHandle::del_thread_local_if_count_is_zero(); - } + ~SwitchThreadMemTrackerLimiter(); private: std::shared_ptr<doris::MemTrackerLimiter> _old_mem_tracker {nullptr}; @@ -526,11 +220,11 @@ class ScopeSkipMemoryCheck { public: explicit ScopeSkipMemoryCheck() { ThreadLocalHandle::create_thread_local_if_not_exits(); - doris::thread_context()->skip_memory_check++; + doris::thread_context()->thread_mem_tracker_mgr->skip_memory_check++; } ~ScopeSkipMemoryCheck() { - doris::thread_context()->skip_memory_check--; + doris::thread_context()->thread_mem_tracker_mgr->skip_memory_check--; ThreadLocalHandle::del_thread_local_if_count_is_zero(); } }; @@ -540,14 +234,18 @@ public: // used to fix the tracking accuracy of caches. #define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) \ do { \ - ORPHAN_TRACKER_CHECK(); \ + DCHECK(doris::k_doris_exit || !doris::config::enable_memory_orphan_check || \ + doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label() != "Orphan") \ + << doris::memory_orphan_check_msg; \ doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->transfer_to( \ size, tracker); \ } while (0) #define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) \ do { \ - ORPHAN_TRACKER_CHECK(); \ + DCHECK(doris::k_doris_exit || !doris::config::enable_memory_orphan_check || \ + doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label() != "Orphan") \ + << doris::memory_orphan_check_msg; \ tracker->transfer_to( \ size, doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()); \ } while (0) @@ -556,20 +254,20 @@ public: #define CONSUME_THREAD_MEM_TRACKER_BY_HOOK(size) \ do { \ if (doris::use_mem_hook) { \ - doris::thread_context()->consume_memory(size); \ + doris::thread_context()->thread_mem_tracker_mgr->consume(size); \ } \ } while (0) #define RELEASE_THREAD_MEM_TRACKER_BY_HOOK(size) CONSUME_THREAD_MEM_TRACKER_BY_HOOK(-size) #define CONSUME_THREAD_MEM_TRACKER_BY_HOOK_WITH_FN(size_fn, ...) 
\ do { \ if (doris::use_mem_hook) { \ - doris::thread_context()->consume_memory(size_fn(__VA_ARGS__)); \ + doris::thread_context()->thread_mem_tracker_mgr->consume(size_fn(__VA_ARGS__)); \ } \ } while (0) #define RELEASE_THREAD_MEM_TRACKER_BY_HOOK_WITH_FN(size_fn, ...) \ do { \ if (doris::use_mem_hook) { \ - doris::thread_context()->consume_memory(-size_fn(__VA_ARGS__)); \ + doris::thread_context()->thread_mem_tracker_mgr->consume(-size_fn(__VA_ARGS__)); \ } \ } while (0) @@ -582,12 +280,13 @@ public: } \ if (doris::pthread_context_ptr_init) { \ DCHECK(bthread_self() == 0); \ - doris::thread_context_ptr->consume_memory(size); \ + doris::thread_context_ptr->thread_mem_tracker_mgr->consume(size); \ } else if (bthread_self() != 0) { \ static_cast<doris::ThreadContext*>(bthread_getspecific(doris::btls_key)) \ - ->consume_memory(size); \ + ->thread_mem_tracker_mgr->consume(size); \ } else if (doris::ExecEnv::ready()) { \ - MEMORY_ORPHAN_CHECK(); \ + DCHECK(doris::k_doris_exit || !doris::config::enable_memory_orphan_check) \ + << doris::memory_orphan_check_msg; \ doris::ExecEnv::GetInstance()->orphan_mem_tracker()->consume_no_update_peak(size); \ } \ } while (0) diff --git a/be/src/runtime/thread_context_impl.h b/be/src/runtime/thread_context_impl.h new file mode 100644 index 00000000000..4cd7a4baad9 --- /dev/null +++ b/be/src/runtime/thread_context_impl.h @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <bthread/bthread.h> +#include <bthread/types.h> + +#include <string> +#include <thread> + +#include "common/exception.h" +#include "runtime/workload_management/resource_context.h" +#include "runtime/memory/thread_mem_tracker_mgr.h" + +namespace doris { + +class ThreadContext; + +extern bthread_key_t btls_key; + +// Is true after ThreadContext construction. +inline thread_local bool pthread_context_ptr_init = false; +inline thread_local constinit ThreadContext* thread_context_ptr = nullptr; + +// The thread context saves some info about a working thread. +// 2 required info: +// 1. thread_id: Current thread id, Auto generated. +// 2. type(abolished): The type is a enum value indicating which type of task current thread is running. +// For example: QUERY, LOAD, COMPACTION, ... +// 3. task id: A unique id to identify this task. maybe query id, load job id, etc. +// 4. ThreadMemTrackerMgr +// +// There may be other optional info to be added later. 
+class ThreadContext { +public: + ThreadContext() { thread_mem_tracker_mgr = std::make_unique<ThreadMemTrackerMgr>(); } + + ~ThreadContext() = default; + + void attach_task(const std::shared_ptr<ResourceContext>& rc) { + resource_ctx = rc; + thread_mem_tracker_mgr->attach_task(rc->memory_context()->memtracker_limiter(), rc->workload_group_context()->workload_group()); + } + + void detach_task() { + resource_ctx.reset(); + thread_mem_tracker_mgr->detach_task(); + } + + static std::string get_thread_id() { + std::stringstream ss; + ss << std::this_thread::get_id(); + return ss.str(); + } + // Note: if the global memory hook is set, then once thread_mem_tracker_mgr is initialized + // the current thread's hook starts to consume/release on the mem_tracker. + // Using shared_ptr here causes a crash. The guess is that there is an + // intermediate state during the copy construction of shared_ptr: the shared_ptr is not equal + // to nullptr, but the object it points to is not initialized yet. If memory + // is released somewhere at that moment, the hook is triggered and crashes. + std::unique_ptr<ThreadMemTrackerMgr> thread_mem_tracker_mgr; + std::shared_ptr<ResourceContext> resource_ctx; + int thread_local_handle_count = 0; +}; + +class ThreadLocalHandle { +public: + static void create_thread_local_if_not_exits() { + if (bthread_self() == 0) { + if (!pthread_context_ptr_init) { + thread_context_ptr = new ThreadContext(); + pthread_context_ptr_init = true; + } + DCHECK(thread_context_ptr != nullptr); + thread_context_ptr->thread_local_handle_count++; + } else { + // Avoid calling bthread_getspecific frequently to get the bthread local. + // Very frequent bthread_getspecific calls are slow, but create_thread_local_if_not_exits is not expected to be called often. + // Cache the pointer of bthread local in pthread local. + auto* bthread_context = static_cast<ThreadContext*>(bthread_getspecific(btls_key)); + if (bthread_context == nullptr) { + // If bthread_context == nullptr: + // 1. 
First call to bthread_getspecific (and before any bthread_setspecific) returns NULL + // 2. There are not enough reusable btls in the btls pool. + // else if bthread_context != nullptr: + // 1. A new bthread starts, but gets a reused btls. + bthread_context = new ThreadContext; + // The brpc server should respond as quickly as possible. + bthread_context->thread_mem_tracker_mgr->disable_wait_gc(); + // Set the data so that the next bthread_getspecific call in this bthread returns it. + CHECK(0 == bthread_setspecific(btls_key, bthread_context) || doris::k_doris_exit); + } + DCHECK(bthread_context != nullptr); + bthread_context->thread_local_handle_count++; + } + } + + // `create_thread_local_if_not_exits` and `del_thread_local_if_count_is_zero` should be used in pairs, + // `del_thread_local_if_count_is_zero` should only be called after a matching `create_thread_local_if_not_exits` call (which returns void). + static void del_thread_local_if_count_is_zero() { + if (pthread_context_ptr_init) { + // in pthread + thread_context_ptr->thread_local_handle_count--; + if (thread_context_ptr->thread_local_handle_count == 0) { + pthread_context_ptr_init = false; + delete doris::thread_context_ptr; + thread_context_ptr = nullptr; + } + } else if (bthread_self() != 0) { + // in bthread + auto* bthread_context = static_cast<ThreadContext*>(bthread_getspecific(btls_key)); + DCHECK(bthread_context != nullptr); + bthread_context->thread_local_handle_count--; + } else { + throw Exception(Status::FatalError("__builtin_unreachable")); + } + } +}; + +// Must call create_thread_local_if_not_exits() before using thread_context(). +static ThreadContext* thread_context(bool allow_return_null = false) { + if (pthread_context_ptr_init) { + // in pthread + DCHECK(bthread_self() == 0); + DCHECK(thread_context_ptr != nullptr); + return thread_context_ptr; + } + if (bthread_self() != 0) { + // in bthread + // bthreads may switch pthreads very frequently, so do not use locks or other time-consuming operations here. 
+ auto* bthread_context = static_cast<ThreadContext*>(bthread_getspecific(btls_key)); + DCHECK(bthread_context != nullptr && bthread_context->thread_local_handle_count > 0); + return bthread_context; + } + if (allow_return_null) { + return nullptr; + } + // It means that use thread_context() but this thread not attached a query/load using SCOPED_ATTACH_TASK macro. + throw Exception( + Status::FatalError("__builtin_unreachable, {}", doris::memory_orphan_check_msg)); +} + +} // namespace doris diff --git a/be/src/runtime/workload_management/cpu_context.h b/be/src/runtime/workload_management/cpu_context.h index ba6681074eb..41574a5c3b1 100644 --- a/be/src/runtime/workload_management/cpu_context.h +++ b/be/src/runtime/workload_management/cpu_context.h @@ -17,16 +17,8 @@ #pragma once -#include <stddef.h> -#include <stdint.h> - -#include <atomic> -#include <memory> -#include <queue> -#include <shared_mutex> -#include <string> - -#include "common/status.h" +#include "common/factory_creator.h" +#include "util/runtime_profile.h" namespace doris { @@ -34,28 +26,55 @@ class CPUContext : public std::enable_shared_from_this<CPUContext> { ENABLE_FACTORY_CREATOR(CPUContext); public: - // Used to collect cpu execution stats. - // The stats is not thread safe. - // For example, you should use a seperate object for every scanner and do merge and reset - class CPUStats { - public: - // Should add some cpu stats relared method here. - void reset(); - void merge(CPUStats& stats); - std::string debug_string(); - }; + /* + * -------------------------------- + * | Property | + * -------------------------------- + * 1. operate them thread-safe. + * 2. all tasks are unified. + * 3. should not be operated frequently, use local variables to update Counter. 
+ */ + + RuntimeProfile::Counter* cpu_cost_ms_counter_; + + RuntimeProfile* profile() { return profile_.get(); } + std::string debug_string() { return profile_->pretty_print(); } + + /* + * -------------------------------- + * | Action | + * -------------------------------- + */ -public: - CPUContext() {} - virtual ~CPUContext() = default; // Bind current thread to cgroup, only some load thread should do this. void bind_workload_group() { - // Call workload group method to bind current thread to cgroup + // TODO: Call workload group method to bind current thread to cgroup } - CPUStats* cpu_stats() { return &stats_; } + +protected: + CPUContext() { init_profile(); } + virtual ~CPUContext() = default; private: - CPUStats stats_; + void init_profile() { + profile_ = std::make_unique<RuntimeProfile>("CPUContext"); + cpu_cost_ms_counter_ = ADD_COUNTER(profile_, "CpuCostMs", TUnit::TIME_MS); + } + + // Used to collect cpu execution stats. + std::unique_ptr<RuntimeProfile> profile_; +}; + +class QueryCPUContext : public CPUContext { + QueryCPUContext() = default; +}; + +class LoadCPUContext : public CPUContext { + LoadCPUContext() = default; +}; + +class CompactionCPUContext : public CPUContext { + CompactionCPUContext() = default; }; } // namespace doris diff --git a/be/src/runtime/workload_management/io_context.h b/be/src/runtime/workload_management/io_context.h index 9d34b1811db..7ffca1eca96 100644 --- a/be/src/runtime/workload_management/io_context.h +++ b/be/src/runtime/workload_management/io_context.h @@ -17,16 +17,9 @@ #pragma once -#include <stddef.h> -#include <stdint.h> - -#include <atomic> -#include <memory> -#include <queue> -#include <shared_mutex> -#include <string> - -#include "common/status.h" +#include "common/factory_creator.h" +#include "runtime/workload_management/io_throttle.h" +#include "util/runtime_profile.h" namespace doris { @@ -34,57 +27,70 @@ class IOContext : public std::enable_shared_from_this<IOContext> { 
ENABLE_FACTORY_CREATOR(IOContext); public: - // Used to collect io execution stats. - class IOStats { - public: - IOStats() = default; - virtual ~IOStats() = default; - void merge(IOStats stats); - void reset(); - std::string debug_string(); - int64_t scan_rows() { return scan_rows_; } - int64_t scan_bytes() { return scan_bytes_; } - int64_t scan_bytes_from_local_storage() { return scan_bytes_from_local_storage_; } - int64_t scan_bytes_from_remote_storage() { return scan_bytes_from_remote_storage_; } - int64_t returned_rows() { return returned_rows_; } - int64_t shuffle_send_bytes() { return shuffle_send_bytes_; } - int64_t shuffle_send_rows() { return shuffle_send_rows_; } - - int64_t incr_scan_rows(int64_t delta) { return scan_rows_ + delta; } - int64_t incr_scan_bytes(int64_t delta) { return scan_bytes_ + delta; } - int64_t incr_scan_bytes_from_local_storage(int64_t delta) { - return scan_bytes_from_local_storage_ + delta; - } - int64_t incr_scan_bytes_from_remote_storage(int64_t delta) { - return scan_bytes_from_remote_storage_ + delta; - } - int64_t incr_returned_rows(int64_t delta) { return returned_rows_ + delta; } - int64_t incr_shuffle_send_bytes(int64_t delta) { return shuffle_send_bytes_ + delta; } - int64_t incr_shuffle_send_rows(int64_t delta) { return shuffle_send_rows_ + delta; } - std::string debug_string(); - - private: - int64_t scan_rows_ = 0; - int64_t scan_bytes_ = 0; - int64_t scan_bytes_from_local_storage_ = 0; - int64_t scan_bytes_from_remote_storage_ = 0; - // number rows returned by query. - // only set once by result sink when closing. - int64_t returned_rows_ = 0; - int64_t shuffle_send_bytes_ = 0; - int64_t shuffle_send_rows_ = 0; - }; + /* + * -------------------------------- + * | Property | + * -------------------------------- + * 1. operate them thread-safe. + * 2. all tasks are unified. + * 3. should not be operated frequently, use local variables to update Counter. 
+ */ + + RuntimeProfile::Counter* scan_rows_counter_; + RuntimeProfile::Counter* scan_bytes_counter_; + RuntimeProfile::Counter* scan_bytes_from_local_storage_counter_; + RuntimeProfile::Counter* scan_bytes_from_remote_storage_counter_; + // number of rows returned by query. + // only set once by result sink when closing. + RuntimeProfile::Counter* returned_rows_counter_; + RuntimeProfile::Counter* shuffle_send_bytes_counter_; + RuntimeProfile::Counter* shuffle_send_rows_counter_; + RuntimeProfile* profile() { return profile_.get(); } + std::string debug_string() { return profile_->pretty_print(); } + + /* + * -------------------------------- + * | Action | + * -------------------------------- + */ -public: - IOContext() {} - virtual ~IOContext() = default; IOThrottle* io_throttle() { - // get io throttle from workload group + // TODO: get io throttle from workload group + return nullptr; } - IOStats* stats() { return &stats_; } + +protected: + IOContext() { init_profile(); } + virtual ~IOContext() = default; + private: - IOStats stats_; + void init_profile() { + profile_ = std::make_unique<RuntimeProfile>("IOContext"); + scan_rows_counter_ = ADD_COUNTER(profile_, "ScanRows", TUnit::UNIT); + scan_bytes_counter_ = ADD_COUNTER(profile_, "ScanBytes", TUnit::BYTES); + scan_bytes_from_local_storage_counter_ = ADD_COUNTER(profile_, "ScanBytesFromLocalStorage", TUnit::BYTES); + scan_bytes_from_remote_storage_counter_ = ADD_COUNTER(profile_, "ScanBytesFromRemoteStorage", TUnit::BYTES); + returned_rows_counter_ = ADD_COUNTER(profile_, "ReturnedRows", TUnit::UNIT); + shuffle_send_bytes_counter_ = ADD_COUNTER(profile_, "ShuffleSendBytes", TUnit::BYTES); + shuffle_send_rows_counter_ = ADD_COUNTER(profile_, "ShuffleSendRows", TUnit::UNIT); + } + + // Used to collect io execution stats. 
+ std::unique_ptr<RuntimeProfile> profile_; }; +class QueryIOContext : public IOContext { + QueryIOContext() = default; +}; + +class LoadIOContext : public IOContext { + LoadIOContext() = default; +}; + +class CompactionIOContext : public IOContext { + CompactionIOContext() = default; +}; + + } // namespace doris diff --git a/be/src/runtime/workload_management/memory_context.h b/be/src/runtime/workload_management/memory_context.h index 3e3a067e840..979fa8f1966 100644 --- a/be/src/runtime/workload_management/memory_context.h +++ b/be/src/runtime/workload_management/memory_context.h @@ -17,60 +17,55 @@ #pragma once -#include <stddef.h> -#include <stdint.h> +#include <cstdint> -#include <atomic> -#include <memory> -#include <queue> -#include <shared_mutex> #include <string> +#include "common/factory_creator.h" #include "common/status.h" +#include "util/runtime_profile.h" namespace doris { +class MemTrackerLimiter; + class MemoryContext : public std::enable_shared_from_this<MemoryContext> { ENABLE_FACTORY_CREATOR(MemoryContext); public: - // Used to collect memory execution stats. - // The stats class is not thread safe, should not do concurrent modifications. - class MemoryStats { - public: - MemoryStats() = default; - virtual ~MemoryStats() = default; - void merge(MemoryStats& stats); - void reset(); - std::string debug_string(); - int64_t revoke_attempts() { return revoke_attempts_; } - int64_t revoke_wait_time_ms() { return revoke_wait_time_ms_; } - int64_t revoked_bytes() { return revoked_bytes_; } - int64_t max_peak_memory_bytes() { return max_peak_memory_bytes_; } - int64_t current_used_memory_bytes() { return current_used_memory_bytes_; } - - private: - // Maximum memory peak for all backends. - // only set once by result sink when closing. - int64_t max_peak_memory_bytes_ = 0; - int64_t current_used_memory_bytes_ = 0; - // The total number of times that the revoke method is called. - int64_t revoke_attempts_ = 0; - // The time that waiting for revoke finished. 
- int64_t revoke_wait_time_ms_ = 0; - // The revoked bytes - int64_t revoked_bytes_ = 0; - }; - -public: - MemoryContext(std::shared_ptr<MemtrackerLimiter> memtracker) - : memtracker_limiter_(memtracker) {} - - virtual ~MemoryContext() = default; - - MemtrackerLimiter* memtracker_limiter() { return memtracker_limiter_.get(); } - - MemoryStats* stats() { return &stats_; } + /* + * -------------------------------- + * | Property | + * -------------------------------- + * 1. operate them thread-safe. + * 2. all tasks are unified. + * 3. should not be operated frequently, use local variables to update Counter. + */ + + RuntimeProfile::Counter* current_memory_bytes_counter_; + RuntimeProfile::Counter* peak_memory_bytes_counter_; + // Maximum memory peak for all backends. + // only set once by result sink when closing. + RuntimeProfile::Counter* max_peak_memory_bytes_counter_; + // The total number of times that the revoke method is called. + RuntimeProfile::Counter* revoke_attempts_counter_; + // The time that waiting for revoke finished. + RuntimeProfile::Counter* revoke_wait_time_ms_counter_; + // The revoked bytes + RuntimeProfile::Counter* revoked_bytes_counter_; + + RuntimeProfile* profile() { return profile_.get(); } + std::shared_ptr<MemTrackerLimiter> memtracker_limiter() { return memtracker_limiter_; } + void set_memtracker_limiter(const std::shared_ptr<MemTrackerLimiter>& memtracker_limiter) { + memtracker_limiter_ = memtracker_limiter; + } + std::string debug_string() { return profile_->pretty_print(); } + + /* + * -------------------------------- + * | Action | + * -------------------------------- + */ // Following method is related with spill disk. // Compute the number of bytes could be released. 
@@ -85,9 +80,36 @@ public: virtual Status leave_arbitration(Status reason) { return Status::OK(); } +protected: + MemoryContext() { init_profile(); } + virtual ~MemoryContext() = default; + private: - MemoryStats stats_; - std::shared_ptr<MemtrackerLimiter> memtracker_limiter_; + void init_profile() { + profile_ = std::make_unique<RuntimeProfile>("MemoryContext"); + current_memory_bytes_counter_ = ADD_COUNTER(profile_, "CurrentMemoryBytes", TUnit::BYTES); + peak_memory_bytes_counter_ = ADD_COUNTER(profile_, "PeakMemoryBytes", TUnit::BYTES); + max_peak_memory_bytes_counter_ = ADD_COUNTER(profile_, "MaxPeakMemoryBytes", TUnit::BYTES); + revoke_attempts_counter_ = ADD_COUNTER(profile_, "RevokeAttempts", TUnit::UNIT); + revoke_wait_time_ms_counter_ = ADD_COUNTER(profile_, "RevokeWaitTimeMs", TUnit::TIME_MS); + revoked_bytes_counter_ = ADD_COUNTER(profile_, "RevokedBytes", TUnit::BYTES); + } + + // Used to collect memory execution stats. + std::unique_ptr<RuntimeProfile> profile_; + std::shared_ptr<MemTrackerLimiter> memtracker_limiter_; +}; + +class QueryMemoryContext : public MemoryContext { + QueryMemoryContext() = default; +}; + +class LoadMemoryContext : public MemoryContext { + LoadMemoryContext() = default; +}; + +class CompactionMemoryContext : public MemoryContext { + CompactionMemoryContext() = default; }; } // namespace doris diff --git a/be/src/runtime/workload_management/resource_context.h b/be/src/runtime/workload_management/resource_context.h index 29b21f7e719..613d3975e02 100644 --- a/be/src/runtime/workload_management/resource_context.h +++ b/be/src/runtime/workload_management/resource_context.h @@ -17,45 +17,19 @@ #pragma once -#include <stddef.h> -#include <stdint.h> - -#include <atomic> #include <memory> -#include <queue> -#include <shared_mutex> -#include <string> - -#include "common/status.h" +#include "common/multi_version.h" +#include "runtime/workload_group/workload_group.h" +#include "runtime/workload_management/cpu_context.h" +#include 
"runtime/workload_management/memory_context.h" +#include "runtime/workload_management/workload_group_context.h" +#include "runtime/workload_management/task_controller.h" +#include "runtime/workload_management/io_context.h" +#include "util/runtime_profile.h" +#include "common/factory_creator.h" namespace doris { -// Any task that allow cancel should implement this class. -class TaskController { - ENABLE_FACTORY_CREATOR(TaskController); - -public: - virtual Status cancel(Status cancel_reason) { return Status::OK(); } - virtual Status running_time(int64_t* running_time_msecs) { - *running_time_msecs = 0; - return Status::OK(); - } -}; - -class WorkloadGroupContext { - ENABLE_FACTORY_CREATOR(WorkloadGroupContext); - -public: - WorkloadGroupContext() = default; - virtual ~WorkloadGroupContext() = default; - - WorkloadGroupPtr workload_group() { return _workload_group; } - void set_workload_group(WorkloadGroupPtr wg) { _workload_group = wg; } - -private: - WorkloadGroupPtr _workload_group = nullptr; -}; - // Every task should have its own resource context. And BE may adjust the resource // context during running. // ResourceContext contains many contexts or controller, the task could implements their @@ -64,14 +38,16 @@ class ResourceContext : public std::enable_shared_from_this<ResourceContext> { ENABLE_FACTORY_CREATOR(ResourceContext); public: - ResourceContext() { - // These all default values, it may be reset. 
- cpu_context_ = std::make_shared<CPUContext>(); - memory_context_ = std::make_shared<MemoryContext>(); - io_context_ = std::make_shared<IOContext>(); - reclaimer_ = std::make_shared<ResourceReclaimer>(); + ResourceContext(const std::shared_ptr<CPUContext>& cpu_context, const std::shared_ptr<MemoryContext>& memory_context, const std::shared_ptr<IOContext>& io_context, const std::shared_ptr<WorkloadGroupContext>& workload_group_context, const std::shared_ptr<TaskController>& task_controller): + cpu_context_(cpu_context), memory_context_(memory_context), io_context_(io_context), workload_group_context_(workload_group_context), task_controller_(task_controller) { + refresh_resource_profile(); + } + ~ResourceContext() = default; + + template<typename TCPUContext, typename TMemoryContext, typename TIOContext, typename TWorkloadGroupContext, typename TTaskController> + static std::shared_ptr<ResourceContext> CreateResourceContext() { + return std::make_shared<ResourceContext>(TCPUContext::create_shared(), TMemoryContext::create_shared(), TIOContext::create_shared(), TWorkloadGroupContext::create_shared(), TTaskController::create_shared()); } - virtual ~ResourceContext() = default; // Only return the raw pointer to the caller, so that the caller should not save it to other variables. 
CPUContext* cpu_context() { return cpu_context_.get(); } @@ -80,16 +56,23 @@ public: WorkloadGroupContext* workload_group_context() { return workload_group_context_.get(); } TaskController* task_controller() { return task_controller_.get(); } - void set_cpu_context(std::shared_ptr<CPUContext> cpu_context) { cpu_context_ = cpu_context; } - void set_memory_context(std::shared_ptr<MemoryContext> memory_context) { - memory_context_ = memory_context; - } - void set_io_context(std::shared_ptr<IOContext> io_context) { io_context_ = io_context; } - void set_workload_group_context(std::shared_ptr<WorkloadGroupContext> wg_context) { - workload_group_context_ = wg_context; - } - void set_task_controller(std::shared_ptr<TaskController> task_controller) { - task_controller_ = task_controller; + const TUniqueId& task_id() const { return task_id_; } + RuntimeProfile* profile() { return const_cast<RuntimeProfile*>(resource_profile_.get().get()); } + std::string debug_string() { return resource_profile_.get()->pretty_print(); } + void refresh_resource_profile() { + std::unique_ptr<RuntimeProfile> resource_profile = std::make_unique<RuntimeProfile>("ResourceContext"); + + RuntimeProfile* cpu_profile = + resource_profile->create_child(cpu_context_->profile()->name(), true, false); + cpu_profile->merge(cpu_context_->profile()); + RuntimeProfile* memory_profile = + resource_profile->create_child(memory_context_->profile()->name(), true, false); + memory_profile->merge(memory_context_->profile()); + RuntimeProfile* io_profile = + resource_profile->create_child(io_context_->profile()->name(), true, false); + io_profile->merge(io_context_->profile()); + + resource_profile_.set(std::move(resource_profile)); } private: @@ -99,6 +82,10 @@ private: std::shared_ptr<IOContext> io_context_ = nullptr; std::shared_ptr<WorkloadGroupContext> workload_group_context_ = nullptr; std::shared_ptr<TaskController> task_controller_ = nullptr; + + TUniqueId task_id_; + MultiVersion<RuntimeProfile> 
resource_profile_; + }; } // namespace doris diff --git a/be/src/runtime/workload_management/task_controller.h b/be/src/runtime/workload_management/task_controller.h new file mode 100644 index 00000000000..26247eb11aa --- /dev/null +++ b/be/src/runtime/workload_management/task_controller.h @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+#pragma once
+
+#include "common/factory_creator.h"
+#include "common/status.h"
+
+namespace doris {
+
+// Base class for per-task control hooks. Every virtual below has a no-op
+// default, so a subclass only overrides what it actually supports.
+class TaskController {
+    ENABLE_FACTORY_CREATOR(TaskController);
+
+public:
+    TaskController() = default;
+    virtual ~TaskController() = default;
+
+    // Cancels the task. The default implementation ignores `cancel_reason`
+    // and returns Status::OK().
+    virtual Status cancel(Status cancel_reason) { return Status::OK(); }
+
+    // Writes the task's running time, in milliseconds, to `*running_time_msecs`.
+    // The default implementation stores 0 and returns Status::OK().
+    // `running_time_msecs` is dereferenced without a null check.
+    virtual Status running_time(int64_t* running_time_msecs) {
+        *running_time_msecs = 0;
+        return Status::OK();
+    }
+};
+
+// NOTE(review): only TaskController declares ENABLE_FACTORY_CREATOR; confirm
+// whether the subclasses below need their own factory helpers.
+
+// Controller for query tasks; adds no behavior of its own yet.
+class QueryTaskController : public TaskController {
+public:
+    // Must be public: members of a `class` are private by default, which would
+    // make the type impossible to construct from outside.
+    QueryTaskController() = default;
+};
+
+// Controller for load tasks; adds no behavior of its own yet.
+class LoadTaskController : public TaskController {
+public:
+    LoadTaskController() = default;
+};
+
+// Controller for compaction tasks; adds no behavior of its own yet.
+class CompactionTaskController : public TaskController {
+public:
+    CompactionTaskController() = default;
+};
+
+} // namespace doris
diff --git a/be/src/runtime/workload_management/workload_group_context.h b/be/src/runtime/workload_management/workload_group_context.h
new file mode 100644
index 00000000000..fd2fcedbbec
--- /dev/null
+++ b/be/src/runtime/workload_management/workload_group_context.h
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <utility>
+
+#include "common/factory_creator.h"
+#include "runtime/workload_group/workload_group.h"
+
+namespace doris {
+
+// Holds the workload group handle for a resource context. The handle is
+// nullptr until set_workload_group() is called.
+class WorkloadGroupContext {
+    ENABLE_FACTORY_CREATOR(WorkloadGroupContext);
+
+public:
+    WorkloadGroupContext() = default;
+    virtual ~WorkloadGroupContext() = default;
+
+    // Returns a copy of the stored handle; nullptr if none has been set.
+    WorkloadGroupPtr workload_group() const { return _workload_group; }
+
+    // Replaces the stored handle. `wg` is taken by value and moved into place,
+    // so the caller's argument is not copied a second time.
+    void set_workload_group(WorkloadGroupPtr wg) { _workload_group = std::move(wg); }
+
+private:
+    WorkloadGroupPtr _workload_group = nullptr;
+};
+
+} // namespace doris
diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h
index 7130acbd2f9..6f38c81e931 100644
--- a/be/src/util/runtime_profile.h
+++ b/be/src/util/runtime_profile.h
@@ -433,6 +433,13 @@ public:
     // Prints the counters in a name: value format.
     // Does not hold locks when it makes any function calls.
     void pretty_print(std::ostream* s, const std::string& prefix = "") const;
+    // Convenience overload: returns, as a std::string, what
+    // pretty_print(std::ostream*) writes with its default (empty) prefix.
+    std::string pretty_print() const {
+        std::stringstream ss;
+        pretty_print(&ss);
+        return ss.str();
+    }
 
     // Serializes profile to thrift.
     // Does not hold locks when it makes any function calls.
diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp index e407dbbaab4..161374f5274 100644 --- a/be/src/vec/common/allocator.cpp +++ b/be/src/vec/common/allocator.cpp @@ -86,9 +86,9 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::sys_mem err_msg += fmt::format( "Allocator sys memory check failed: Cannot alloc:{}, consuming " "tracker:<{}>, peak used {}, current used {}, exec node:<{}>, {}.", - size, doris::thread_context()->thread_mem_tracker()->label(), - doris::thread_context()->thread_mem_tracker()->peak_consumption(), - doris::thread_context()->thread_mem_tracker()->consumption(), + size, doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->label(), + doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->peak_consumption(), + doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->consumption(), doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker_label(), doris::GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str()); @@ -175,10 +175,10 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::memory_ if (doris::thread_context()->skip_memory_check != 0) { return; } - auto st = doris::thread_context()->thread_mem_tracker()->check_limit(size); + auto st = doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->check_limit(size); if (!st) { auto err_msg = fmt::format("Allocator mem tracker check failed, {}", st.to_string()); - doris::thread_context()->thread_mem_tracker()->print_log_usage(err_msg); + doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->print_log_usage(err_msg); // If the external catch, throw bad::alloc first, let the query actively cancel. Otherwise asynchronous cancel. 
if (doris::thread_context()->thread_mem_tracker_mgr->is_attach_query()) { doris::thread_context()->thread_mem_tracker_mgr->disable_wait_gc(); @@ -239,7 +239,7 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::add_add return; } #endif - doris::thread_context()->thread_mem_tracker()->add_address_sanitizers(buf, size); + doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->add_address_sanitizers(buf, size); } template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator> @@ -250,7 +250,7 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::remove_ return; } #endif - doris::thread_context()->thread_mem_tracker()->remove_address_sanitizers(buf, size); + doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()->remove_address_sanitizers(buf, size); } template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator> --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
