This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new c18ef17f7e4 branch-4.0: [fix](load_stream) Fix use-after-free in 
TabletStream async lambdas #60148 (#60177)
c18ef17f7e4 is described below

commit c18ef17f7e46091e9e8cb2a1b633a7ce20bba3f9
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat Jan 24 12:18:42 2026 +0800

    branch-4.0: [fix](load_stream) Fix use-after-free in TabletStream async 
lambdas #60148 (#60177)
    
    Cherry-picked from #60148
    
    Co-authored-by: Xin Liao <[email protected]>
---
 be/src/runtime/load_stream.cpp | 32 ++++++++++++++++++--------------
 be/src/runtime/load_stream.h   |  2 +-
 2 files changed, 19 insertions(+), 15 deletions(-)

diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 42ac15ea51a..2a324f6a6ce 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -147,12 +147,14 @@ Status TabletStream::append_data(const PStreamHeader& 
header, butil::IOBuf* data
     uint32_t new_segid = mapping->at(segid);
     DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
     butil::IOBuf buf = data->movable();
-    auto flush_func = [this, new_segid, eos, buf, header, file_type]() mutable 
{
-        signal::set_signal_task_id(_load_id);
+    auto self = shared_from_this();
+    auto flush_func = [self, new_segid, eos, buf, header, file_type]() mutable 
{
+        signal::set_signal_task_id(self->_load_id);
         g_load_stream_flush_running_threads << -1;
-        auto st = _load_stream_writer->append_data(new_segid, header.offset(), 
buf, file_type);
+        auto st =
+                self->_load_stream_writer->append_data(new_segid, 
header.offset(), buf, file_type);
         if (!st.ok() && !config::is_cloud_mode()) {
-            auto res = ExecEnv::get_tablet(_id);
+            auto res = ExecEnv::get_tablet(self->_id);
             TabletSharedPtr tablet =
                     res.has_value() ? 
std::dynamic_pointer_cast<Tablet>(res.value()) : nullptr;
             if (tablet) {
@@ -163,7 +165,7 @@ Status TabletStream::append_data(const PStreamHeader& 
header, butil::IOBuf* data
             DBUG_EXECUTE_IF("TabletStream.append_data.unknown_file_type",
                             { file_type = static_cast<FileType>(-1); });
             if (file_type == FileType::SEGMENT_FILE || file_type == 
FileType::INVERTED_INDEX_FILE) {
-                st = _load_stream_writer->close_writer(new_segid, file_type);
+                st = self->_load_stream_writer->close_writer(new_segid, 
file_type);
             } else {
                 st = Status::InternalError(
                         "appent data failed, file type error, file type = {}, "
@@ -174,8 +176,8 @@ Status TabletStream::append_data(const PStreamHeader& 
header, butil::IOBuf* data
         DBUG_EXECUTE_IF("TabletStream.append_data.append_failed",
                         { st = Status::InternalError("fault injection"); });
         if (!st.ok()) {
-            _status.update(st);
-            LOG(WARNING) << "write data failed " << st << ", " << *this;
+            self->_status.update(st);
+            LOG(WARNING) << "write data failed " << st << ", " << *self;
         }
     };
     auto load_stream_flush_token_max_tasks = 
config::load_stream_flush_token_max_tasks;
@@ -247,14 +249,15 @@ Status TabletStream::add_segment(const PStreamHeader& 
header, butil::IOBuf* data
     }
     DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
 
-    auto add_segment_func = [this, new_segid, stat]() {
-        signal::set_signal_task_id(_load_id);
-        auto st = _load_stream_writer->add_segment(new_segid, stat);
+    auto self = shared_from_this();
+    auto add_segment_func = [self, new_segid, stat]() {
+        signal::set_signal_task_id(self->_load_id);
+        auto st = self->_load_stream_writer->add_segment(new_segid, stat);
         DBUG_EXECUTE_IF("TabletStream.add_segment.add_segment_failed",
                         { st = Status::InternalError("fault injection"); });
         if (!st.ok()) {
-            _status.update(st);
-            LOG(INFO) << "add segment failed " << *this;
+            self->_status.update(st);
+            LOG(INFO) << "add segment failed " << *self;
         }
     };
     Status st = Status::OK();
@@ -274,8 +277,9 @@ Status 
TabletStream::_run_in_heavy_work_pool(std::function<Status()> fn) {
     std::unique_lock<bthread::Mutex> lock(mu);
     bthread::ConditionVariable cv;
     auto st = Status::OK();
-    auto func = [this, &mu, &cv, &st, &fn] {
-        signal::set_signal_task_id(_load_id);
+    auto self = shared_from_this();
+    auto func = [self, &mu, &cv, &st, &fn] {
+        signal::set_signal_task_id(self->_load_id);
         st = fn();
         std::lock_guard<bthread::Mutex> lock(mu);
         cv.notify_one();
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index 9aefa3d9093..d3f6e02558e 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -42,7 +42,7 @@ class OlapTableSchemaParam;
 // origin_segid(index) -> new_segid(value in vector)
 using SegIdMapping = std::vector<uint32_t>;
 using FailedTablets = std::vector<std::pair<int64_t, Status>>;
-class TabletStream {
+class TabletStream : public std::enable_shared_from_this<TabletStream> {
 public:
     TabletStream(const PUniqueId& load_id, int64_t id, int64_t txn_id,
                  LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to