This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 0476118c73b branch-3.1: [fix](load) allow memtable to detect 
cancellation while blocked by memory limiter #53070 (#54792)
0476118c73b is described below

commit 0476118c73bcad99793253d4a46257d7f5125152
Author: Kaijie Chen <[email protected]>
AuthorDate: Wed Aug 20 17:40:26 2025 +0800

    branch-3.1: [fix](load) allow memtable to detect cancellation while blocked 
by memory limiter #53070 (#54792)
    
    Backport #53070
---
 be/src/olap/memtable_memory_limiter.cpp       | 6 +++++-
 be/src/olap/memtable_memory_limiter.h         | 4 +++-
 be/src/runtime/load_channel.cpp               | 1 +
 be/src/runtime/load_channel.h                 | 3 +++
 be/src/runtime/load_channel_mgr.cpp           | 6 +++++-
 be/src/vec/sink/writer/vtablet_writer_v2.cpp  | 6 +++++-
 be/test/olap/memtable_memory_limiter_test.cpp | 2 +-
 7 files changed, 23 insertions(+), 5 deletions(-)

diff --git a/be/src/olap/memtable_memory_limiter.cpp 
b/be/src/olap/memtable_memory_limiter.cpp
index 35700aa36d6..7defd1a557d 100644
--- a/be/src/olap/memtable_memory_limiter.cpp
+++ b/be/src/olap/memtable_memory_limiter.cpp
@@ -118,7 +118,7 @@ int64_t MemTableMemoryLimiter::_need_flush() {
     return need_flush - _queue_mem_usage - _flush_mem_usage;
 }
 
-void MemTableMemoryLimiter::handle_memtable_flush() {
+void MemTableMemoryLimiter::handle_memtable_flush(std::function<bool()> 
cancel_check) {
     // Check the soft limit.
     DCHECK(_load_soft_mem_limit > 0);
     do {
@@ -142,6 +142,10 @@ void MemTableMemoryLimiter::handle_memtable_flush() {
                 LOG(INFO) << "timeout when waiting for memory hard limit end, 
try again";
             }
         }
+        if (cancel_check && cancel_check()) {
+            LOG(INFO) << "cancelled when waiting for memtable flush";
+            return;
+        }
         first = false;
         int64_t need_flush = _need_flush();
         if (need_flush > 0) {
diff --git a/be/src/olap/memtable_memory_limiter.h 
b/be/src/olap/memtable_memory_limiter.h
index 1e32cb165e4..52f842a2c31 100644
--- a/be/src/olap/memtable_memory_limiter.h
+++ b/be/src/olap/memtable_memory_limiter.h
@@ -19,6 +19,8 @@
 
 #include <stdint.h>
 
+#include <functional>
+
 #include "common/status.h"
 #include "runtime/memory/mem_tracker.h"
 #include "util/countdown_latch.h"
@@ -39,7 +41,7 @@ public:
 
     // check if the total mem consumption exceeds limit.
     // If yes, it will flush memtable to try to reduce memory consumption.
-    void handle_memtable_flush();
+    void handle_memtable_flush(std::function<bool()> cancel_check);
 
     void register_writer(std::weak_ptr<MemTableWriter> writer);
 
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index cf10d3d66e8..326b7cc6031 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -292,6 +292,7 @@ bool LoadChannel::is_finished() {
 }
 
 Status LoadChannel::cancel() {
+    _cancelled.store(true);
     std::lock_guard<std::mutex> l(_lock);
     for (auto& it : _tablets_channels) {
         static_cast<void>(it.second->cancel());
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 8b074245d46..9cd0c902129 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -68,6 +68,8 @@ public:
 
     bool is_high_priority() const { return _is_high_priority; }
 
+    bool is_cancelled() const { return _cancelled.load(); }
+
     RuntimeProfile::Counter* get_mgr_add_batch_timer() { return 
_mgr_add_batch_timer; }
     RuntimeProfile::Counter* get_handle_mem_limit_timer() { return 
_handle_mem_limit_timer; }
 
@@ -107,6 +109,7 @@ private:
     std::unordered_set<int64_t> _finished_channel_ids;
     // set to true if at least one tablets channel has been opened
     bool _opened = false;
+    std::atomic<bool> _cancelled {false};
 
     QueryThreadContext _query_thread_context;
 
diff --git a/be/src/runtime/load_channel_mgr.cpp 
b/be/src/runtime/load_channel_mgr.cpp
index 014fa146e3c..0bb352d2feb 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -152,7 +152,11 @@ Status LoadChannelMgr::add_batch(const 
PTabletWriterAddBlockRequest& request,
         // If this is a high priority load task, do not handle this.
         // because this may block for a while, which may lead to rpc timeout.
         SCOPED_TIMER(channel->get_handle_mem_limit_timer());
-        
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush();
+        
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush(
+                [channel]() { return channel->is_cancelled(); });
+        if (channel->is_cancelled()) {
+            return Status::Cancelled("LoadChannel has been cancelled: {}.", 
load_id.to_string());
+        }
     }
 
     // 3. add batch to load channel
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp 
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 187f6160aff..08a791457c0 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -555,7 +555,11 @@ Status 
VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block
     }
     {
         SCOPED_TIMER(_wait_mem_limit_timer);
-        
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush();
+        
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush(
+                [state = _state]() { return state->is_cancelled(); });
+        if (_state->is_cancelled()) {
+            return _state->cancel_reason();
+        }
     }
     SCOPED_TIMER(_write_memtable_timer);
     st = delta_writer->write(block.get(), rows.row_idxes);
diff --git a/be/test/olap/memtable_memory_limiter_test.cpp 
b/be/test/olap/memtable_memory_limiter_test.cpp
index 14eae847163..06f98332862 100644
--- a/be/test/olap/memtable_memory_limiter_test.cpp
+++ b/be/test/olap/memtable_memory_limiter_test.cpp
@@ -170,7 +170,7 @@ TEST_F(MemTableMemoryLimiterTest, 
handle_memtable_flush_test) {
         ASSERT_TRUE(res.ok());
     }
     static_cast<void>(mem_limiter->init(100));
-    mem_limiter->handle_memtable_flush();
+    mem_limiter->handle_memtable_flush(nullptr);
     CHECK_EQ(0, mem_limiter->mem_usage());
 
     res = delta_writer->close();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to