This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 62e02305233 [branch-2.1](memory) Add `ThreadMemTrackerMgr` BE UT
(#37654)
62e02305233 is described below
commit 62e02305233b7a5dc2638bbacf396edc876f773f
Author: Xinyi Zou <[email protected]>
AuthorDate: Thu Jul 11 21:03:49 2024 +0800
[branch-2.1](memory) Add `ThreadMemTrackerMgr` BE UT (#37654)
## Proposed changes
pick #35518
---
be/src/runtime/exec_env.h | 4 +-
be/src/runtime/memory/thread_mem_tracker_mgr.h | 8 +-
be/src/runtime/thread_context.h | 6 -
.../mem_tracker_test.cpp} | 2 +-
.../runtime/memory/thread_mem_tracker_mgr_test.cpp | 455 +++++++++++++++++++++
be/test/testutil/run_all_tests.cpp | 2 +
6 files changed, 467 insertions(+), 10 deletions(-)
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 41d8c740326..d877096aec2 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -263,7 +263,9 @@ public:
this->_dummy_lru_cache = dummy_lru_cache;
}
void set_write_cooldown_meta_executors();
-
+ static void set_tracking_memory(bool tracking_memory) {
+ _s_tracking_memory.store(tracking_memory, std::memory_order_acquire);
+ }
#endif
LoadStreamMapPool* load_stream_map_pool() { return
_load_stream_map_pool.get(); }
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h
b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 64c2190a149..9d36cd2d807 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -125,6 +125,9 @@ public:
fmt::to_string(consumer_tracker_buf));
}
+ int64_t untracked_mem() const { return _untracked_mem; }
+ int64_t reserved_mem() const { return _reserved_mem; }
+
private:
// is false: ExecEnv::ready() = false when thread local is initialized
bool _init = false;
@@ -190,7 +193,7 @@ inline void ThreadMemTrackerMgr::pop_consumer_tracker() {
inline void ThreadMemTrackerMgr::consume(int64_t size, int
skip_large_memory_check) {
if (_reserved_mem != 0) {
- if (_reserved_mem >= size) {
+ if (_reserved_mem > size) {
// only need to subtract _reserved_mem, no need to consume
MemTracker,
// every time _reserved_mem is minus the sum of size >=
SYNC_PROC_RESERVED_INTERVAL_BYTES,
// subtract size from process global reserved memory,
@@ -208,7 +211,8 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, int
skip_large_memory_che
}
return;
} else {
- // reserved memory is insufficient, the remaining _reserved_mem is
subtracted from this memory consumed,
+ // _reserved_mem <= size, reserved memory used done,
+ // the remaining _reserved_mem is subtracted from this memory
consumed,
// and reset _reserved_mem to 0, and subtract the remaining
_reserved_mem from
// process global reserved memory, this means that all reserved
memory has been used by BE process.
size -= _reserved_mem;
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 72d3c8111f6..7a4695a4e98 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -156,14 +156,12 @@ public:
void attach_task(const TUniqueId& task_id,
const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
-#ifndef BE_TEST
// will only attach_task at the beginning of the thread function,
there should be no duplicate attach_task.
DCHECK(mem_tracker);
// Orphan is thread default tracker.
DCHECK(thread_mem_tracker()->label() == "Orphan")
<< ", thread mem tracker label: " <<
thread_mem_tracker()->label()
<< ", attach mem tracker label: " << mem_tracker->label();
-#endif
_task_id = task_id;
thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker);
thread_mem_tracker_mgr->set_query_id(_task_id);
@@ -374,9 +372,7 @@ public:
class SwitchThreadMemTrackerLimiter {
public:
explicit SwitchThreadMemTrackerLimiter(const
std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
-#ifndef BE_TEST
DCHECK(mem_tracker);
-#endif
ThreadLocalHandle::create_thread_local_if_not_exits();
_old_mem_tracker =
thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker);
@@ -385,9 +381,7 @@ public:
explicit SwitchThreadMemTrackerLimiter(const QueryThreadContext&
query_thread_context) {
ThreadLocalHandle::create_thread_local_if_not_exits();
DCHECK(thread_context()->task_id() == query_thread_context.query_id);
-#ifndef BE_TEST
DCHECK(query_thread_context.query_mem_tracker);
-#endif
_old_mem_tracker =
thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(
query_thread_context.query_mem_tracker);
diff --git a/be/test/runtime/mem_limit_test.cpp
b/be/test/runtime/memory/mem_tracker_test.cpp
similarity index 97%
rename from be/test/runtime/mem_limit_test.cpp
rename to be/test/runtime/memory/mem_tracker_test.cpp
index e6630b1432d..49f6aa3bf0c 100644
--- a/be/test/runtime/mem_limit_test.cpp
+++ b/be/test/runtime/memory/mem_tracker_test.cpp
@@ -38,7 +38,7 @@ TEST(MemTrackerTest, SingleTrackerNoLimit) {
t->release(5);
}
-TEST(MemTestTest, SingleTrackerWithLimit) {
+TEST(MemTrackerTest, SingleTrackerWithLimit) {
auto t =
std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::GLOBAL, "limit
tracker",
11);
EXPECT_TRUE(t->has_limit());
diff --git a/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp
b/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp
new file mode 100644
index 00000000000..29c2759fcb7
--- /dev/null
+++ b/be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp
@@ -0,0 +1,455 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/memory/thread_mem_tracker_mgr.h"
+
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+
+#include "gtest/gtest_pred_impl.h"
+#include "runtime/memory/mem_tracker_limiter.h"
+#include "runtime/thread_context.h"
+
+namespace doris {
+
+TEST(ThreadMemTrackerMgrTest, ConsumeMemory) {
+ std::unique_ptr<ThreadContext> thread_context =
std::make_unique<ThreadContext>();
+ std::shared_ptr<MemTrackerLimiter> t =
+ MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER,
"UT-ConsumeMemory");
+
+ int64_t size1 = 4 * 1024;
+ int64_t size2 = 4 * 1024 * 1024;
+
+ thread_context->attach_task(TUniqueId(), t);
+ thread_context->consume_memory(size1);
+ // size1 < config::mem_tracker_consume_min_size_bytes, not consume mem
tracker.
+ EXPECT_EQ(t->consumption(), 0);
+
+ thread_context->consume_memory(size2);
+ // size1 + size2 > onfig::mem_tracker_consume_min_size_bytes, consume mem
tracker.
+ EXPECT_EQ(t->consumption(), size1 + size2);
+
+ thread_context->consume_memory(-size1);
+ // std::abs(-size1) < config::mem_tracker_consume_min_size_bytes, not
consume mem tracker.
+ EXPECT_EQ(t->consumption(), size1 + size2);
+
+ thread_context->thread_mem_tracker_mgr->flush_untracked_mem();
+ EXPECT_EQ(t->consumption(), size2);
+
+ thread_context->consume_memory(-size2);
+ // std::abs(-size2) > onfig::mem_tracker_consume_min_size_bytes, consume
mem tracker.
+ EXPECT_EQ(t->consumption(), 0);
+
+ thread_context->consume_memory(-size2);
+ EXPECT_EQ(t->consumption(), -size2);
+
+ thread_context->consume_memory(-size1);
+ EXPECT_EQ(t->consumption(), -size2);
+
+ thread_context->consume_memory(size1);
+ thread_context->consume_memory(size2);
+ thread_context->consume_memory(size2 * 2);
+ thread_context->consume_memory(size2 * 10);
+ thread_context->consume_memory(size2 * 100);
+ thread_context->consume_memory(size2 * 1000);
+ thread_context->consume_memory(size2 * 10000);
+ thread_context->consume_memory(-size2 * 2);
+ thread_context->consume_memory(-size2 * 10);
+ thread_context->consume_memory(-size2 * 100);
+ thread_context->consume_memory(-size2 * 1000);
+ thread_context->consume_memory(-size2 * 10000);
+ thread_context->detach_task();
+ EXPECT_EQ(t->consumption(), 0); // detach automatic call
flush_untracked_mem.
+}
+
+TEST(ThreadMemTrackerMgrTest, Boundary) {
+ // TODO, Boundary check may not be necessary, add some `IF` maybe increase
cost time.
+}
+
+TEST(ThreadMemTrackerMgrTest, NestedSwitchMemTracker) {
+ std::unique_ptr<ThreadContext> thread_context =
std::make_unique<ThreadContext>();
+ std::shared_ptr<MemTrackerLimiter> t1 = MemTrackerLimiter::create_shared(
+ MemTrackerLimiter::Type::OTHER, "UT-NestedSwitchMemTracker1");
+ std::shared_ptr<MemTrackerLimiter> t2 = MemTrackerLimiter::create_shared(
+ MemTrackerLimiter::Type::OTHER, "UT-NestedSwitchMemTracker2");
+ std::shared_ptr<MemTrackerLimiter> t3 = MemTrackerLimiter::create_shared(
+ MemTrackerLimiter::Type::OTHER, "UT-NestedSwitchMemTracker3");
+
+ int64_t size1 = 4 * 1024;
+ int64_t size2 = 4 * 1024 * 1024;
+
+ thread_context->attach_task(TUniqueId(), t1);
+ thread_context->consume_memory(size1);
+ thread_context->consume_memory(size2);
+ EXPECT_EQ(t1->consumption(), size1 + size2);
+
+ thread_context->consume_memory(size1);
+ thread_context->thread_mem_tracker_mgr->attach_limiter_tracker(t2);
+ EXPECT_EQ(t1->consumption(),
+ size1 + size2 + size1); // attach automatic call
flush_untracked_mem.
+
+ thread_context->consume_memory(size1);
+ thread_context->consume_memory(size2);
+ thread_context->consume_memory(size1);
+ EXPECT_EQ(t1->consumption(), size1 + size2 + size1); // not changed, now
consume t2
+ EXPECT_EQ(t2->consumption(), size1 + size2);
+
+ thread_context->thread_mem_tracker_mgr->detach_limiter_tracker(t1); //
detach
+ EXPECT_EQ(t2->consumption(),
+ size1 + size2 + size1); // detach automatic call
flush_untracked_mem.
+
+ thread_context->consume_memory(size2);
+ thread_context->consume_memory(size2);
+ EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size2);
+ EXPECT_EQ(t2->consumption(), size1 + size2 + size1); // not changed, now
consume t1
+
+ thread_context->thread_mem_tracker_mgr->attach_limiter_tracker(t2);
+ thread_context->consume_memory(-size1);
+ thread_context->thread_mem_tracker_mgr->attach_limiter_tracker(t3);
+ thread_context->consume_memory(size1);
+ thread_context->consume_memory(size2);
+ thread_context->consume_memory(size1);
+ EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size2);
+ EXPECT_EQ(t2->consumption(), size1 + size2); // attach automatic call
flush_untracked_mem.
+ EXPECT_EQ(t3->consumption(), size1 + size2);
+
+ thread_context->consume_memory(-size1);
+ thread_context->consume_memory(-size2);
+ thread_context->consume_memory(-size1);
+ EXPECT_EQ(t3->consumption(), size1);
+
+ thread_context->thread_mem_tracker_mgr->detach_limiter_tracker(t2); //
detach
+ EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size2);
+ EXPECT_EQ(t2->consumption(), size1 + size2);
+ EXPECT_EQ(t3->consumption(), 0);
+
+ thread_context->consume_memory(-size1);
+ thread_context->consume_memory(-size2);
+ thread_context->consume_memory(-size1);
+ EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size2);
+ EXPECT_EQ(t2->consumption(), 0);
+
+ thread_context->thread_mem_tracker_mgr->detach_limiter_tracker(t1); //
detach
+ EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size2);
+ EXPECT_EQ(t2->consumption(), -size1);
+
+ thread_context->consume_memory(-t1->consumption());
+ thread_context->detach_task(); // detach t1
+ EXPECT_EQ(t1->consumption(), 0);
+}
+
+TEST(ThreadMemTrackerMgrTest, MultiMemTracker) {
+ std::unique_ptr<ThreadContext> thread_context =
std::make_unique<ThreadContext>();
+ std::shared_ptr<MemTrackerLimiter> t1 =
+ MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER,
"UT-MultiMemTracker1");
+ std::shared_ptr<MemTracker> t2 =
std::make_shared<MemTracker>("UT-MultiMemTracker2", t1.get());
+ std::shared_ptr<MemTracker> t3 =
std::make_shared<MemTracker>("UT-MultiMemTracker3", t1.get());
+
+ int64_t size1 = 4 * 1024;
+ int64_t size2 = 4 * 1024 * 1024;
+
+ thread_context->attach_task(TUniqueId(), t1);
+ thread_context->consume_memory(size1);
+ thread_context->consume_memory(size2);
+ thread_context->consume_memory(size1);
+ EXPECT_EQ(t1->consumption(), size1 + size2);
+
+ bool rt =
thread_context->thread_mem_tracker_mgr->push_consumer_tracker(t2.get());
+ EXPECT_EQ(rt, true);
+ EXPECT_EQ(t1->consumption(), size1 + size2);
+ EXPECT_EQ(t2->consumption(), -size1); // _untracked_mem = size1
+
+ thread_context->consume_memory(size2);
+ EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2);
+ EXPECT_EQ(t2->consumption(), size2);
+
+ rt =
thread_context->thread_mem_tracker_mgr->push_consumer_tracker(t2.get());
+ EXPECT_EQ(rt, false);
+ thread_context->consume_memory(size2);
+ EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size2);
+ EXPECT_EQ(t2->consumption(), size2 + size2);
+
+ rt =
thread_context->thread_mem_tracker_mgr->push_consumer_tracker(t3.get());
+ EXPECT_EQ(rt, true);
+ thread_context->consume_memory(size1);
+ thread_context->consume_memory(size2);
+ thread_context->consume_memory(-size1); // _untracked_mem = -size1
+ EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size2 + size1
+ size2);
+ EXPECT_EQ(t2->consumption(), size2 + size2 + size1 + size2);
+ EXPECT_EQ(t3->consumption(), size1 + size2);
+
+ thread_context->thread_mem_tracker_mgr->pop_consumer_tracker();
+ EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size2 + size1
+ size2 - size1);
+ EXPECT_EQ(t2->consumption(), size2 + size2 + size1 + size2 - size1);
+ EXPECT_EQ(t3->consumption(), size1 + size2 - size1);
+
+ thread_context->consume_memory(-size2);
+ thread_context->consume_memory(size2);
+ thread_context->consume_memory(-size2);
+ thread_context->thread_mem_tracker_mgr->pop_consumer_tracker();
+ EXPECT_EQ(t1->consumption(),
+ size1 + size2 + size1 + size2 + size2 + size1 + size2 - size1 -
size2);
+ EXPECT_EQ(t2->consumption(), size2 + size2 + size1 + size2 - size1 -
size2);
+ EXPECT_EQ(t3->consumption(), size1 + size2 - size1);
+
+ thread_context->consume_memory(-t1->consumption());
+ thread_context->detach_task(); // detach t1
+ EXPECT_EQ(t1->consumption(), 0);
+ EXPECT_EQ(t2->consumption(), size2 + size2 + size1 + size2 - size1 -
size2);
+ EXPECT_EQ(t3->consumption(), size1 + size2 - size1);
+}
+
+TEST(ThreadMemTrackerMgrTest, ScopedCount) {
+ std::unique_ptr<ThreadContext> thread_context =
std::make_unique<ThreadContext>();
+ std::shared_ptr<MemTrackerLimiter> t1 =
+ MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER,
"UT-ScopedCount");
+
+ int64_t size1 = 4 * 1024;
+ int64_t size2 = 4 * 1024 * 1024;
+
+ thread_context->attach_task(TUniqueId(), t1);
+ thread_context->thread_mem_tracker_mgr->start_count_scope_mem();
+ thread_context->consume_memory(size1);
+ thread_context->consume_memory(size2);
+ thread_context->consume_memory(size1);
+ thread_context->consume_memory(size2);
+ thread_context->consume_memory(size1);
+ int64_t scope_mem =
thread_context->thread_mem_tracker_mgr->stop_count_scope_mem();
+ EXPECT_EQ(t1->consumption(), size1 + size2 + size1 + size2 + size1);
+ EXPECT_EQ(t1->consumption(), scope_mem);
+
+ thread_context->consume_memory(-size2);
+ thread_context->consume_memory(-size1);
+ thread_context->consume_memory(-size2);
+ EXPECT_EQ(t1->consumption(), size1 + size1);
+ EXPECT_EQ(scope_mem, size1 + size2 + size1 + size2 + size1);
+}
+
+TEST(ThreadMemTrackerMgrTest, ReserveMemory) {
+ std::unique_ptr<ThreadContext> thread_context =
std::make_unique<ThreadContext>();
+ std::shared_ptr<MemTrackerLimiter> t =
+ MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER,
"UT-ReserveMemory");
+
+ int64_t size1 = 4 * 1024;
+ int64_t size2 = 4 * 1024 * 1024;
+ int64_t size3 = size2 * 1024;
+
+ thread_context->attach_task(TUniqueId(), t);
+ thread_context->consume_memory(size1);
+ thread_context->consume_memory(size2);
+ EXPECT_EQ(t->consumption(), size1 + size2);
+
+ thread_context->try_reserve_memory(size3);
+ EXPECT_EQ(t->consumption(), size1 + size2 + size3);
+ EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3);
+
+ thread_context->consume_memory(size2);
+ thread_context->consume_memory(-size2);
+ thread_context->consume_memory(size2);
+ EXPECT_EQ(t->consumption(), size1 + size2 + size3);
+ EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3
- size2);
+
+ thread_context->consume_memory(-size1);
+ thread_context->consume_memory(-size1);
+ EXPECT_EQ(t->consumption(), size1 + size2 + size3);
+ // std::abs(-size1 - size1) < SYNC_PROC_RESERVED_INTERVAL_BYTES, not
update process_reserved_memory.
+ EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3
- size2);
+
+ thread_context->consume_memory(size2 * 1023);
+ EXPECT_EQ(t->consumption(), size1 + size2 + size3);
+ EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size1
+ size1);
+
+ std::cout << "11111 " <<
thread_context->thread_mem_tracker_mgr->untracked_mem() << ", "
+ << thread_context->thread_mem_tracker_mgr->reserved_mem() <<
std::endl;
+ thread_context->consume_memory(size1);
+ thread_context->consume_memory(size1);
+ std::cout << "2222 " <<
thread_context->thread_mem_tracker_mgr->untracked_mem() << ", "
+ << thread_context->thread_mem_tracker_mgr->reserved_mem() <<
std::endl;
+ std::cout << "3333 " <<
thread_context->thread_mem_tracker_mgr->untracked_mem() << ", "
+ << thread_context->thread_mem_tracker_mgr->reserved_mem() <<
std::endl;
+ // reserved memory used done
+ EXPECT_EQ(t->consumption(), size1 + size2 + size3);
+ EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0);
+
+ thread_context->consume_memory(size1);
+ thread_context->consume_memory(size2);
+ // no reserved memory, normal memory consumption
+ EXPECT_EQ(t->consumption(), size1 + size2 + size3 + size1 + size2);
+ EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0);
+
+ thread_context->consume_memory(-size3);
+ thread_context->consume_memory(-size1);
+ thread_context->consume_memory(-size2);
+ EXPECT_EQ(t->consumption(), size1 + size2);
+ EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0);
+
+ thread_context->try_reserve_memory(size3);
+ EXPECT_EQ(t->consumption(), size1 + size2 + size3);
+ EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3);
+
+ thread_context->consume_memory(-size1);
+ // ThreadMemTrackerMgr _reserved_mem = size3 + size1
+ // ThreadMemTrackerMgr _untracked_mem = -size1
+ thread_context->consume_memory(size3);
+ // ThreadMemTrackerMgr _reserved_mem = size1
+ // ThreadMemTrackerMgr _untracked_mem = -size1 + size3
+ EXPECT_EQ(t->consumption(), size1 + size2 + size3);
+ EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(),
+ size1); // size3 + size1 - size3
+
+ thread_context->consume_memory(-size3);
+ // ThreadMemTrackerMgr _reserved_mem = size1 + size3
+ // ThreadMemTrackerMgr _untracked_mem = 0, std::abs(-size3) >
SYNC_PROC_RESERVED_INTERVAL_BYTES,
+ // so update process_reserved_memory.
+ EXPECT_EQ(t->consumption(), size1 + size2 + size3);
+ EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size1
+ size3);
+
+ thread_context->consume_memory(size1);
+ thread_context->consume_memory(size2);
+ thread_context->consume_memory(size1);
+ // ThreadMemTrackerMgr _reserved_mem = size1 + size3 - size1 - size2 -
size1 = size3 - size2 - size1
+ // ThreadMemTrackerMgr _untracked_mem = size1
+ EXPECT_EQ(t->consumption(), size1 + size2 + size3);
+ // size1 + size3 - (size1 + size2)
+ EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3
- size2);
+
+ thread_context->release_reserved_memory();
+ // size1 + size2 + size3 - _reserved_mem, size1 + size2 + size3 - (size3 -
size2 - size1)
+ EXPECT_EQ(t->consumption(), size1 + size2 + size1 + size2);
+ // size3 - size2 - (_reserved_mem + _untracked_mem) = 0, size3 - size2 -
((size3 - size2 - size1) + (size1)) = 0
+ EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0);
+
+ thread_context->detach_task();
+ EXPECT_EQ(t->consumption(), size1 + size2 + size1 + size2);
+ EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0);
+}
+
+TEST(ThreadMemTrackerMgrTest, NestedReserveMemory) {
+ std::unique_ptr<ThreadContext> thread_context =
std::make_unique<ThreadContext>();
+ std::shared_ptr<MemTrackerLimiter> t = MemTrackerLimiter::create_shared(
+ MemTrackerLimiter::Type::OTHER, "UT-NestedReserveMemory");
+
+ int64_t size2 = 4 * 1024 * 1024;
+ int64_t size3 = size2 * 1024;
+
+ thread_context->attach_task(TUniqueId(), t);
+ thread_context->try_reserve_memory(size3);
+ EXPECT_EQ(t->consumption(), size3);
+ EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3);
+
+ thread_context->consume_memory(size2);
+ // ThreadMemTrackerMgr _reserved_mem = size3 - size2
+ // ThreadMemTrackerMgr _untracked_mem = 0, size2 >
SYNC_PROC_RESERVED_INTERVAL_BYTES,
+ // update process_reserved_memory.
+ EXPECT_EQ(t->consumption(), size3);
+ EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3
- size2);
+
+ thread_context->try_reserve_memory(size2);
+ // ThreadMemTrackerMgr _reserved_mem = size3 - size2 + size2
+ // ThreadMemTrackerMgr _untracked_mem = 0
+ EXPECT_EQ(t->consumption(), size3 + size2);
+ EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(),
+ size3); // size3 - size2 + size2
+
+ thread_context->try_reserve_memory(size3);
+ thread_context->try_reserve_memory(size3);
+ thread_context->consume_memory(size3);
+ thread_context->consume_memory(size2);
+ thread_context->consume_memory(size3);
+ // ThreadMemTrackerMgr _reserved_mem = size3 - size2
+ // ThreadMemTrackerMgr _untracked_mem = 0
+ EXPECT_EQ(t->consumption(), size3 + size2 + size3 + size3);
+ EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3
- size2);
+
+ thread_context->release_reserved_memory();
+ // size3 + size2 + size3 + size3 - _reserved_mem, size3 + size2 + size3 +
size3 - (size3 - size2)
+ EXPECT_EQ(t->consumption(), size3 + size2 + size3 + size2);
+ // size3 - size2 - (_reserved_mem + _untracked_mem) = 0, size3 - size2 -
((size3 - size2 - size1) + (size1)) = 0
+ EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0);
+
+ thread_context->detach_task();
+ EXPECT_EQ(t->consumption(), size3 + size2 + size3 + size2);
+ EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0);
+}
+
+TEST(ThreadMemTrackerMgrTest, NestedSwitchMemTrackerReserveMemory) {
+ std::unique_ptr<ThreadContext> thread_context =
std::make_unique<ThreadContext>();
+ std::shared_ptr<MemTrackerLimiter> t1 = MemTrackerLimiter::create_shared(
+ MemTrackerLimiter::Type::OTHER,
"UT-NestedSwitchMemTrackerReserveMemory1");
+ std::shared_ptr<MemTrackerLimiter> t2 = MemTrackerLimiter::create_shared(
+ MemTrackerLimiter::Type::OTHER,
"UT-NestedSwitchMemTrackerReserveMemory2");
+ std::shared_ptr<MemTrackerLimiter> t3 = MemTrackerLimiter::create_shared(
+ MemTrackerLimiter::Type::OTHER,
"UT-NestedSwitchMemTrackerReserveMemory3");
+
+ int64_t size1 = 4 * 1024;
+ int64_t size2 = 4 * 1024 * 1024;
+ int64_t size3 = size2 * 1024;
+
+ thread_context->attach_task(TUniqueId(), t1);
+ thread_context->try_reserve_memory(size3);
+ thread_context->consume_memory(size2);
+ EXPECT_EQ(t1->consumption(), size3);
+ EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3
- size2);
+
+ thread_context->thread_mem_tracker_mgr->attach_limiter_tracker(t2);
+ thread_context->try_reserve_memory(size3);
+ EXPECT_EQ(t1->consumption(), size3);
+ EXPECT_EQ(t2->consumption(), size3);
+ EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3
- size2 + size3);
+
+ thread_context->consume_memory(size2 + size3); // reserved memory used done
+ EXPECT_EQ(t1->consumption(), size3);
+ EXPECT_EQ(t2->consumption(), size3 + size2);
+ EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3
- size2);
+
+ thread_context->thread_mem_tracker_mgr->attach_limiter_tracker(t3);
+ thread_context->try_reserve_memory(size3);
+ EXPECT_EQ(t1->consumption(), size3);
+ EXPECT_EQ(t2->consumption(), size3 + size2);
+ EXPECT_EQ(t3->consumption(), size3);
+ EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3
- size2 + size3);
+
+ thread_context->consume_memory(-size2);
+ thread_context->consume_memory(-size1);
+ // ThreadMemTrackerMgr _reserved_mem = size3 + size2 + size1
+ // ThreadMemTrackerMgr _untracked_mem = -size1
+ EXPECT_EQ(t1->consumption(), size3);
+ EXPECT_EQ(t2->consumption(), size3 + size2);
+ EXPECT_EQ(t3->consumption(), size3);
+ EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(),
+ size3 - size2 + size3 + size2);
+
+ thread_context->thread_mem_tracker_mgr->detach_limiter_tracker(t2); //
detach
+ EXPECT_EQ(t1->consumption(), size3);
+ EXPECT_EQ(t2->consumption(), size3 + size2);
+ EXPECT_EQ(t3->consumption(), -size1 - size2); // size3 - _reserved_mem
+ // size3 - size2 + size3 + size2 - (_reserved_mem + _untracked_mem)
+ EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3
- size2);
+
+ thread_context->thread_mem_tracker_mgr->detach_limiter_tracker(t1); //
detach
+ EXPECT_EQ(t1->consumption(), size3);
+ // not changed, reserved memory used done.
+ EXPECT_EQ(t2->consumption(), size3 + size2);
+ EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), size3
- size2);
+
+ thread_context->detach_task();
+ EXPECT_EQ(t1->consumption(), size2); // size3 - _reserved_mem
+ // size3 - size2 - (_reserved_mem + _untracked_mem)
+ EXPECT_EQ(doris::GlobalMemoryArbitrator::process_reserved_memory(), 0);
+}
+
+} // end namespace doris
diff --git a/be/test/testutil/run_all_tests.cpp
b/be/test/testutil/run_all_tests.cpp
index de088f8d17b..75afdacd87b 100644
--- a/be/test/testutil/run_all_tests.cpp
+++ b/be/test/testutil/run_all_tests.cpp
@@ -69,6 +69,8 @@ int main(int argc, char** argv) {
static_cast<void>(service->start());
doris::global_test_http_host = "http://127.0.0.1:" +
std::to_string(service->get_real_port());
+ doris::ExecEnv::GetInstance()->set_tracking_memory(false);
+
int res = RUN_ALL_TESTS();
return res;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]