This is an automated email from the ASF dual-hosted git repository.
zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 03538381a3 [enhancement](memory) MemCounter supports lock-free thread
safety (#19256)
03538381a3 is described below
commit 03538381a30a87a5211d13cb20f21d756c3f1869
Author: luozenglin <[email protected]>
AuthorDate: Wed May 10 02:24:07 2023 +0800
[enhancement](memory) MemCounter supports lock-free thread safety (#19256)
make try_add() and update_peak() thread-safe.
---
be/src/runtime/memory/mem_tracker.h | 41 ++++++++++++++++++++++++-------------
1 file changed, 27 insertions(+), 14 deletions(-)
diff --git a/be/src/runtime/memory/mem_tracker.h
b/be/src/runtime/memory/mem_tracker.h
index 60ac996a82..81f901879c 100644
--- a/be/src/runtime/memory/mem_tracker.h
+++ b/be/src/runtime/memory/mem_tracker.h
@@ -64,8 +64,8 @@ public:
MemCounter() : _current_value(0), _peak_value(0) {}
void add(int64_t delta) {
- _current_value.fetch_add(delta, std::memory_order_relaxed);
- update_peak();
+ auto value = _current_value.fetch_add(delta,
std::memory_order_relaxed) + delta;
+ update_peak(value);
}
void add_no_update_peak(int64_t delta) {
@@ -73,23 +73,30 @@ public:
}
bool try_add(int64_t delta, int64_t max) {
- if (UNLIKELY(_current_value.load(std::memory_order_relaxed) +
delta > max))
- return false;
- _current_value.fetch_add(delta, std::memory_order_relaxed);
- update_peak();
+ auto cur_val = _current_value.load(std::memory_order_relaxed);
+ auto new_val = 0;
+ do {
+ new_val = cur_val + delta;
+ if (UNLIKELY(new_val > max)) {
+ return false;
+ }
+ } while (UNLIKELY(!_current_value.compare_exchange_weak(cur_val,
new_val,
+
std::memory_order_relaxed)));
+ update_peak(new_val);
return true;
}
+ void sub(int64_t delta) { _current_value.fetch_sub(delta,
std::memory_order_relaxed); }
+
void set(int64_t v) {
_current_value.store(v, std::memory_order_relaxed);
- update_peak();
+ update_peak(v);
}
- void update_peak() {
- if (_current_value.load(std::memory_order_relaxed) >
- _peak_value.load(std::memory_order_relaxed)) {
-
_peak_value.store(_current_value.load(std::memory_order_relaxed),
- std::memory_order_relaxed);
+ void update_peak(int64_t value) {
+ auto pre_value = _peak_value.load(std::memory_order_relaxed);
+ while (value > pre_value && !_peak_value.compare_exchange_weak(
+ pre_value, value,
std::memory_order_relaxed)) {
}
}
@@ -124,13 +131,18 @@ public:
int64_t peak_consumption() const { return _consumption->peak_value(); }
void consume(int64_t bytes) {
- if (bytes == 0) return;
+ if (UNLIKELY(bytes == 0)) {
+ return;
+ }
_consumption->add(bytes);
}
+
void consume_no_update_peak(int64_t bytes) { // need extreme fast
_consumption->add_no_update_peak(bytes);
}
- void release(int64_t bytes) { consume(-bytes); }
+
+ void release(int64_t bytes) { _consumption->sub(bytes); }
+
void set_consumption(int64_t bytes) { _consumption->set(bytes); }
void refresh_profile_counter() {
@@ -138,6 +150,7 @@ public:
_profile_counter->set(_consumption->current_value());
}
}
+
static void refresh_all_tracker_profile();
public:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]