This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new b153d7e30b7 branch-4.0: [enhance](memtable) support adaptive memtable 
write buffer size #56948 (#57023)
b153d7e30b7 is described below

commit b153d7e30b7f6e957269ac103ce2a8a42a2457b6
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Oct 16 17:51:06 2025 +0800

    branch-4.0: [enhance](memtable) support adaptive memtable write buffer size 
#56948 (#57023)
    
    Cherry-picked from #56948
    
    Co-authored-by: hui lai <[email protected]>
---
 be/src/common/config.cpp                           |  1 +
 be/src/common/config.h                             |  1 +
 be/src/olap/memtable.cpp                           | 21 +++++++++-
 be/src/olap/memtable.h                             |  6 +++
 be/src/olap/memtable_writer.cpp                    | 23 +++++++---
 be/src/olap/memtable_writer.h                      |  2 +
 .../memtable/test_memtable_too_many_rows.groovy    | 49 ++++++++++++++++++++++
 7 files changed, 97 insertions(+), 6 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 5e7dec44248..c2ce46bb06e 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -687,6 +687,7 @@ DEFINE_mInt32(memory_gc_sleep_time_ms, "500");
 
 // max write buffer size before flush, default 200MB
 DEFINE_mInt64(write_buffer_size, "209715200");
+DEFINE_mBool(enable_adaptive_write_buffer_size, "true");
 // max buffer size used in memtable for the aggregated table, default 400MB
 DEFINE_mInt64(write_buffer_size_for_agg, "104857600");
 DEFINE_mInt64(min_write_buffer_size_for_partial_update, "1048576");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index af81e4ff273..df83ae46909 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -711,6 +711,7 @@ DECLARE_mInt32(memory_gc_sleep_time_ms);
 
 // max write buffer size before flush, default 200MB
 DECLARE_mInt64(write_buffer_size);
+DECLARE_mBool(enable_adaptive_write_buffer_size);
 // max buffer size used in memtable for the aggregated table, default 400MB
 DECLARE_mInt64(write_buffer_size_for_agg);
 
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 1dbe2e8020b..e5c05c3a1d6 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -83,6 +83,7 @@ MemTable::MemTable(int64_t tablet_id, 
std::shared_ptr<TabletSchema> tablet_schem
     // TODO: Support ZOrderComparator in the future
     _init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
     _row_in_blocks = 
std::make_unique<DorisVector<std::shared_ptr<RowInBlock>>>();
+    _load_mem_limit = MemInfo::mem_limit() * 
config::load_process_max_memory_limit_percent / 100;
 }
 
 void MemTable::_init_columns_offset_by_slot_descs(const 
std::vector<SlotDescriptor*>* slot_descs,
@@ -652,7 +653,7 @@ void MemTable::shrink_memtable_by_agg() {
 
 bool MemTable::need_flush() const {
     DBUG_EXECUTE_IF("MemTable.need_flush", { return true; });
-    auto max_size = config::write_buffer_size;
+    auto max_size = _adaptive_write_buffer_size();
     if (_partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) {
         auto update_columns_size = _num_columns;
         auto min_buffer_size = 
config::min_write_buffer_size_for_partial_update;
@@ -662,6 +663,24 @@ bool MemTable::need_flush() const {
     return memory_usage() >= max_size;
 }
 
+int64_t MemTable::_adaptive_write_buffer_size() const {
+    if (!config::enable_adaptive_write_buffer_size) [[unlikely]] {
+        return config::write_buffer_size;
+    }
+    const int64_t current_load_mem_value = MemoryProfile::load_current_usage();
+    int64_t factor = 4;
+    // Memory usage intervals:
+    // (80 %, 100 %] → 1× buffer
+    // (50 %, 80 %]  → 2× buffer
+    // [0 %, 50 %]   → 4× buffer
+    if (current_load_mem_value > (_load_mem_limit * 4) / 5) { // > 80 %
+        factor = 1;
+    } else if (current_load_mem_value > _load_mem_limit / 2) { // > 50 %
+        factor = 2;
+    }
+    return config::write_buffer_size * factor;
+}
+
 bool MemTable::need_agg() const {
     if (_keys_type == KeysType::AGG_KEYS) {
         auto max_size = _last_agg_pos + config::write_buffer_size_for_agg;
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 5cd70e812a9..47426e354d0 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -20,6 +20,7 @@
 #include <stddef.h>
 #include <stdint.h>
 
+#include <cstdint>
 #include <cstring>
 #include <functional>
 #include <memory>
@@ -204,6 +205,8 @@ public:
 
     void update_mem_type(MemType memtype) { _mem_type = memtype; }
 
+    int64_t raw_rows() { return _stat.raw_rows.load(); }
+
 private:
     // for vectorized
     template <bool has_skip_bitmap_col>
@@ -213,6 +216,8 @@ private:
     // Used to wrapped by to_block to do exception handle logic
     Status _to_block(std::unique_ptr<vectorized::Block>* res);
 
+    int64_t _adaptive_write_buffer_size() const;
+
 private:
     std::atomic<MemType> _mem_type;
     int64_t _tablet_id;
@@ -231,6 +236,7 @@ private:
     // In this way, we can make MemTable::memory_usage() to be more accurate, 
and eventually
     // reduce the number of segment files that are generated by current load
     vectorized::Arena _arena;
+    int64_t _load_mem_limit = -1;
 
     void _init_columns_offset_by_slot_descs(const 
std::vector<SlotDescriptor*>* slot_descs,
                                             const TupleDescriptor* tuple_desc);
diff --git a/be/src/olap/memtable_writer.cpp b/be/src/olap/memtable_writer.cpp
index 82349d2e0ee..95ad9c192aa 100644
--- a/be/src/olap/memtable_writer.cpp
+++ b/be/src/olap/memtable_writer.cpp
@@ -103,6 +103,14 @@ Status MemTableWriter::write(const vectorized::Block* 
block,
                                              _req.tablet_id, 
_req.load_id.hi(), _req.load_id.lo());
     }
 
+    // Flush and reset memtable if it is raw rows great than int32_t.
+    int64_t raw_rows = _mem_table->raw_rows();
+    DBUG_EXECUTE_IF("MemTableWriter.too_many_raws",
+                    { raw_rows = std::numeric_limits<int32_t>::max(); });
+    if (raw_rows + row_idxs.size() > std::numeric_limits<int32_t>::max()) {
+        RETURN_IF_ERROR(_flush_memtable());
+    }
+
     _total_received_rows += row_idxs.size();
     auto st = _mem_table->insert(block, row_idxs);
 
@@ -127,16 +135,21 @@ Status MemTableWriter::write(const vectorized::Block* 
block,
         _mem_table->shrink_memtable_by_agg();
     }
     if (UNLIKELY(_mem_table->need_flush())) {
-        auto s = _flush_memtable_async();
-        _reset_mem_table();
-        if (UNLIKELY(!s.ok())) {
-            return s;
-        }
+        RETURN_IF_ERROR(_flush_memtable());
     }
 
     return Status::OK();
 }
 
+Status MemTableWriter::_flush_memtable() {
+    auto s = _flush_memtable_async();
+    _reset_mem_table();
+    if (UNLIKELY(!s.ok())) {
+        return s;
+    }
+    return Status::OK();
+}
+
 Status MemTableWriter::_flush_memtable_async() {
     DCHECK(_flush_token != nullptr);
     std::shared_ptr<MemTable> memtable;
diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h
index aa1fd4025ed..7465e79a452 100644
--- a/be/src/olap/memtable_writer.h
+++ b/be/src/olap/memtable_writer.h
@@ -25,6 +25,7 @@
 #include <cstdint>
 #include <memory>
 #include <mutex>
+#include <random>
 #include <vector>
 
 #include "common/status.h"
@@ -109,6 +110,7 @@ public:
     }
 
 private:
+    Status _flush_memtable();
     // push a full memtable to flush executor
     Status _flush_memtable_async();
 
diff --git 
a/regression-test/suites/fault_injection_p0/memtable/test_memtable_too_many_rows.groovy
 
b/regression-test/suites/fault_injection_p0/memtable/test_memtable_too_many_rows.groovy
new file mode 100644
index 00000000000..38516864727
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/memtable/test_memtable_too_many_rows.groovy
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_memtable_too_many_rows", "nonConcurrent") {
+    GetDebugPoint().clearDebugPointsForAllBEs()
+    def testTable = "test_memtable_too_many_rows"
+    sql """ DROP TABLE IF EXISTS ${testTable}"""
+
+    sql """
+        CREATE TABLE IF NOT EXISTS `${testTable}` (
+          `id` BIGINT NOT NULL,
+          `value` int(11) NOT NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        COMMENT "OLAP"
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1"
+        )
+    """
+
+    def debugPoint = "MemTableWriter.too_many_raws"
+    try {
+        GetDebugPoint().enableDebugPointForAllBEs(debugPoint)
+        sql "insert into ${testTable} values(1,1)"
+        def res = sql "select * from ${testTable}"
+        logger.info("res: " + res.size())
+        assertTrue(res.size() == 1)
+    } catch (Exception e){
+        logger.info(e.getMessage())
+        assertTrue(e.getMessage().contains("write memtable too many rows 
fail"))
+    } finally {
+        GetDebugPoint().disableDebugPointForAllBEs(debugPoint)
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to