This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9434ee57100 [fix](load) fix memtracking orphan too large (#28600)
9434ee57100 is described below

commit 9434ee5710093790ab4a570a232ef8593dcdab79
Author: Kaijie Chen <[email protected]>
AuthorDate: Tue Dec 19 12:41:19 2023 +0800

    [fix](load) fix memtracking orphan too large (#28600)
---
 be/src/common/config.cpp                |  2 ++
 be/src/common/config.h                  |  2 ++
 be/src/olap/memtable.cpp                | 11 ++++++++++-
 be/src/olap/memtable_memory_limiter.cpp |  2 ++
 be/src/runtime/tablets_channel.cpp      |  9 +++++++++
 5 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 2722b5a8a16..549404e80ff 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -586,6 +586,8 @@ DEFINE_mInt32(memtable_soft_limit_active_percent, "50");
 // Alignment
 DEFINE_Int32(memory_max_alignment, "16");
 
+// memtable insert memory tracker will multiply input block size with this 
ratio
+DEFINE_mDouble(memtable_insert_memory_ratio, "1.4");
 // max write buffer size before flush, default 200MB
 DEFINE_mInt64(write_buffer_size, "209715200");
 // max buffer size used in memtable for the aggregated table, default 400MB
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 37020c56be4..fb5943ec9fd 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -645,6 +645,8 @@ DECLARE_mInt32(memtable_soft_limit_active_percent);
 // Alignment
 DECLARE_Int32(memory_max_alignment);
 
+// memtable insert memory tracker will multiply input block size with this 
ratio
+DECLARE_mDouble(memtable_insert_memory_ratio);
 // max write buffer size before flush, default 200MB
 DECLARE_mInt64(write_buffer_size);
 // max buffer size used in memtable for the aggregated table, default 400MB
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 95966edf9db..e868d79dfb6 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -44,6 +44,7 @@
 namespace doris {
 
 bvar::Adder<int64_t> g_memtable_cnt("memtable_cnt");
+bvar::Adder<int64_t> 
g_memtable_input_block_allocated_size("memtable_input_block_allocated_size");
 
 using namespace ErrorCode;
 
@@ -137,6 +138,7 @@ void MemTable::_init_agg_functions(const vectorized::Block* 
block) {
 }
 
 MemTable::~MemTable() {
+    g_memtable_input_block_allocated_size << 
-_input_mutable_block.allocated_bytes();
     g_memtable_cnt << -1;
     if (_keys_type != KeysType::DUP_KEYS) {
         for (auto it = _row_in_blocks.begin(); it != _row_in_blocks.end(); 
it++) {
@@ -198,6 +200,7 @@ void MemTable::insert(const vectorized::Block* input_block, 
const std::vector<ui
 
     auto num_rows = row_idxs.size();
     size_t cursor_in_mutableblock = _input_mutable_block.rows();
+    auto block_size0 = _input_mutable_block.allocated_bytes();
     if (is_append) {
         // Append the block, call insert range from
         _input_mutable_block.add_rows(&target_block, 0, target_block.rows());
@@ -205,7 +208,10 @@ void MemTable::insert(const vectorized::Block* 
input_block, const std::vector<ui
     } else {
         _input_mutable_block.add_rows(&target_block, row_idxs.data(), 
row_idxs.data() + num_rows);
     }
-    size_t input_size = target_block.bytes() * num_rows / target_block.rows();
+    auto block_size1 = _input_mutable_block.allocated_bytes();
+    g_memtable_input_block_allocated_size << block_size1 - block_size0;
+    size_t input_size = target_block.bytes() * num_rows / target_block.rows() *
+                        config::memtable_insert_memory_ratio;
     _mem_usage += input_size;
     _insert_mem_tracker->consume(input_size);
     for (int i = 0; i < num_rows; i++) {
@@ -504,6 +510,9 @@ std::unique_ptr<vectorized::Block> MemTable::to_block() {
         !_tablet_schema->cluster_key_idxes().empty()) {
         _sort_by_cluster_keys();
     }
+    _input_mutable_block.clear();
+    _insert_mem_tracker->release(_mem_usage);
+    _mem_usage = 0;
     return vectorized::Block::create_unique(_output_mutable_block.to_block());
 }
 
diff --git a/be/src/olap/memtable_memory_limiter.cpp 
b/be/src/olap/memtable_memory_limiter.cpp
index bceb33419a1..6d5ee2f3f76 100644
--- a/be/src/olap/memtable_memory_limiter.cpp
+++ b/be/src/olap/memtable_memory_limiter.cpp
@@ -38,6 +38,7 @@ bvar::Status<int64_t> 
g_memtable_flush_memory("mm_limiter_mem_flush", 0);
 bvar::Status<int64_t> g_memtable_load_memory("mm_limiter_mem_load", 0);
 bvar::Status<int64_t> g_load_hard_mem_limit("mm_limiter_limit_hard", 0);
 bvar::Status<int64_t> g_load_soft_mem_limit("mm_limiter_limit_soft", 0);
+bvar::Status<int64_t> g_orphan_memory("mm_limiter_mem_orphan", 0);
 
 // Calculate the total memory limit of all load tasks on this BE
 static int64_t calc_process_max_load_memory(int64_t process_mem_limit) {
@@ -236,6 +237,7 @@ void MemTableMemoryLimiter::_refresh_mem_tracker() {
     g_memtable_load_memory.set_value(_mem_usage);
     VLOG_DEBUG << "refreshed mem_tracker, num writers: " << _writers.size();
     THREAD_MEM_TRACKER_TRANSFER_TO(_mem_usage - _mem_tracker->consumption(), 
_mem_tracker.get());
+    
g_orphan_memory.set_value(ExecEnv::GetInstance()->orphan_mem_tracker()->consumption());
     if (!_hard_limit_reached()) {
         _hard_limit_end_cond.notify_all();
     }
diff --git a/be/src/runtime/tablets_channel.cpp 
b/be/src/runtime/tablets_channel.cpp
index dfae3433f35..2cafe308271 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -17,6 +17,7 @@
 
 #include "runtime/tablets_channel.h"
 
+#include <bvar/bvar.h>
 #include <fmt/format.h>
 #include <gen_cpp/internal_service.pb.h>
 #include <gen_cpp/types.pb.h>
@@ -41,6 +42,7 @@
 #include "olap/storage_engine.h"
 #include "olap/txn_manager.h"
 #include "runtime/load_channel.h"
+#include "util/defer_op.h"
 #include "util/doris_metrics.h"
 #include "util/metrics.h"
 #include "vec/core/block.h"
@@ -48,6 +50,9 @@
 namespace doris {
 class SlotDescriptor;
 
+bvar::Adder<int64_t> g_tablets_channel_send_data_allocated_size(
+        "tablets_channel_send_data_allocated_size");
+
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_writer_count, MetricUnit::NOUNIT);
 
 std::atomic<uint64_t> BaseTabletsChannel::_s_tablet_writer_count;
@@ -521,6 +526,10 @@ Status BaseTabletsChannel::add_batch(const 
PTabletWriterAddBlockRequest& request
             << "block rows: " << send_data.rows()
             << ", tablet_ids_size: " << request.tablet_ids_size();
 
+    g_tablets_channel_send_data_allocated_size << send_data.allocated_bytes();
+    Defer defer {
+            [&]() { g_tablets_channel_send_data_allocated_size << 
-send_data.allocated_bytes(); }};
+
     auto write_tablet_data = [&](uint32_t tablet_id,
                                  std::function<Status(BaseDeltaWriter * 
writer)> write_func) {
         google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to