This is an automated email from the ASF dual-hosted git repository.

zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 03538381a3 [enhancement](memory) MemCounter supports lock-free thread 
safety (#19256)
03538381a3 is described below

commit 03538381a30a87a5211d13cb20f21d756c3f1869
Author: luozenglin <[email protected]>
AuthorDate: Wed May 10 02:24:07 2023 +0800

    [enhancement](memory) MemCounter supports lock-free thread safety (#19256)
    
    make try_add() and update_peak() thread-safe.
---
 be/src/runtime/memory/mem_tracker.h | 41 ++++++++++++++++++++++++-------------
 1 file changed, 27 insertions(+), 14 deletions(-)

diff --git a/be/src/runtime/memory/mem_tracker.h 
b/be/src/runtime/memory/mem_tracker.h
index 60ac996a82..81f901879c 100644
--- a/be/src/runtime/memory/mem_tracker.h
+++ b/be/src/runtime/memory/mem_tracker.h
@@ -64,8 +64,8 @@ public:
         MemCounter() : _current_value(0), _peak_value(0) {}
 
         void add(int64_t delta) {
-            _current_value.fetch_add(delta, std::memory_order_relaxed);
-            update_peak();
+            auto value = _current_value.fetch_add(delta, 
std::memory_order_relaxed) + delta;
+            update_peak(value);
         }
 
         void add_no_update_peak(int64_t delta) {
@@ -73,23 +73,30 @@ public:
         }
 
         bool try_add(int64_t delta, int64_t max) {
-            if (UNLIKELY(_current_value.load(std::memory_order_relaxed) + 
delta > max))
-                return false;
-            _current_value.fetch_add(delta, std::memory_order_relaxed);
-            update_peak();
+            auto cur_val = _current_value.load(std::memory_order_relaxed);
+            auto new_val = 0;
+            do {
+                new_val = cur_val + delta;
+                if (UNLIKELY(new_val > max)) {
+                    return false;
+                }
+            } while (UNLIKELY(!_current_value.compare_exchange_weak(cur_val, 
new_val,
+                                                                    
std::memory_order_relaxed)));
+            update_peak(new_val);
             return true;
         }
 
+        void sub(int64_t delta) { _current_value.fetch_sub(delta, 
std::memory_order_relaxed); }
+
         void set(int64_t v) {
             _current_value.store(v, std::memory_order_relaxed);
-            update_peak();
+            update_peak(v);
         }
 
-        void update_peak() {
-            if (_current_value.load(std::memory_order_relaxed) >
-                _peak_value.load(std::memory_order_relaxed)) {
-                
_peak_value.store(_current_value.load(std::memory_order_relaxed),
-                                  std::memory_order_relaxed);
+        void update_peak(int64_t value) {
+            auto pre_value = _peak_value.load(std::memory_order_relaxed);
+            while (value > pre_value && !_peak_value.compare_exchange_weak(
+                                                pre_value, value, 
std::memory_order_relaxed)) {
             }
         }
 
@@ -124,13 +131,18 @@ public:
     int64_t peak_consumption() const { return _consumption->peak_value(); }
 
     void consume(int64_t bytes) {
-        if (bytes == 0) return;
+        if (UNLIKELY(bytes == 0)) {
+            return;
+        }
         _consumption->add(bytes);
     }
+
     void consume_no_update_peak(int64_t bytes) { // need extreme fast
         _consumption->add_no_update_peak(bytes);
     }
-    void release(int64_t bytes) { consume(-bytes); }
+
+    void release(int64_t bytes) { _consumption->sub(bytes); }
+
     void set_consumption(int64_t bytes) { _consumption->set(bytes); }
 
     void refresh_profile_counter() {
@@ -138,6 +150,7 @@ public:
             _profile_counter->set(_consumption->current_value());
         }
     }
+
     static void refresh_all_tracker_profile();
 
 public:


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to