This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new ca486c5e12d branch-3.1: [enhancement](memory) Add configurable 
threshold for small memory task protection (#56994)
ca486c5e12d is described below

commit ca486c5e12d37564cabdb0ae7905002285ae775f
Author: Wen Zhenghu <[email protected]>
AuthorDate: Tue Oct 28 14:48:04 2025 +0800

    branch-3.1: [enhancement](memory) Add configurable threshold for small 
memory task protection (#56994)
    
    ### What problem does this PR solve?
    
    Issue Number: close https://github.com/apache/doris/issues/56104
    Only branches 3.0 and 3.1 have this issue, so this PR targets
    branch-3.1 directly.
    
    Changes:
    - Add a new BE config parameter,
      `mem_tracker_limit_small_memory_task_bytes`, with a default
      value of 32MB (33554432 bytes)
    - Replace the hardcoded 32MB threshold in `free_top_overcommit_query`
      with the configurable parameter
    - Add small memory task protection logic to the `free_top_memory_query`
      function to ensure consistent behavior across both memory release
      mechanisms
    
    This commit introduces a configurable parameter,
    `mem_tracker_limit_small_memory_task_bytes`, to replace the hardcoded
    32MB threshold that protects small memory tasks from being cancelled
    during memory pressure.
    
    This improvement allows administrators to adjust the small memory task
    threshold based on their deployment environment and requirements, while
    maintaining backward compatibility with the existing 32MB default value.
---
 be/src/common/config.cpp                           |  5 ++++
 be/src/common/config.h                             |  4 +++
 be/src/runtime/memory/mem_tracker_limiter.cpp      | 10 +++++--
 .../test_writer_fault_injection.groovy             | 31 +++++++++++++++++++---
 4 files changed, 44 insertions(+), 6 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 74bd707704d..186fe89326d 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -671,6 +671,11 @@ DEFINE_mInt32(memory_maintenance_sleep_time_ms, "20");
 // After minor gc, no minor gc during sleep, but full gc is possible.
 DEFINE_mInt32(memory_gc_sleep_time_ms, "500");
 
+// Small memory task threshold in bytes for memory tracker limiter
+// Tasks with memory consumption at or below this threshold will not be cancelled during memory pressure
+// Default is 32MB (33554432 bytes)
+DEFINE_mInt64(mem_tracker_limit_small_memory_task_bytes, "33554432");
+
 // max write buffer size before flush, default 200MB
 DEFINE_mInt64(write_buffer_size, "209715200");
 // max buffer size used in memtable for the aggregated table, default 400MB
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 5aa4f8aef86..bff34614156 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -714,6 +714,10 @@ DECLARE_mInt32(memory_maintenance_sleep_time_ms);
 // After minor gc, no minor gc during sleep, but full gc is possible.
 DECLARE_mInt32(memory_gc_sleep_time_ms);
 
+// Small memory task threshold in bytes for memory tracker limiter
+// Tasks with memory consumption at or below this threshold will not be cancelled during memory pressure
+DECLARE_mInt64(mem_tracker_limit_small_memory_task_bytes);
+
 // max write buffer size before flush, default 200MB
 DECLARE_mInt64(write_buffer_size);
 // max buffer size used in memtable for the aggregated table, default 400MB
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp 
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index ac4684835a6..344838f2bbf 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -445,6 +445,11 @@ int64_t MemTrackerLimiter::free_top_memory_query(
                 auto tracker = trackerWptr.lock();
                 if (tracker != nullptr && tracker->type() == type) {
                     seek_num++;
+                    // Skip small memory tasks to avoid cancelling them during 
memory pressure
+                    if (tracker->consumption() <=
+                        config::mem_tracker_limit_small_memory_task_bytes) {
+                        continue;
+                    }
                     if (tracker->is_query_cancelled()) {
                         canceling_task.push_back(fmt::format("{}:{} Bytes", 
tracker->label(),
                                                              
tracker->consumption()));
@@ -563,8 +568,9 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(
                 auto tracker = trackerWptr.lock();
                 if (tracker != nullptr && tracker->type() == type) {
                     seek_num++;
-                    // 32M small query does not cancel
-                    if (tracker->consumption() <= 33554432 ||
+                    // Use configurable small memory task threshold instead of 
hardcoded 32MB
+                    if (tracker->consumption() <=
+                                
config::mem_tracker_limit_small_memory_task_bytes ||
                         tracker->consumption() < tracker->limit()) {
                         small_num++;
                         continue;
diff --git 
a/regression-test/suites/fault_injection_p0/test_writer_fault_injection.groovy 
b/regression-test/suites/fault_injection_p0/test_writer_fault_injection.groovy
index 47c9da8d009..a4e43c40660 100644
--- 
a/regression-test/suites/fault_injection_p0/test_writer_fault_injection.groovy
+++ 
b/regression-test/suites/fault_injection_p0/test_writer_fault_injection.groovy
@@ -84,10 +84,33 @@ suite("test_writer_fault_injection", "nonConcurrent") {
         }
 
         // VTabletWriter close logic injection tests
-        // Test VNodeChannel close_wait with full gc injection
-        load_with_injection("VNodeChannel.close_wait_full_gc")
-        // Test VNodeChannel try_send_and_fetch_status with full gc injection
-        load_with_injection("VNodeChannel.try_send_and_fetch_status_full_gc")
+        def custoBeConfig = [
+            mem_tracker_limit_small_memory_task_bytes : 0
+        ]
+        setBeConfigTemporary(custoBeConfig) {
+
+        // Execute test logic with modified configuration for 
mem_tracker_limit_small_memory_task_bytes
+        logger.info("Backend configuration set - 
mem_tracker_limit_small_memory_task_bytes: 0")
+        // Waiting for backend configuration update
+        (1..20).each { count ->
+            Thread.sleep(1000)
+            def elapsedSec = count * 1000
+            def remainingSec = 20 - elapsedSec
+            logger.info("Waited for backend configuration update ${elapsedSec} 
seconds, ${remainingSec} seconds remaining")
+        }
+
+        // Check if the configuration is modified
+        def result = sql """SHOW BACKEND CONFIG LIKE 
'mem_tracker_limit_small_memory_task_bytes';"""
+        logger.info("mem_tracker_limit_small_memory_task_bytes configuration: 
" + result)
+        if (result[0][1] == "0") {
+            // Test VNodeChannel close_wait with full gc injection
+            load_with_injection("VNodeChannel.close_wait_full_gc")
+            // Test VNodeChannel try_send_and_fetch_status with full gc 
injection
+            
load_with_injection("VNodeChannel.try_send_and_fetch_status_full_gc")
+        } else {
+            logger.info("mem_tracker_limit_small_memory_task_bytes 
configuration is not modified, skip full_gc test")
+        }
+        }
         // Test VNodeChannel close_wait when cancelled
         load_with_injection("VNodeChannel.close_wait.cancelled")
         // Test IndexChannel close_wait with timeout


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to