This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 947f4952199 branch-4.0: [fix](load_stream) Fix flush token deadlock by
ensuring wait_for_flush_tasks is called before destruction #60284 (#60285)
947f4952199 is described below
commit 947f4952199a647b2b3b97ca9336853e6411b57c
Author: Xin Liao <[email protected]>
AuthorDate: Wed Jan 28 14:36:52 2026 +0800
branch-4.0: [fix](load_stream) Fix flush token deadlock by ensuring
wait_for_flush_tasks is called before destruction #60284 (#60285)
pick from:#60284
---
be/src/runtime/load_stream.cpp | 77 ++++++++++++++++++++++++++++--------------
be/src/runtime/load_stream.h | 7 +++-
2 files changed, 57 insertions(+), 27 deletions(-)
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 2a324f6a6ce..f94e7864377 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -147,14 +147,12 @@ Status TabletStream::append_data(const PStreamHeader&
header, butil::IOBuf* data
uint32_t new_segid = mapping->at(segid);
DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
butil::IOBuf buf = data->movable();
- auto self = shared_from_this();
- auto flush_func = [self, new_segid, eos, buf, header, file_type]() mutable
{
- signal::set_signal_task_id(self->_load_id);
+ auto flush_func = [this, new_segid, eos, buf, header, file_type]() mutable
{
+ signal::set_signal_task_id(_load_id);
g_load_stream_flush_running_threads << -1;
- auto st =
- self->_load_stream_writer->append_data(new_segid,
header.offset(), buf, file_type);
+ auto st = _load_stream_writer->append_data(new_segid, header.offset(),
buf, file_type);
if (!st.ok() && !config::is_cloud_mode()) {
- auto res = ExecEnv::get_tablet(self->_id);
+ auto res = ExecEnv::get_tablet(_id);
TabletSharedPtr tablet =
res.has_value() ?
std::dynamic_pointer_cast<Tablet>(res.value()) : nullptr;
if (tablet) {
@@ -165,7 +163,7 @@ Status TabletStream::append_data(const PStreamHeader&
header, butil::IOBuf* data
DBUG_EXECUTE_IF("TabletStream.append_data.unknown_file_type",
{ file_type = static_cast<FileType>(-1); });
if (file_type == FileType::SEGMENT_FILE || file_type ==
FileType::INVERTED_INDEX_FILE) {
- st = self->_load_stream_writer->close_writer(new_segid,
file_type);
+ st = _load_stream_writer->close_writer(new_segid, file_type);
} else {
st = Status::InternalError(
"appent data failed, file type error, file type = {}, "
@@ -176,8 +174,8 @@ Status TabletStream::append_data(const PStreamHeader&
header, butil::IOBuf* data
DBUG_EXECUTE_IF("TabletStream.append_data.append_failed",
{ st = Status::InternalError("fault injection"); });
if (!st.ok()) {
- self->_status.update(st);
- LOG(WARNING) << "write data failed " << st << ", " << *self;
+ _status.update(st);
+ LOG(WARNING) << "write data failed " << st << ", " << *this;
}
};
auto load_stream_flush_token_max_tasks =
config::load_stream_flush_token_max_tasks;
@@ -249,15 +247,14 @@ Status TabletStream::add_segment(const PStreamHeader&
header, butil::IOBuf* data
}
DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
- auto self = shared_from_this();
- auto add_segment_func = [self, new_segid, stat]() {
- signal::set_signal_task_id(self->_load_id);
- auto st = self->_load_stream_writer->add_segment(new_segid, stat);
+ auto add_segment_func = [this, new_segid, stat]() {
+ signal::set_signal_task_id(_load_id);
+ auto st = _load_stream_writer->add_segment(new_segid, stat);
DBUG_EXECUTE_IF("TabletStream.add_segment.add_segment_failed",
{ st = Status::InternalError("fault injection"); });
if (!st.ok()) {
- self->_status.update(st);
- LOG(INFO) << "add segment failed " << *self;
+ _status.update(st);
+ LOG(INFO) << "add segment failed " << *this;
}
};
Status st = Status::OK();
@@ -277,9 +274,8 @@ Status
TabletStream::_run_in_heavy_work_pool(std::function<Status()> fn) {
std::unique_lock<bthread::Mutex> lock(mu);
bthread::ConditionVariable cv;
auto st = Status::OK();
- auto self = shared_from_this();
- auto func = [self, &mu, &cv, &st, &fn] {
- signal::set_signal_task_id(self->_load_id);
+ auto func = [this, &mu, &cv, &st, &fn] {
+ signal::set_signal_task_id(_load_id);
st = fn();
std::lock_guard<bthread::Mutex> lock(mu);
cv.notify_one();
@@ -293,21 +289,40 @@ Status
TabletStream::_run_in_heavy_work_pool(std::function<Status()> fn) {
return st;
}
-void TabletStream::pre_close() {
+void TabletStream::wait_for_flush_tasks() {
+ {
+ std::lock_guard lock_guard(_lock);
+ if (_flush_tasks_done) {
+ return;
+ }
+ _flush_tasks_done = true;
+ }
+
if (!_status.ok()) {
- // cancel all pending tasks, wait all running tasks to finish
_flush_token->shutdown();
return;
}
- SCOPED_TIMER(_close_wait_timer);
- _status.update(_run_in_heavy_work_pool([this]() {
+ // Note: Do not use SCOPED_TIMER here because this function may be called
+ // from IndexStream::~IndexStream() during LoadStream destruction, at which
+ // point the RuntimeProfile (and _close_wait_timer) may already be destroyed.
+ // Use heavy_work_pool to avoid blocking bthread
+ auto st = _run_in_heavy_work_pool([this]() {
_flush_token->wait();
return Status::OK();
- }));
- // it is necessary to check status after wait_func,
- // for create_rowset could fail during add_segment when loading to MOW table,
- // in this case, should skip close to avoid submit_calc_delete_bitmap_task which could cause coredump.
+ });
+ if (!st.ok()) {
+ // If heavy_work_pool is unavailable, fall back to shutdown
+ // which will cancel pending tasks and wait for running tasks
+ _flush_token->shutdown();
+ _status.update(st);
+ }
+}
+
+void TabletStream::pre_close() {
+ SCOPED_TIMER(_close_wait_timer);
+ wait_for_flush_tasks();
+
if (!_status.ok()) {
return;
}
@@ -346,6 +361,16 @@ IndexStream::IndexStream(const PUniqueId& load_id, int64_t
id, int64_t txn_id,
_close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
}
+IndexStream::~IndexStream() {
+ // Ensure all TabletStreams have their flush tokens properly handled before destruction.
+ // In normal flow, close() should have called pre_close() on all tablet streams.
+ // But if IndexStream is destroyed without close() being called (e.g., on_idle_timeout),
+ // we need to wait for flush tasks here to ensure flush tokens are properly shut down.
+ for (auto& [_, tablet_stream] : _tablet_streams_map) {
+ tablet_stream->wait_for_flush_tasks();
+ }
+}
+
Status IndexStream::append_data(const PStreamHeader& header, butil::IOBuf*
data) {
SCOPED_TIMER(_append_data_timer);
int64_t tablet_id = header.tablet_id();
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index d3f6e02558e..be147f9415c 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -42,7 +42,7 @@ class OlapTableSchemaParam;
// origin_segid(index) -> new_segid(value in vector)
using SegIdMapping = std::vector<uint32_t>;
using FailedTablets = std::vector<std::pair<int64_t, Status>>;
-class TabletStream : public std::enable_shared_from_this<TabletStream> {
+class TabletStream {
public:
TabletStream(const PUniqueId& load_id, int64_t id, int64_t txn_id,
LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile);
@@ -54,6 +54,9 @@ public:
Status add_segment(const PStreamHeader& header, butil::IOBuf* data);
void add_num_segments(int64_t num_segments) { _num_segments +=
num_segments; }
void disable_num_segments_check() { _check_num_segments = false; }
+ // Wait for all pending flush tasks to complete and shut down the flush token.
+ // Safe to call multiple times.
+ void wait_for_flush_tasks();
void pre_close();
Status close();
int64_t id() const { return _id; }
@@ -70,6 +73,7 @@ private:
std::atomic<uint32_t> _next_segid;
int64_t _num_segments = 0;
bool _check_num_segments = true;
+ bool _flush_tasks_done = false;
bthread::Mutex _lock;
AtomicStatus _status;
PUniqueId _load_id;
@@ -88,6 +92,7 @@ public:
IndexStream(const PUniqueId& load_id, int64_t id, int64_t txn_id,
std::shared_ptr<OlapTableSchemaParam> schema, LoadStreamMgr*
load_stream_mgr,
RuntimeProfile* profile);
+ ~IndexStream();
Status append_data(const PStreamHeader& header, butil::IOBuf* data);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]