This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 947f4952199 branch-4.0: [fix](load_stream) Fix flush token deadlock by ensuring wait_for_flush_tasks is called before destruction #60284 (#60285)
947f4952199 is described below

commit 947f4952199a647b2b3b97ca9336853e6411b57c
Author: Xin Liao <[email protected]>
AuthorDate: Wed Jan 28 14:36:52 2026 +0800

    branch-4.0: [fix](load_stream) Fix flush token deadlock by ensuring wait_for_flush_tasks is called before destruction #60284 (#60285)
    
    pick from:#60284
---
 be/src/runtime/load_stream.cpp | 77 ++++++++++++++++++++++++++++--------------
 be/src/runtime/load_stream.h   |  7 +++-
 2 files changed, 57 insertions(+), 27 deletions(-)

diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 2a324f6a6ce..f94e7864377 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -147,14 +147,12 @@ Status TabletStream::append_data(const PStreamHeader& 
header, butil::IOBuf* data
     uint32_t new_segid = mapping->at(segid);
     DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
     butil::IOBuf buf = data->movable();
-    auto self = shared_from_this();
-    auto flush_func = [self, new_segid, eos, buf, header, file_type]() mutable 
{
-        signal::set_signal_task_id(self->_load_id);
+    auto flush_func = [this, new_segid, eos, buf, header, file_type]() mutable 
{
+        signal::set_signal_task_id(_load_id);
         g_load_stream_flush_running_threads << -1;
-        auto st =
-                self->_load_stream_writer->append_data(new_segid, 
header.offset(), buf, file_type);
+        auto st = _load_stream_writer->append_data(new_segid, header.offset(), 
buf, file_type);
         if (!st.ok() && !config::is_cloud_mode()) {
-            auto res = ExecEnv::get_tablet(self->_id);
+            auto res = ExecEnv::get_tablet(_id);
             TabletSharedPtr tablet =
                     res.has_value() ? 
std::dynamic_pointer_cast<Tablet>(res.value()) : nullptr;
             if (tablet) {
@@ -165,7 +163,7 @@ Status TabletStream::append_data(const PStreamHeader& 
header, butil::IOBuf* data
             DBUG_EXECUTE_IF("TabletStream.append_data.unknown_file_type",
                             { file_type = static_cast<FileType>(-1); });
             if (file_type == FileType::SEGMENT_FILE || file_type == 
FileType::INVERTED_INDEX_FILE) {
-                st = self->_load_stream_writer->close_writer(new_segid, 
file_type);
+                st = _load_stream_writer->close_writer(new_segid, file_type);
             } else {
                 st = Status::InternalError(
                         "appent data failed, file type error, file type = {}, "
@@ -176,8 +174,8 @@ Status TabletStream::append_data(const PStreamHeader& 
header, butil::IOBuf* data
         DBUG_EXECUTE_IF("TabletStream.append_data.append_failed",
                         { st = Status::InternalError("fault injection"); });
         if (!st.ok()) {
-            self->_status.update(st);
-            LOG(WARNING) << "write data failed " << st << ", " << *self;
+            _status.update(st);
+            LOG(WARNING) << "write data failed " << st << ", " << *this;
         }
     };
     auto load_stream_flush_token_max_tasks = 
config::load_stream_flush_token_max_tasks;
@@ -249,15 +247,14 @@ Status TabletStream::add_segment(const PStreamHeader& 
header, butil::IOBuf* data
     }
     DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
 
-    auto self = shared_from_this();
-    auto add_segment_func = [self, new_segid, stat]() {
-        signal::set_signal_task_id(self->_load_id);
-        auto st = self->_load_stream_writer->add_segment(new_segid, stat);
+    auto add_segment_func = [this, new_segid, stat]() {
+        signal::set_signal_task_id(_load_id);
+        auto st = _load_stream_writer->add_segment(new_segid, stat);
         DBUG_EXECUTE_IF("TabletStream.add_segment.add_segment_failed",
                         { st = Status::InternalError("fault injection"); });
         if (!st.ok()) {
-            self->_status.update(st);
-            LOG(INFO) << "add segment failed " << *self;
+            _status.update(st);
+            LOG(INFO) << "add segment failed " << *this;
         }
     };
     Status st = Status::OK();
@@ -277,9 +274,8 @@ Status 
TabletStream::_run_in_heavy_work_pool(std::function<Status()> fn) {
     std::unique_lock<bthread::Mutex> lock(mu);
     bthread::ConditionVariable cv;
     auto st = Status::OK();
-    auto self = shared_from_this();
-    auto func = [self, &mu, &cv, &st, &fn] {
-        signal::set_signal_task_id(self->_load_id);
+    auto func = [this, &mu, &cv, &st, &fn] {
+        signal::set_signal_task_id(_load_id);
         st = fn();
         std::lock_guard<bthread::Mutex> lock(mu);
         cv.notify_one();
@@ -293,21 +289,40 @@ Status 
TabletStream::_run_in_heavy_work_pool(std::function<Status()> fn) {
     return st;
 }
 
-void TabletStream::pre_close() {
+void TabletStream::wait_for_flush_tasks() {
+    {
+        std::lock_guard lock_guard(_lock);
+        if (_flush_tasks_done) {
+            return;
+        }
+        _flush_tasks_done = true;
+    }
+
     if (!_status.ok()) {
-        // cancel all pending tasks, wait all running tasks to finish
         _flush_token->shutdown();
         return;
     }
 
-    SCOPED_TIMER(_close_wait_timer);
-    _status.update(_run_in_heavy_work_pool([this]() {
+    // Note: Do not use SCOPED_TIMER here because this function may be called
+    // from IndexStream::~IndexStream() during LoadStream destruction, at which
+    // point the RuntimeProfile (and _close_wait_timer) may already be 
destroyed.
+    // Use heavy_work_pool to avoid blocking bthread
+    auto st = _run_in_heavy_work_pool([this]() {
         _flush_token->wait();
         return Status::OK();
-    }));
-    // it is necessary to check status after wait_func,
-    // for create_rowset could fail during add_segment when loading to MOW 
table,
-    // in this case, should skip close to avoid submit_calc_delete_bitmap_task 
which could cause coredump.
+    });
+    if (!st.ok()) {
+        // If heavy_work_pool is unavailable, fall back to shutdown
+        // which will cancel pending tasks and wait for running tasks
+        _flush_token->shutdown();
+        _status.update(st);
+    }
+}
+
+void TabletStream::pre_close() {
+    SCOPED_TIMER(_close_wait_timer);
+    wait_for_flush_tasks();
+
     if (!_status.ok()) {
         return;
     }
@@ -346,6 +361,16 @@ IndexStream::IndexStream(const PUniqueId& load_id, int64_t 
id, int64_t txn_id,
     _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
 }
 
+IndexStream::~IndexStream() {
+    // Ensure all TabletStreams have their flush tokens properly handled 
before destruction.
+    // In normal flow, close() should have called pre_close() on all tablet 
streams.
+    // But if IndexStream is destroyed without close() being called (e.g., 
on_idle_timeout),
+    // we need to wait for flush tasks here to ensure flush tokens are 
properly shut down.
+    for (auto& [_, tablet_stream] : _tablet_streams_map) {
+        tablet_stream->wait_for_flush_tasks();
+    }
+}
+
 Status IndexStream::append_data(const PStreamHeader& header, butil::IOBuf* 
data) {
     SCOPED_TIMER(_append_data_timer);
     int64_t tablet_id = header.tablet_id();
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index d3f6e02558e..be147f9415c 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -42,7 +42,7 @@ class OlapTableSchemaParam;
 // origin_segid(index) -> new_segid(value in vector)
 using SegIdMapping = std::vector<uint32_t>;
 using FailedTablets = std::vector<std::pair<int64_t, Status>>;
-class TabletStream : public std::enable_shared_from_this<TabletStream> {
+class TabletStream {
 public:
     TabletStream(const PUniqueId& load_id, int64_t id, int64_t txn_id,
                  LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile);
@@ -54,6 +54,9 @@ public:
     Status add_segment(const PStreamHeader& header, butil::IOBuf* data);
     void add_num_segments(int64_t num_segments) { _num_segments += 
num_segments; }
     void disable_num_segments_check() { _check_num_segments = false; }
+    // Wait for all pending flush tasks to complete and shut down the flush 
token.
+    // Safe to call multiple times.
+    void wait_for_flush_tasks();
     void pre_close();
     Status close();
     int64_t id() const { return _id; }
@@ -70,6 +73,7 @@ private:
     std::atomic<uint32_t> _next_segid;
     int64_t _num_segments = 0;
     bool _check_num_segments = true;
+    bool _flush_tasks_done = false;
     bthread::Mutex _lock;
     AtomicStatus _status;
     PUniqueId _load_id;
@@ -88,6 +92,7 @@ public:
     IndexStream(const PUniqueId& load_id, int64_t id, int64_t txn_id,
                 std::shared_ptr<OlapTableSchemaParam> schema, LoadStreamMgr* 
load_stream_mgr,
                 RuntimeProfile* profile);
+    ~IndexStream();
 
     Status append_data(const PStreamHeader& header, butil::IOBuf* data);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to