This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new b28d2d6e84 [cherrypick](memory) Add special counter for memtracker and 
fix thread create and destroy track (#17515)
b28d2d6e84 is described below

commit b28d2d6e842b88b6671cc12c29540fcf77dbdd77
Author: Xinyi Zou <[email protected]>
AuthorDate: Wed Mar 8 07:28:18 2023 +0800

    [cherrypick](memory) Add special counter for memtracker and fix thread 
create and destroy track (#17515)
    
    * [enhancement](memtracker) Add special counter for memtracker and fix 
thread create and destroy track #17301
    
    Add a special counter (MemCounter) for memtracker. It is faster, but it uses
    relaxed memory ordering, so its values are not accurate in real time.
    Track the memory of thread creation and destruction again; this tracking
    was previously removed due to performance loss and is now added back.
---
 be/src/common/daemon.cpp                       | 15 +++++++
 be/src/common/daemon.h                         |  2 +
 be/src/runtime/memory/mem_tracker.cpp          | 23 +++++++---
 be/src/runtime/memory/mem_tracker.h            | 59 +++++++++++++++++++++++++-
 be/src/runtime/memory/mem_tracker_limiter.cpp  | 22 +++++++---
 be/src/runtime/memory/mem_tracker_limiter.h    |  7 ++-
 be/src/runtime/memory/thread_mem_tracker_mgr.h |  7 ---
 be/src/runtime/thread_context.h                | 49 ++++++++++++---------
 8 files changed, 139 insertions(+), 45 deletions(-)

diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index 01dbe847eb..d36bf08e8c 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -305,6 +305,13 @@ void Daemon::load_channel_tracker_refresh_thread() {
     }
 }
 
+void Daemon::memory_tracker_profile_refresh_thread() {
+    while 
(!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(50))) {
+        MemTracker::refresh_all_tracker_profile();
+        MemTrackerLimiter::refresh_all_tracker_profile();
+    }
+}
+
 /*
  * this thread will calculate some metrics at a fix interval(15 sec)
  * 1. push bytes per second
@@ -497,6 +504,11 @@ void Daemon::start() {
             [this]() { this->load_channel_tracker_refresh_thread(); },
             &_load_channel_tracker_refresh_thread);
     CHECK(st.ok()) << st;
+    st = Thread::create(
+            "Daemon", "memory_tracker_profile_refresh_thread",
+            [this]() { this->memory_tracker_profile_refresh_thread(); },
+            &_memory_tracker_profile_refresh_thread);
+    CHECK(st.ok()) << st;
 
     if (config::enable_metric_calculator) {
         st = Thread::create(
@@ -524,6 +536,9 @@ void Daemon::stop() {
     if (_load_channel_tracker_refresh_thread) {
         _load_channel_tracker_refresh_thread->join();
     }
+    if (_memory_tracker_profile_refresh_thread) {
+        _memory_tracker_profile_refresh_thread->join();
+    }
     if (_calculate_metrics_thread) {
         _calculate_metrics_thread->join();
     }
diff --git a/be/src/common/daemon.h b/be/src/common/daemon.h
index 2dc4fd9bc7..9880a3bc72 100644
--- a/be/src/common/daemon.h
+++ b/be/src/common/daemon.h
@@ -50,6 +50,7 @@ private:
     void memory_maintenance_thread();
     void memory_gc_thread();
     void load_channel_tracker_refresh_thread();
+    void memory_tracker_profile_refresh_thread();
     void calculate_metrics_thread();
 
     CountDownLatch _stop_background_threads_latch;
@@ -59,6 +60,7 @@ private:
     scoped_refptr<Thread> _memory_maintenance_thread;
     scoped_refptr<Thread> _memory_gc_thread;
     scoped_refptr<Thread> _load_channel_tracker_refresh_thread;
+    scoped_refptr<Thread> _memory_tracker_profile_refresh_thread;
     scoped_refptr<Thread> _calculate_metrics_thread;
 };
 } // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker.cpp 
b/be/src/runtime/memory/mem_tracker.cpp
index 08582aafc6..305063f9b8 100644
--- a/be/src/runtime/memory/mem_tracker.cpp
+++ b/be/src/runtime/memory/mem_tracker.cpp
@@ -44,9 +44,8 @@ static std::vector<TrackerGroup> mem_tracker_pool(1000);
 MemTracker::MemTracker(const std::string& label, RuntimeProfile* profile, 
MemTrackerLimiter* parent,
                        const std::string& profile_counter_name)
         : _label(label) {
-    if (profile == nullptr) {
-        _consumption = 
std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES);
-    } else {
+    _consumption = std::make_shared<MemCounter>();
+    if (profile != nullptr) {
         // By default, memory consumption is tracked via calls to 
consume()/release(), either to
         // the tracker itself or to one of its descendents. Alternatively, a 
consumption metric
         // can be specified, and then the metric's value is used as the 
consumption rather than
@@ -56,13 +55,14 @@ MemTracker::MemTracker(const std::string& label, 
RuntimeProfile* profile, MemTra
         // Other consumption metrics are used in trackers below the process 
level to account
         // for memory (such as free buffer pool buffers) that is not tracked 
by consume() and
         // release().
-        _consumption = profile->AddSharedHighWaterMarkCounter(COUNTER_NAME, 
TUnit::BYTES);
+        _profile_counter =
+                profile->AddSharedHighWaterMarkCounter(profile_counter_name, 
TUnit::BYTES);
     }
-    bind_parent(parent);
+    bind_parent(parent); // at the end
 }
 
 MemTracker::MemTracker(const std::string& label, MemTrackerLimiter* parent) : 
_label(label) {
-    _consumption = 
std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES);
+    _consumption = std::make_shared<MemCounter>();
     bind_parent(parent);
 }
 
@@ -91,13 +91,22 @@ MemTracker::~MemTracker() {
     }
 }
 
+void MemTracker::refresh_all_tracker_profile() {
+    for (unsigned i = 0; i < mem_tracker_pool.size(); ++i) {
+        std::lock_guard<std::mutex> l(mem_tracker_pool[i].group_lock);
+        for (auto tracker : mem_tracker_pool[i].trackers) {
+            tracker->refresh_profile_counter();
+        }
+    }
+}
+
 MemTracker::Snapshot MemTracker::make_snapshot() const {
     Snapshot snapshot;
     snapshot.label = _label;
     snapshot.parent_label = _parent_label;
     snapshot.limit = -1;
     snapshot.cur_consumption = _consumption->current_value();
-    snapshot.peak_consumption = _consumption->value();
+    snapshot.peak_consumption = _consumption->peak_value();
     return snapshot;
 }
 
diff --git a/be/src/runtime/memory/mem_tracker.h 
b/be/src/runtime/memory/mem_tracker.h
index 0b4e54bb10..3fb4efafe1 100644
--- a/be/src/runtime/memory/mem_tracker.h
+++ b/be/src/runtime/memory/mem_tracker.h
@@ -43,6 +43,50 @@ public:
         int64_t peak_consumption = 0;
     };
 
+    // A counter that keeps track of the current and peak value seen.
+    // Relaxed ordering, not accurate in real time.
+    class MemCounter {
+    public:
+        MemCounter() : _current_value(0), _peak_value(0) {}
+
+        void add(int64_t delta) {
+            _current_value.fetch_add(delta, std::memory_order_relaxed);
+            update_peak();
+        }
+
+        void add_no_update_peak(int64_t delta) {
+            _current_value.fetch_add(delta, std::memory_order_relaxed);
+        }
+
+        bool try_add(int64_t delta, int64_t max) {
+            if (UNLIKELY(_current_value.load(std::memory_order_relaxed) + 
delta > max))
+                return false;
+            _current_value.fetch_add(delta, std::memory_order_relaxed);
+            update_peak();
+            return true;
+        }
+
+        void set(int64_t v) {
+            _current_value.store(v, std::memory_order_relaxed);
+            update_peak();
+        }
+
+        void update_peak() {
+            if (_current_value.load(std::memory_order_relaxed) >
+                _peak_value.load(std::memory_order_relaxed)) {
+                
_peak_value.store(_current_value.load(std::memory_order_relaxed),
+                                  std::memory_order_relaxed);
+            }
+        }
+
+        int64_t current_value() const { return 
_current_value.load(std::memory_order_relaxed); }
+        int64_t peak_value() const { return 
_peak_value.load(std::memory_order_relaxed); }
+
+    private:
+        std::atomic<int64_t> _current_value;
+        std::atomic<int64_t> _peak_value;
+    };
+
     // Creates and adds the tracker to the mem_tracker_pool.
     MemTracker(const std::string& label, RuntimeProfile* profile, 
MemTrackerLimiter* parent,
                const std::string& profile_counter_name);
@@ -63,15 +107,25 @@ public:
     const std::string& set_parent_label() const { return _parent_label; }
     // Returns the memory consumed in bytes.
     int64_t consumption() const { return _consumption->current_value(); }
-    int64_t peak_consumption() const { return _consumption->value(); }
+    int64_t peak_consumption() const { return _consumption->peak_value(); }
 
     void consume(int64_t bytes) {
         if (bytes == 0) return;
         _consumption->add(bytes);
     }
+    void consume_no_update_peak(int64_t bytes) { // need extreme fast
+        _consumption->add_no_update_peak(bytes);
+    }
     void release(int64_t bytes) { consume(-bytes); }
     void set_consumption(int64_t bytes) { _consumption->set(bytes); }
 
+    void refresh_profile_counter() {
+        if (_profile_counter) {
+            _profile_counter->set(_consumption->current_value());
+        }
+    }
+    static void refresh_all_tracker_profile();
+
 public:
     Snapshot make_snapshot() const;
     // Specify group_num from mem_tracker_pool to generate snapshot.
@@ -95,7 +149,8 @@ protected:
     // label used in the make snapshot, not guaranteed unique.
     std::string _label;
 
-    std::shared_ptr<RuntimeProfile::HighWaterMarkCounter> _consumption; // in 
bytes
+    std::shared_ptr<MemCounter> _consumption;
+    std::shared_ptr<RuntimeProfile::HighWaterMarkCounter> _profile_counter;
 
     // Tracker is located in group num in mem_tracker_pool
     int64_t _parent_group_num = 0;
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp 
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 3293c00914..0a054a7dfe 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -46,12 +46,13 @@ std::atomic<bool> 
MemTrackerLimiter::_enable_print_log_process_usage {true};
 bool MemTrackerLimiter::_oom_avoidance {true};
 
 MemTrackerLimiter::MemTrackerLimiter(Type type, const std::string& label, 
int64_t byte_limit,
-                                     RuntimeProfile* profile) {
+                                     RuntimeProfile* profile,
+                                     const std::string& profile_counter_name) {
     DCHECK_GE(byte_limit, -1);
-    if (profile == nullptr) {
-        _consumption = 
std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES);
-    } else {
-        _consumption = profile->AddSharedHighWaterMarkCounter(COUNTER_NAME, 
TUnit::BYTES);
+    _consumption = std::make_shared<MemCounter>();
+    if (profile != nullptr) {
+        _profile_counter =
+                profile->AddSharedHighWaterMarkCounter(profile_counter_name, 
TUnit::BYTES);
     }
     _type = type;
     _label = label;
@@ -91,7 +92,7 @@ MemTracker::Snapshot MemTrackerLimiter::make_snapshot() const 
{
     snapshot.label = _label;
     snapshot.limit = _limit;
     snapshot.cur_consumption = _consumption->current_value();
-    snapshot.peak_consumption = _consumption->value();
+    snapshot.peak_consumption = _consumption->peak_value();
     return snapshot;
 }
 
@@ -110,6 +111,15 @@ void MemTrackerLimiter::refresh_global_counter() {
     }
 }
 
+void MemTrackerLimiter::refresh_all_tracker_profile() {
+    for (unsigned i = 0; i < mem_tracker_limiter_pool.size(); ++i) {
+        std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[i].group_lock);
+        for (auto tracker : mem_tracker_limiter_pool[i].trackers) {
+            tracker->refresh_profile_counter();
+        }
+    }
+}
+
 void 
MemTrackerLimiter::make_process_snapshots(std::vector<MemTracker::Snapshot>* 
snapshots) {
     MemTrackerLimiter::refresh_global_counter();
     int64_t process_mem_sum = 0;
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h 
b/be/src/runtime/memory/mem_tracker_limiter.h
index 5e705d3516..bf4f49400a 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -72,7 +72,8 @@ public:
 public:
     // byte_limit equal to -1 means no consumption limit, only participate in 
process memory statistics.
     MemTrackerLimiter(Type type, const std::string& label = std::string(), 
int64_t byte_limit = -1,
-                      RuntimeProfile* profile = nullptr);
+                      RuntimeProfile* profile = nullptr,
+                      const std::string& profile_counter_name = 
"PeakMemoryUsage");
 
     ~MemTrackerLimiter();
 
@@ -124,6 +125,8 @@ public:
     }
 
     static void refresh_global_counter();
+    static void refresh_all_tracker_profile();
+
     Snapshot make_snapshot() const;
     // Returns a list of all the valid tracker snapshots.
     static void make_process_snapshots(std::vector<MemTracker::Snapshot>* 
snapshots);
@@ -214,7 +217,7 @@ private:
                 "failed alloc size {}, exceeded tracker:<{}>, limit {}, peak "
                 "used {}, current used {}",
                 print_bytes(bytes), exceed_tracker->label(), 
print_bytes(exceed_tracker->limit()),
-                print_bytes(exceed_tracker->_consumption->value()),
+                print_bytes(exceed_tracker->_consumption->peak_value()),
                 print_bytes(exceed_tracker->_consumption->current_value()));
     }
 
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h 
b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 151f79cfcb..5898f4c338 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -39,13 +39,6 @@ public:
         if (_init) flush_untracked_mem<false, true>();
     }
 
-    // only for memory hook
-    static void consume_no_attach(int64_t size) {
-        if (ExecEnv::GetInstance()->initialized()) {
-            ExecEnv::GetInstance()->orphan_mem_tracker()->consume(size);
-        }
-    }
-
     void init();
 
     // After attach, the current thread Memory Hook starts to consume/release 
task mem_tracker
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 038ed3e264..c2e66ba42e 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -339,31 +339,38 @@ private:
 // If the consume succeeds, the memory is actually allocated, otherwise an 
exception is thrown.
 // But the statistics of memory through TCMalloc new/delete Hook are after the 
memory is actually allocated,
 // which is different from the previous behavior.
-#define CONSUME_MEM_TRACKER(size)                                           \
-    do {                                                                    \
-        if (doris::thread_context_ptr.init) {                               \
-            doris::thread_context()->thread_mem_tracker_mgr->consume(size); \
-        }                                                                   \
+#define CONSUME_MEM_TRACKER(size)                                              
                    \
+    do {                                                                       
                    \
+        if (doris::thread_context_ptr.init) {                                  
                    \
+            doris::thread_context()->thread_mem_tracker_mgr->consume(size);    
                    \
+        } else if (doris::ExecEnv::GetInstance()->initialized()) {             
                    \
+            
doris::ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume_no_update_peak(size);
 \
+        }                                                                      
                    \
     } while (0)
 // NOTE, The LOG cannot be printed in the mem hook. If the LOG statement 
triggers the mem hook LOG,
 // the nested LOG may cause an unknown crash.
-#define TRY_CONSUME_MEM_TRACKER(size, fail_ret)                                
            \
-    do {                                                                       
            \
-        if (doris::thread_context_ptr.init) {                                  
            \
-            if (doris::enable_thread_catch_bad_alloc) {                        
            \
-                if 
(!doris::thread_context()->thread_mem_tracker_mgr->try_consume(size)) { \
-                    return fail_ret;                                           
            \
-                }                                                              
            \
-            } else {                                                           
            \
-                
doris::thread_context()->thread_mem_tracker_mgr->consume(size);            \
-            }                                                                  
            \
-        }                                                                      
            \
+#define TRY_CONSUME_MEM_TRACKER(size, fail_ret)                                
                    \
+    do {                                                                       
                    \
+        if (doris::thread_context_ptr.init) {                                  
                    \
+            if (doris::enable_thread_catch_bad_alloc) {                        
                    \
+                if 
(!doris::thread_context()->thread_mem_tracker_mgr->try_consume(size)) {         
\
+                    return fail_ret;                                           
                    \
+                }                                                              
                    \
+            } else {                                                           
                    \
+                
doris::thread_context()->thread_mem_tracker_mgr->consume(size);                 
   \
+            }                                                                  
                    \
+        } else if (doris::ExecEnv::GetInstance()->initialized()) {             
                    \
+            
doris::ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume_no_update_peak(size);
 \
+        }                                                                      
                    \
     } while (0)
-#define RELEASE_MEM_TRACKER(size)                                            \
-    do {                                                                     \
-        if (doris::thread_context_ptr.init) {                                \
-            doris::thread_context()->thread_mem_tracker_mgr->consume(-size); \
-        }                                                                    \
+#define RELEASE_MEM_TRACKER(size)                                              
              \
+    do {                                                                       
              \
+        if (doris::thread_context_ptr.init) {                                  
              \
+            doris::thread_context()->thread_mem_tracker_mgr->consume(-size);   
              \
+        } else if (doris::ExecEnv::GetInstance()->initialized()) {             
              \
+            
doris::ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume_no_update_peak(
 \
+                    -size);                                                    
              \
+        }                                                                      
              \
     } while (0)
 #else
 #define CONSUME_THREAD_MEM_TRACKER(size) (void)0


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to