This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 62e02305233 [branch-2.1](memory) Add `ThreadMemTrackerMgr` BE UT 
(#37654)
62e02305233 is described below

commit 62e02305233b7a5dc2638bbacf396edc876f773f
Author: Xinyi Zou <[email protected]>
AuthorDate: Thu Jul 11 21:03:49 2024 +0800

    [branch-2.1](memory) Add `ThreadMemTrackerMgr` BE UT (#37654)
    
    ## Proposed changes
    
    pick #35518
---
 be/src/runtime/exec_env.h                          |   4 +-
 be/src/runtime/memory/thread_mem_tracker_mgr.h     |   8 +-
 be/src/runtime/thread_context.h                    |   6 -
 .../mem_tracker_test.cpp}                          |   2 +-
 .../runtime/memory/thread_mem_tracker_mgr_test.cpp | 455 +++++++++++++++++++++
 be/test/testutil/run_all_tests.cpp                 |   2 +
 6 files changed, 467 insertions(+), 10 deletions(-)

diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 41d8c740326..d877096aec2 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -263,7 +263,9 @@ public:
         this->_dummy_lru_cache = dummy_lru_cache;
     }
     void set_write_cooldown_meta_executors();
-
+    static void set_tracking_memory(bool tracking_memory) {
+        _s_tracking_memory.store(tracking_memory, std::memory_order_acquire);
+    }
 #endif
     LoadStreamMapPool* load_stream_map_pool() { return 
_load_stream_map_pool.get(); }
 
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h 
b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 64c2190a149..9d36cd2d807 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -125,6 +125,9 @@ public:
                 fmt::to_string(consumer_tracker_buf));
     }
 
+    int64_t untracked_mem() const { return _untracked_mem; }
+    int64_t reserved_mem() const { return _reserved_mem; }
+
 private:
     // is false: ExecEnv::ready() = false when thread local is initialized
     bool _init = false;
@@ -190,7 +193,7 @@ inline void ThreadMemTrackerMgr::pop_consumer_tracker() {
 
 inline void ThreadMemTrackerMgr::consume(int64_t size, int 
skip_large_memory_check) {
     if (_reserved_mem != 0) {
-        if (_reserved_mem >= size) {
+        if (_reserved_mem > size) {
             // only need to subtract _reserved_mem, no need to consume 
MemTracker,
             // every time _reserved_mem is minus the sum of size >= 
SYNC_PROC_RESERVED_INTERVAL_BYTES,
             // subtract size from process global reserved memory,
@@ -208,7 +211,8 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, int 
skip_large_memory_che
             }
             return;
         } else {
-            // reserved memory is insufficient, the remaining _reserved_mem is 
subtracted from this memory consumed,
+            // _reserved_mem <= size, reserved memory used done,
+            // the remaining _reserved_mem is subtracted from this memory 
consumed,
             // and reset _reserved_mem to 0, and subtract the remaining 
_reserved_mem from
             // process global reserved memory, this means that all reserved 
memory has been used by BE process.
             size -= _reserved_mem;
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 72d3c8111f6..7a4695a4e98 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -156,14 +156,12 @@ public:
 
     void attach_task(const TUniqueId& task_id,
                      const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
-#ifndef BE_TEST
         // will only attach_task at the beginning of the thread function, 
there should be no duplicate attach_task.
         DCHECK(mem_tracker);
         // Orphan is thread default tracker.
         DCHECK(thread_mem_tracker()->label() == "Orphan")
                 << ", thread mem tracker label: " << 
thread_mem_tracker()->label()
                 << ", attach mem tracker label: " << mem_tracker->label();
-#endif
         _task_id = task_id;
         thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker);
         thread_mem_tracker_mgr->set_query_id(_task_id);
@@ -374,9 +372,7 @@ public:
 class SwitchThreadMemTrackerLimiter {
 public:
     explicit SwitchThreadMemTrackerLimiter(const 
std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
-#ifndef BE_TEST
         DCHECK(mem_tracker);
-#endif
         ThreadLocalHandle::create_thread_local_if_not_exits();
         _old_mem_tracker = 
thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
         
thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker);
@@ -385,9 +381,7 @@ public:
     explicit SwitchThreadMemTrackerLimiter(const QueryThreadContext& 
query_thread_context) {
         ThreadLocalHandle::create_thread_local_if_not_exits();
         DCHECK(thread_context()->task_id() == query_thread_context.query_id);
-#ifndef BE_TEST
         DCHECK(query_thread_context.query_mem_tracker);
-#endif
         _old_mem_tracker = 
thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
         thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(
                 query_thread_context.query_mem_tracker);
diff --git a/be/test/runtime/mem_limit_test.cpp 
b/be/test/runtime/memory/mem_tracker_test.cpp
similarity index 97%
rename from be/test/runtime/mem_limit_test.cpp
rename to be/test/runtime/memory/mem_tracker_test.cpp
index e6630b1432d..49f6aa3bf0c 100644
--- a/be/test/runtime/mem_limit_test.cpp
+++ b/be/test/runtime/memory/mem_tracker_test.cpp
@@ -38,7 +38,7 @@ TEST(MemTrackerTest, SingleTrackerNoLimit) {
     t->release(5);
 }
 
-TEST(MemTestTest, SingleTrackerWithLimit) {
+TEST(MemTrackerTest, SingleTrackerWithLimit) {
     auto t = 
std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::GLOBAL, "limit 
tracker",
                                                  11);
     EXPECT_TRUE(t->has_limit());
diff --git a/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp 
b/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp
new file mode 100644
index 00000000000..29c2759fcb7
--- /dev/null
+++ b/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp
@@ -0,0 +1,455 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/memory/thread_mem_tracker_mgr.h"
+
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+
+#include "gtest/gtest_pred_impl.h"
+#include "runtime/memory/mem_tracker_limiter.h"
+#include "runtime/thread_context.h"
+
+namespace doris {
+
+TEST(ThreadMemTrackerMgrTest, ConsumeMemory) {
+    std::unique_ptr<ThreadContext> thread_context = 
std::make_unique<ThreadContext>();
+    std::shared_ptr<MemTrackerLimiter> t =
+            MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER, 
"UT-ConsumeMemory");
+
+    int64_t size1 = 4 * 1024;
+    int64_t size2 = 4 * 1024 * 1024;
+
+    thread_context->attach_task(TUniqueId(), t);
+    thread_context->consume_memory(size1);
+    // size1 < config::mem_tracker_consume_min_size_bytes, not consume mem 
tracker.
+    EXPECT_EQ(t->consumption(), 0);
+
+    thread_context->consume_memory(size2);
+    // size1 + size2 > onfig::mem_tracker_consume_min_size_bytes, consume mem 
tracker.
+    EXPECT_EQ(t->consumption(), size1 + size2);
+
+    thread_context->consume_memory(-size1);
+    // std::abs(-size1) < config::mem_tracker_consume_min_size_bytes, not 
consume mem tracker.
+    EXPECT_EQ(t->consumption(), size1 + size2);
+
+    thread_context->thread_mem_tracker_mgr->flush_untracked_mem();
+    EXPECT_EQ(t->consumption(), size2);
+
+    thread_context->consume_memory(-size2);
+    // std::abs(-size2) > onfig::mem_tracker_consume_min_size_bytes, consume 
mem tracker.
+    EXPECT_EQ(t->consumption(), 0);
+
+    thread_context->consume_memory(-size2);
+    EXPECT_EQ(t->consumption(), -size2);
+
+    thread_context->consume_memory(-size1);
+    EXPECT_EQ(t->consumption(), -size2);
+
+    thread_context->consume_memory(size1);
+    thread_context->consume_memory(size2);
+    thread_context->consume_memory(size2 * 2);
+    thread_context->consume_memory(size2 * 10);
+    thread_context->consume_memory(size2 * 100);
+    thread_context->consume_memory(size2 * 1000);
+    thread_context->consume_memory(size2 * 10000);
+    thread_context->consume_memory(-size2 * 2);
+    thread_context->consume_memory(-size2 * 10);
+    thread_context->consume_memory(-size2 * 100);
+    thread_context->consume_memory(-size2 * 1000);
+    thread_context->consume_memory(-size2 * 10000);
+    thread_context->detach_task();
+    EXPECT_EQ(t->consumption(), 0); // detach automatic call 
flush_untracked_mem.
+}
+
+TEST(ThreadMemTrackerMgrTest, Boundary) {
+    // TODO, Boundary check may not be necessary, add some `IF` maybe increase 
cost time.
+}
+
+TEST(ThreadMemTrackerMgrTest, NestedSwitchMemTracker) {
+    std::unique_ptr<ThreadContext> thread_context = 
std::make_unique<ThreadContext>();
+    std::shared_ptr<MemTrackerLimiter> t1 = MemTrackerLimiter::create_shared(
+            MemTrackerLimiter::Type::OTHER, "UT-NestedSwitchMemTracker1");
+    std::shared_ptr<MemTrackerLimiter> t2 = MemTrackerLimiter::create_shared(
+            MemTrackerLimiter::Type::OTHER, "UT-NestedSwitchMemTracker2");
+    std::shared_ptr<MemTrackerLimiter> t3 = MemTrackerLimiter::create_shared(
+            MemTrackerLimiter::Type::OTHER, "UT-NestedSwitchMemTracker3");
+
+    int64_t size1 = 4 * 1024;
+    int64_t size2 = 4 * 1024 * 1024;
+
+    thread_context->attach_task(TUniqueId(), t1);
+    thread_context->consume_memory(size1);
+    thread_context->consume_memory(size2);
+    EXPECT_EQ(t1->consumption(), size1 + size2);
+
+    thread_context->consume_memory(size1);
+    thread_context->thread_mem_tracker_mgr->attach_limiter_tracker(t2);
+    EXPECT_EQ(t1->consumption(),
+              size1 + size2 + size1); // attach automatic call 
flush_untracked_mem.
+
+    thread_context->consume_memory(size1);
+    thread_context->consume_memory(size2);
+    thread_context->consume_memory(size1);
+    EXPECT_EQ(t1->consumption(), size1 + size2 + size1); // not changed, now 
consume t2
+    EXPECT_EQ(t2->consumption(), size1 + size2);
+
+    thread_context->thread_mem_tracker_mgr->detach_limiter_tracker(t1); // 
detach
+    EXPECT_EQ(t2->consumption(),
+              size1 + size2 + size1); // detach automatic call 
flush_untracked_mem.
+
+    thread_context->consume_memory(size2);
+    thread_context->consume_memory(size2);
+    EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size2);
+    EXPECT_EQ(t2->consumption(), size1 + size2 + size1); // not changed, now 
consume t1
+
+    thread_context->thread_mem_tracker_mgr->attach_limiter_tracker(t2);
+    thread_context->consume_memory(-size1);
+    thread_context->thread_mem_tracker_mgr->attach_limiter_tracker(t3);
+    thread_context->consume_memory(size1);
+    thread_context->consume_memory(size2);
+    thread_context->consume_memory(size1);
+    EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size2);
+    EXPECT_EQ(t2->consumption(), size1 + size2); // attach automatic call 
flush_untracked_mem.
+    EXPECT_EQ(t3->consumption(), size1 + size2);
+
+    thread_context->consume_memory(-size1);
+    thread_context->consume_memory(-size2);
+    thread_context->consume_memory(-size1);
+    EXPECT_EQ(t3->consumption(), size1);
+
+    thread_context->thread_mem_tracker_mgr->detach_limiter_tracker(t2); // 
detach
+    EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size2);
+    EXPECT_EQ(t2->consumption(), size1 + size2);
+    EXPECT_EQ(t3->consumption(), 0);
+
+    thread_context->consume_memory(-size1);
+    thread_context->consume_memory(-size2);
+    thread_context->consume_memory(-size1);
+    EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size2);
+    EXPECT_EQ(t2->consumption(), 0);
+
+    thread_context->thread_mem_tracker_mgr->detach_limiter_tracker(t1); // 
detach
+    EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size2);
+    EXPECT_EQ(t2->consumption(), -size1);
+
+    thread_context->consume_memory(-t1->consumption());
+    thread_context->detach_task(); // detach t1
+    EXPECT_EQ(t1->consumption(), 0);
+}
+
+TEST(ThreadMemTrackerMgrTest, MultiMemTracker) {
+    std::unique_ptr<ThreadContext> thread_context = 
std::make_unique<ThreadContext>();
+    std::shared_ptr<MemTrackerLimiter> t1 =
+            MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER, 
"UT-MultiMemTracker1");
+    std::shared_ptr<MemTracker> t2 = 
std::make_shared<MemTracker>("UT-MultiMemTracker2", t1.get());
+    std::shared_ptr<MemTracker> t3 = 
std::make_shared<MemTracker>("UT-MultiMemTracker3", t1.get());
+
+    int64_t size1 = 4 * 1024;
+    int64_t size2 = 4 * 1024 * 1024;
+
+    thread_context->attach_task(TUniqueId(), t1);
+    thread_context->consume_memory(size1);
+    thread_context->consume_memory(size2);
+    thread_context->consume_memory(size1);
+    EXPECT_EQ(t1->consumption(), size1 + size2);
+
+    bool rt = 
thread_context->thread_mem_tracker_mgr->push_consumer_tracker(t2.get());
+    EXPECT_EQ(rt, true);
+    EXPECT_EQ(t1->consumption(), size1 + size2);
+    EXPECT_EQ(t2->consumption(), -size1); // _untracked_mem = size1
+
+    thread_context->consume_memory(size2);
+    EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2);
+    EXPECT_EQ(t2->consumption(), size2);
+
+    rt = 
thread_context->thread_mem_tracker_mgr->push_consumer_tracker(t2.get());
+    EXPECT_EQ(rt, false);
+    thread_context->consume_memory(size2);
+    EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size2);
+    EXPECT_EQ(t2->consumption(), size2 + size2);
+
+    rt = 
thread_context->thread_mem_tracker_mgr->push_consumer_tracker(t3.get());
+    EXPECT_EQ(rt, true);
+    thread_context->consume_memory(size1);
+    thread_context->consume_memory(size2);
+    thread_context->consume_memory(-size1); // _untracked_mem = -size1
+    EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size2 + size1 
+ size2);
+    EXPECT_EQ(t2->consumption(), size2 + size2 + size1 + size2);
+    EXPECT_EQ(t3->consumption(), size1 + size2);
+
+    thread_context->thread_mem_tracker_mgr->pop_consumer_tracker();
+    EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size2 + size1 
+ size2 - size1);
+    EXPECT_EQ(t2->consumption(), size2 + size2 + size1 + size2 - size1);
+    EXPECT_EQ(t3->consumption(), size1 + size2 - size1);
+
+    thread_context->consume_memory(-size2);
+    thread_context->consume_memory(size2);
+    thread_context->consume_memory(-size2);
+    thread_context->thread_mem_tracker_mgr->pop_consumer_tracker();
+    EXPECT_EQ(t1->consumption(),
+              size1 + size2 + size1 + size2 + size2 + size1 + size2 - size1 - 
size2);
+    EXPECT_EQ(t2->consumption(), size2 + size2 + size1 + size2 - size1 - 
size2);
+    EXPECT_EQ(t3->consumption(), size1 + size2 - size1);
+
+    thread_context->consume_memory(-t1->consumption());
+    thread_context->detach_task(); // detach t1
+    EXPECT_EQ(t1->consumption(), 0);
+    EXPECT_EQ(t2->consumption(), size2 + size2 + size1 + size2 - size1 - 
size2);
+    EXPECT_EQ(t3->consumption(), size1 + size2 - size1);
+}
+
+TEST(ThreadMemTrackerMgrTest, ScopedCount) {
+    std::unique_ptr<ThreadContext> thread_context = 
std::make_unique<ThreadContext>();
+    std::shared_ptr<MemTrackerLimiter> t1 =
+            MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER, 
"UT-ScopedCount");
+
+    int64_t size1 = 4 * 1024;
+    int64_t size2 = 4 * 1024 * 1024;
+
+    thread_context->attach_task(TUniqueId(), t1);
+    thread_context->thread_mem_tracker_mgr->start_count_scope_mem();
+    thread_context->consume_memory(size1);
+    thread_context->consume_memory(size2);
+    thread_context->consume_memory(size1);
+    thread_context->consume_memory(size2);
+    thread_context->consume_memory(size1);
+    int64_t scope_mem = 
thread_context->thread_mem_tracker_mgr->stop_count_scope_mem();
+    EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size1);
+    EXPECT_EQ(t1->consumption(), scope_mem);
+
+    thread_context->consume_memory(-size2);
+    thread_context->consume_memory(-size1);
+    thread_context->consume_memory(-size2);
+    EXPECT_EQ(t1->consumption(), size1 + size1);
+    EXPECT_EQ(scope_mem, size1 + size2 + size1 + size2 + size1);
+}
+
+TEST(ThreadMemTrackerMgrTest, ReserveMemory) {
+    std::unique_ptr<ThreadContext> thread_context = 
std::make_unique<ThreadContext>();
+    std::shared_ptr<MemTrackerLimiter> t =
+            MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER, 
"UT-ReserveMemory");
+
+    int64_t size1 = 4 * 1024;
+    int64_t size2 = 4 * 1024 * 1024;
+    int64_t size3 = size2 * 1024;
+
+    thread_context->attach_task(TUniqueId(), t);
+    thread_context->consume_memory(size1);
+    thread_context->consume_memory(size2);
+    EXPECT_EQ(t->consumption(), size1 + size2);
+
+    thread_context->try_reserve_memory(size3);
+    EXPECT_EQ(t->consumption(), size1 + size2 + size3);
+    EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3);
+
+    thread_context->consume_memory(size2);
+    thread_context->consume_memory(-size2);
+    thread_context->consume_memory(size2);
+    EXPECT_EQ(t->consumption(), size1 + size2 + size3);
+    EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 
- size2);
+
+    thread_context->consume_memory(-size1);
+    thread_context->consume_memory(-size1);
+    EXPECT_EQ(t->consumption(), size1 + size2 + size3);
+    // std::abs(-size1 - size1) < SYNC_PROC_RESERVED_INTERVAL_BYTES, not 
update process_reserved_memory.
+    EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 
- size2);
+
+    thread_context->consume_memory(size2 * 1023);
+    EXPECT_EQ(t->consumption(), size1 + size2 + size3);
+    EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size1 
+ size1);
+
+    std::cout << "11111 " << 
thread_context->thread_mem_tracker_mgr->untracked_mem() << ", "
+              << thread_context->thread_mem_tracker_mgr->reserved_mem() << 
std::endl;
+    thread_context->consume_memory(size1);
+    thread_context->consume_memory(size1);
+    std::cout << "2222 " << 
thread_context->thread_mem_tracker_mgr->untracked_mem() << ", "
+              << thread_context->thread_mem_tracker_mgr->reserved_mem() << 
std::endl;
+    std::cout << "3333 " << 
thread_context->thread_mem_tracker_mgr->untracked_mem() << ", "
+              << thread_context->thread_mem_tracker_mgr->reserved_mem() << 
std::endl;
+    // reserved memory used done
+    EXPECT_EQ(t->consumption(), size1 + size2 + size3);
+    EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0);
+
+    thread_context->consume_memory(size1);
+    thread_context->consume_memory(size2);
+    // no reserved memory, normal memory consumption
+    EXPECT_EQ(t->consumption(), size1 + size2 + size3 + size1 + size2);
+    EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0);
+
+    thread_context->consume_memory(-size3);
+    thread_context->consume_memory(-size1);
+    thread_context->consume_memory(-size2);
+    EXPECT_EQ(t->consumption(), size1 + size2);
+    EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0);
+
+    thread_context->try_reserve_memory(size3);
+    EXPECT_EQ(t->consumption(), size1 + size2 + size3);
+    EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3);
+
+    thread_context->consume_memory(-size1);
+    // ThreadMemTrackerMgr _reserved_mem = size3 + size1
+    // ThreadMemTrackerMgr _untracked_mem = -size1
+    thread_context->consume_memory(size3);
+    // ThreadMemTrackerMgr _reserved_mem = size1
+    // ThreadMemTrackerMgr _untracked_mem = -size1 + size3
+    EXPECT_EQ(t->consumption(), size1 + size2 + size3);
+    EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(),
+              size1); // size3 + size1 - size3
+
+    thread_context->consume_memory(-size3);
+    // ThreadMemTrackerMgr _reserved_mem = size1 + size3
+    // ThreadMemTrackerMgr _untracked_mem = 0, std::abs(-size3) > 
SYNC_PROC_RESERVED_INTERVAL_BYTES,
+    // so update process_reserved_memory.
+    EXPECT_EQ(t->consumption(), size1 + size2 + size3);
+    EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size1 
+ size3);
+
+    thread_context->consume_memory(size1);
+    thread_context->consume_memory(size2);
+    thread_context->consume_memory(size1);
+    // ThreadMemTrackerMgr _reserved_mem = size1 + size3 - size1 - size2 - 
size1 = size3 - size2 - size1
+    // ThreadMemTrackerMgr _untracked_mem = size1
+    EXPECT_EQ(t->consumption(), size1 + size2 + size3);
+    // size1 + size3 - (size1 + size2)
+    EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 
- size2);
+
+    thread_context->release_reserved_memory();
+    // size1 + size2 + size3 - _reserved_mem, size1 + size2 + size3 - (size3 - 
size2 - size1)
+    EXPECT_EQ(t->consumption(), size1 + size2 + size1 + size2);
+    // size3 - size2 - (_reserved_mem + _untracked_mem) = 0, size3 - size2 - 
((size3 - size2 - size1) + (size1)) = 0
+    EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0);
+
+    thread_context->detach_task();
+    EXPECT_EQ(t->consumption(), size1 + size2 + size1 + size2);
+    EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0);
+}
+
+TEST(ThreadMemTrackerMgrTest, NestedReserveMemory) {
+    std::unique_ptr<ThreadContext> thread_context = 
std::make_unique<ThreadContext>();
+    std::shared_ptr<MemTrackerLimiter> t = MemTrackerLimiter::create_shared(
+            MemTrackerLimiter::Type::OTHER, "UT-NestedReserveMemory");
+
+    int64_t size2 = 4 * 1024 * 1024;
+    int64_t size3 = size2 * 1024;
+
+    thread_context->attach_task(TUniqueId(), t);
+    thread_context->try_reserve_memory(size3);
+    EXPECT_EQ(t->consumption(), size3);
+    EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3);
+
+    thread_context->consume_memory(size2);
+    // ThreadMemTrackerMgr _reserved_mem = size3 - size2
+    // ThreadMemTrackerMgr _untracked_mem = 0, size2 > 
SYNC_PROC_RESERVED_INTERVAL_BYTES,
+    // update process_reserved_memory.
+    EXPECT_EQ(t->consumption(), size3);
+    EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 
- size2);
+
+    thread_context->try_reserve_memory(size2);
+    // ThreadMemTrackerMgr _reserved_mem = size3 - size2 + size2
+    // ThreadMemTrackerMgr _untracked_mem = 0
+    EXPECT_EQ(t->consumption(), size3 + size2);
+    EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(),
+              size3); // size3 - size2 + size2
+
+    thread_context->try_reserve_memory(size3);
+    thread_context->try_reserve_memory(size3);
+    thread_context->consume_memory(size3);
+    thread_context->consume_memory(size2);
+    thread_context->consume_memory(size3);
+    // ThreadMemTrackerMgr _reserved_mem = size3 - size2
+    // ThreadMemTrackerMgr _untracked_mem = 0
+    EXPECT_EQ(t->consumption(), size3 + size2 + size3 + size3);
+    EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 
- size2);
+
+    thread_context->release_reserved_memory();
+    // size3 + size2 + size3 + size3 - _reserved_mem, size3 + size2 + size3 + 
size3 - (size3 - size2)
+    EXPECT_EQ(t->consumption(), size3 + size2 + size3 + size2);
+    // size3 - size2 - (_reserved_mem + _untracked_mem) = 0, size3 - size2 - 
((size3 - size2 - size1) + (size1)) = 0
+    EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0);
+
+    thread_context->detach_task();
+    EXPECT_EQ(t->consumption(), size3 + size2 + size3 + size2);
+    EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0);
+}
+
+TEST(ThreadMemTrackerMgrTest, NestedSwitchMemTrackerReserveMemory) {
+    std::unique_ptr<ThreadContext> thread_context = 
std::make_unique<ThreadContext>();
+    std::shared_ptr<MemTrackerLimiter> t1 = MemTrackerLimiter::create_shared(
+            MemTrackerLimiter::Type::OTHER, 
"UT-NestedSwitchMemTrackerReserveMemory1");
+    std::shared_ptr<MemTrackerLimiter> t2 = MemTrackerLimiter::create_shared(
+            MemTrackerLimiter::Type::OTHER, 
"UT-NestedSwitchMemTrackerReserveMemory2");
+    std::shared_ptr<MemTrackerLimiter> t3 = MemTrackerLimiter::create_shared(
+            MemTrackerLimiter::Type::OTHER, 
"UT-NestedSwitchMemTrackerReserveMemory3");
+
+    int64_t size1 = 4 * 1024;
+    int64_t size2 = 4 * 1024 * 1024;
+    int64_t size3 = size2 * 1024;
+
+    thread_context->attach_task(TUniqueId(), t1);
+    thread_context->try_reserve_memory(size3);
+    thread_context->consume_memory(size2);
+    EXPECT_EQ(t1->consumption(), size3);
+    EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 
- size2);
+
+    thread_context->thread_mem_tracker_mgr->attach_limiter_tracker(t2);
+    thread_context->try_reserve_memory(size3);
+    EXPECT_EQ(t1->consumption(), size3);
+    EXPECT_EQ(t2->consumption(), size3);
+    EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 
- size2 + size3);
+
+    thread_context->consume_memory(size2 + size3); // reserved memory used done
+    EXPECT_EQ(t1->consumption(), size3);
+    EXPECT_EQ(t2->consumption(), size3 + size2);
+    EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 
- size2);
+
+    thread_context->thread_mem_tracker_mgr->attach_limiter_tracker(t3);
+    thread_context->try_reserve_memory(size3);
+    EXPECT_EQ(t1->consumption(), size3);
+    EXPECT_EQ(t2->consumption(), size3 + size2);
+    EXPECT_EQ(t3->consumption(), size3);
+    EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 
- size2 + size3);
+
+    thread_context->consume_memory(-size2);
+    thread_context->consume_memory(-size1);
+    // ThreadMemTrackerMgr _reserved_mem = size3 + size2 + size1
+    // ThreadMemTrackerMgr _untracked_mem = -size1
+    EXPECT_EQ(t1->consumption(), size3);
+    EXPECT_EQ(t2->consumption(), size3 + size2);
+    EXPECT_EQ(t3->consumption(), size3);
+    EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(),
+              size3 - size2 + size3 + size2);
+
+    thread_context->thread_mem_tracker_mgr->detach_limiter_tracker(t2); // 
detach
+    EXPECT_EQ(t1->consumption(), size3);
+    EXPECT_EQ(t2->consumption(), size3 + size2);
+    EXPECT_EQ(t3->consumption(), -size1 - size2); // size3 - _reserved_mem
+    //  size3 - size2 + size3 + size2 - (_reserved_mem + _untracked_mem)
+    EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 
- size2);
+
+    thread_context->thread_mem_tracker_mgr->detach_limiter_tracker(t1); // 
detach
+    EXPECT_EQ(t1->consumption(), size3);
+    // not changed, reserved memory used done.
+    EXPECT_EQ(t2->consumption(), size3 + size2);
+    EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3 
- size2);
+
+    thread_context->detach_task();
+    EXPECT_EQ(t1->consumption(), size2); // size3 - _reserved_mem
+    // size3 - size2 - (_reserved_mem + _untracked_mem)
+    EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0);
+}
+
+} // end namespace doris
diff --git a/be/test/testutil/run_all_tests.cpp 
b/be/test/testutil/run_all_tests.cpp
index de088f8d17b..75afdacd87b 100644
--- a/be/test/testutil/run_all_tests.cpp
+++ b/be/test/testutil/run_all_tests.cpp
@@ -69,6 +69,8 @@ int main(int argc, char** argv) {
     static_cast<void>(service->start());
     doris::global_test_http_host = "http://127.0.0.1:"; + 
std::to_string(service->get_real_port());
 
+    doris::ExecEnv::GetInstance()->set_tracking_memory(false);
+
     int res = RUN_ALL_TESTS();
     return res;
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to