This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 0476118c73b branch-3.1: [fix](load) allow memtable to detect
cancellation while blocked by memory limiter #53070 (#54792)
0476118c73b is described below
commit 0476118c73bcad99793253d4a46257d7f5125152
Author: Kaijie Chen <[email protected]>
AuthorDate: Wed Aug 20 17:40:26 2025 +0800
branch-3.1: [fix](load) allow memtable to detect cancellation while blocked
by memory limiter #53070 (#54792)
Backport #53070
---
be/src/olap/memtable_memory_limiter.cpp | 6 +++++-
be/src/olap/memtable_memory_limiter.h | 4 +++-
be/src/runtime/load_channel.cpp | 1 +
be/src/runtime/load_channel.h | 3 +++
be/src/runtime/load_channel_mgr.cpp | 6 +++++-
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 6 +++++-
be/test/olap/memtable_memory_limiter_test.cpp | 2 +-
7 files changed, 23 insertions(+), 5 deletions(-)
diff --git a/be/src/olap/memtable_memory_limiter.cpp
b/be/src/olap/memtable_memory_limiter.cpp
index 35700aa36d6..7defd1a557d 100644
--- a/be/src/olap/memtable_memory_limiter.cpp
+++ b/be/src/olap/memtable_memory_limiter.cpp
@@ -118,7 +118,7 @@ int64_t MemTableMemoryLimiter::_need_flush() {
return need_flush - _queue_mem_usage - _flush_mem_usage;
}
-void MemTableMemoryLimiter::handle_memtable_flush() {
+void MemTableMemoryLimiter::handle_memtable_flush(std::function<bool()>
cancel_check) {
// Check the soft limit.
DCHECK(_load_soft_mem_limit > 0);
do {
@@ -142,6 +142,10 @@ void MemTableMemoryLimiter::handle_memtable_flush() {
LOG(INFO) << "timeout when waiting for memory hard limit end,
try again";
}
}
+ if (cancel_check && cancel_check()) {
+ LOG(INFO) << "cancelled when waiting for memtable flush";
+ return;
+ }
first = false;
int64_t need_flush = _need_flush();
if (need_flush > 0) {
diff --git a/be/src/olap/memtable_memory_limiter.h
b/be/src/olap/memtable_memory_limiter.h
index 1e32cb165e4..52f842a2c31 100644
--- a/be/src/olap/memtable_memory_limiter.h
+++ b/be/src/olap/memtable_memory_limiter.h
@@ -19,6 +19,8 @@
#include <stdint.h>
+#include <functional>
+
#include "common/status.h"
#include "runtime/memory/mem_tracker.h"
#include "util/countdown_latch.h"
@@ -39,7 +41,7 @@ public:
// check if the total mem consumption exceeds limit.
// If yes, it will flush memtable to try to reduce memory consumption.
- void handle_memtable_flush();
+ void handle_memtable_flush(std::function<bool()> cancel_check);
void register_writer(std::weak_ptr<MemTableWriter> writer);
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index cf10d3d66e8..326b7cc6031 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -292,6 +292,7 @@ bool LoadChannel::is_finished() {
}
Status LoadChannel::cancel() {
+ _cancelled.store(true);
std::lock_guard<std::mutex> l(_lock);
for (auto& it : _tablets_channels) {
static_cast<void>(it.second->cancel());
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 8b074245d46..9cd0c902129 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -68,6 +68,8 @@ public:
bool is_high_priority() const { return _is_high_priority; }
+ bool is_cancelled() const { return _cancelled.load(); }
+
RuntimeProfile::Counter* get_mgr_add_batch_timer() { return
_mgr_add_batch_timer; }
RuntimeProfile::Counter* get_handle_mem_limit_timer() { return
_handle_mem_limit_timer; }
@@ -107,6 +109,7 @@ private:
std::unordered_set<int64_t> _finished_channel_ids;
// set to true if at least one tablets channel has been opened
bool _opened = false;
+ std::atomic<bool> _cancelled {false};
QueryThreadContext _query_thread_context;
diff --git a/be/src/runtime/load_channel_mgr.cpp
b/be/src/runtime/load_channel_mgr.cpp
index 014fa146e3c..0bb352d2feb 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -152,7 +152,11 @@ Status LoadChannelMgr::add_batch(const
PTabletWriterAddBlockRequest& request,
// If this is a high priority load task, do not handle this.
// because this may block for a while, which may lead to rpc timeout.
SCOPED_TIMER(channel->get_handle_mem_limit_timer());
-
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush();
+
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush(
+ [channel]() { return channel->is_cancelled(); });
+ if (channel->is_cancelled()) {
+ return Status::Cancelled("LoadChannel has been cancelled: {}.",
load_id.to_string());
+ }
}
// 3. add batch to load channel
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 187f6160aff..08a791457c0 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -555,7 +555,11 @@ Status
VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block
}
{
SCOPED_TIMER(_wait_mem_limit_timer);
-
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush();
+
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush(
+ [state = _state]() { return state->is_cancelled(); });
+ if (_state->is_cancelled()) {
+ return _state->cancel_reason();
+ }
}
SCOPED_TIMER(_write_memtable_timer);
st = delta_writer->write(block.get(), rows.row_idxes);
diff --git a/be/test/olap/memtable_memory_limiter_test.cpp
b/be/test/olap/memtable_memory_limiter_test.cpp
index 14eae847163..06f98332862 100644
--- a/be/test/olap/memtable_memory_limiter_test.cpp
+++ b/be/test/olap/memtable_memory_limiter_test.cpp
@@ -170,7 +170,7 @@ TEST_F(MemTableMemoryLimiterTest,
handle_memtable_flush_test) {
ASSERT_TRUE(res.ok());
}
static_cast<void>(mem_limiter->init(100));
- mem_limiter->handle_memtable_flush();
+ mem_limiter->handle_memtable_flush(nullptr);
CHECK_EQ(0, mem_limiter->mem_usage());
res = delta_writer->close();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]