This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new c18ef17f7e4 branch-4.0: [fix](load_stream) Fix use-after-free in
TabletStream async lambdas #60148 (#60177)
c18ef17f7e4 is described below
commit c18ef17f7e46091e9e8cb2a1b633a7ce20bba3f9
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat Jan 24 12:18:42 2026 +0800
branch-4.0: [fix](load_stream) Fix use-after-free in TabletStream async
lambdas #60148 (#60177)
Cherry-picked from #60148
Co-authored-by: Xin Liao <[email protected]>
---
be/src/runtime/load_stream.cpp | 32 ++++++++++++++++++--------------
be/src/runtime/load_stream.h | 2 +-
2 files changed, 19 insertions(+), 15 deletions(-)
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 42ac15ea51a..2a324f6a6ce 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -147,12 +147,14 @@ Status TabletStream::append_data(const PStreamHeader&
header, butil::IOBuf* data
uint32_t new_segid = mapping->at(segid);
DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
butil::IOBuf buf = data->movable();
- auto flush_func = [this, new_segid, eos, buf, header, file_type]() mutable
{
- signal::set_signal_task_id(_load_id);
+ auto self = shared_from_this();
+ auto flush_func = [self, new_segid, eos, buf, header, file_type]() mutable
{
+ signal::set_signal_task_id(self->_load_id);
g_load_stream_flush_running_threads << -1;
- auto st = _load_stream_writer->append_data(new_segid, header.offset(),
buf, file_type);
+ auto st =
+ self->_load_stream_writer->append_data(new_segid,
header.offset(), buf, file_type);
if (!st.ok() && !config::is_cloud_mode()) {
- auto res = ExecEnv::get_tablet(_id);
+ auto res = ExecEnv::get_tablet(self->_id);
TabletSharedPtr tablet =
res.has_value() ?
std::dynamic_pointer_cast<Tablet>(res.value()) : nullptr;
if (tablet) {
@@ -163,7 +165,7 @@ Status TabletStream::append_data(const PStreamHeader&
header, butil::IOBuf* data
DBUG_EXECUTE_IF("TabletStream.append_data.unknown_file_type",
{ file_type = static_cast<FileType>(-1); });
if (file_type == FileType::SEGMENT_FILE || file_type ==
FileType::INVERTED_INDEX_FILE) {
- st = _load_stream_writer->close_writer(new_segid, file_type);
+ st = self->_load_stream_writer->close_writer(new_segid,
file_type);
} else {
st = Status::InternalError(
"appent data failed, file type error, file type = {}, "
@@ -174,8 +176,8 @@ Status TabletStream::append_data(const PStreamHeader&
header, butil::IOBuf* data
DBUG_EXECUTE_IF("TabletStream.append_data.append_failed",
{ st = Status::InternalError("fault injection"); });
if (!st.ok()) {
- _status.update(st);
- LOG(WARNING) << "write data failed " << st << ", " << *this;
+ self->_status.update(st);
+ LOG(WARNING) << "write data failed " << st << ", " << *self;
}
};
auto load_stream_flush_token_max_tasks =
config::load_stream_flush_token_max_tasks;
@@ -247,14 +249,15 @@ Status TabletStream::add_segment(const PStreamHeader&
header, butil::IOBuf* data
}
DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
- auto add_segment_func = [this, new_segid, stat]() {
- signal::set_signal_task_id(_load_id);
- auto st = _load_stream_writer->add_segment(new_segid, stat);
+ auto self = shared_from_this();
+ auto add_segment_func = [self, new_segid, stat]() {
+ signal::set_signal_task_id(self->_load_id);
+ auto st = self->_load_stream_writer->add_segment(new_segid, stat);
DBUG_EXECUTE_IF("TabletStream.add_segment.add_segment_failed",
{ st = Status::InternalError("fault injection"); });
if (!st.ok()) {
- _status.update(st);
- LOG(INFO) << "add segment failed " << *this;
+ self->_status.update(st);
+ LOG(INFO) << "add segment failed " << *self;
}
};
Status st = Status::OK();
@@ -274,8 +277,9 @@ Status
TabletStream::_run_in_heavy_work_pool(std::function<Status()> fn) {
std::unique_lock<bthread::Mutex> lock(mu);
bthread::ConditionVariable cv;
auto st = Status::OK();
- auto func = [this, &mu, &cv, &st, &fn] {
- signal::set_signal_task_id(_load_id);
+ auto self = shared_from_this();
+ auto func = [self, &mu, &cv, &st, &fn] {
+ signal::set_signal_task_id(self->_load_id);
st = fn();
std::lock_guard<bthread::Mutex> lock(mu);
cv.notify_one();
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index 9aefa3d9093..d3f6e02558e 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -42,7 +42,7 @@ class OlapTableSchemaParam;
// origin_segid(index) -> new_segid(value in vector)
using SegIdMapping = std::vector<uint32_t>;
using FailedTablets = std::vector<std::pair<int64_t, Status>>;
-class TabletStream {
+class TabletStream : public std::enable_shared_from_this<TabletStream> {
public:
TabletStream(const PUniqueId& load_id, int64_t id, int64_t txn_id,
LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]