This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 03a1d4ed70c branch-4.1: [fix](load) replace tablet writer close polling with event wakeup #64221 (#64747)
03a1d4ed70c is described below

commit 03a1d4ed70cf373a69e25db4431742c3f37cb8f6
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jun 24 14:51:06 2026 +0800

    branch-4.1: [fix](load) replace tablet writer close polling with event wakeup #64221 (#64747)
    
    Cherry-picked from #64221
    
    Co-authored-by: hui lai <[email protected]>
---
 be/src/exec/sink/load_stream_map_pool.cpp     | 17 ++++++++++---
 be/src/exec/sink/load_stream_map_pool.h       |  5 ++++
 be/src/exec/sink/load_stream_stub.cpp         | 32 ++++++++++++++++++++++--
 be/src/exec/sink/load_stream_stub.h           | 36 ++++++++++++++++++++++-----
 be/src/exec/sink/writer/vtablet_writer.cpp    | 26 +++++++++++++++++--
 be/src/exec/sink/writer/vtablet_writer.h      | 13 ++++++++++
 be/src/exec/sink/writer/vtablet_writer_v2.cpp | 11 ++++++--
 be/test/exec/sink/vtablet_writer_v2_test.cpp  | 21 ++++++++++++++++
 8 files changed, 145 insertions(+), 16 deletions(-)

diff --git a/be/src/exec/sink/load_stream_map_pool.cpp b/be/src/exec/sink/load_stream_map_pool.cpp
index eb9a3c669b6..a135aee6a0f 100644
--- a/be/src/exec/sink/load_stream_map_pool.cpp
+++ b/be/src/exec/sink/load_stream_map_pool.cpp
@@ -32,7 +32,8 @@ LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams,
           _num_incremental_streams(0),
           _pool(pool),
           _tablet_schema_for_index(std::make_shared<IndexToTabletSchema>()),
-          _enable_unique_mow_for_index(std::make_shared<IndexToEnableMoW>()) {
+          _enable_unique_mow_for_index(std::make_shared<IndexToEnableMoW>()),
+          _close_wait_notifier(std::make_shared<CloseWaitNotifier>()) {
     DCHECK(num_streams > 0) << "stream num should be greater than 0";
     DCHECK(num_use > 0) << "use num should be greater than 0";
 }
@@ -46,9 +47,9 @@ std::shared_ptr<LoadStreamStubs> LoadStreamMap::get_or_create(int64_t dst_id, bo
     if (incremental) {
         _num_incremental_streams.fetch_add(1);
     }
-    streams = std::make_shared<LoadStreamStubs>(_num_streams, _load_id, _src_id,
-                                                _tablet_schema_for_index,
-                                                _enable_unique_mow_for_index, incremental);
+    streams = std::make_shared<LoadStreamStubs>(
+            _num_streams, _load_id, _src_id, _tablet_schema_for_index, _enable_unique_mow_for_index,
+            incremental, _close_wait_notifier);
     _streams_for_node[dst_id] = streams;
     return streams;
 }
@@ -130,6 +131,14 @@ void LoadStreamMap::close_load(bool incremental) {
     }
 }
 
+int64_t LoadStreamMap::close_wait_version() const {
+    return _close_wait_notifier->close_wait_version();
+}
+
+void LoadStreamMap::wait_for_close_event(int64_t observed_version, int64_t timeout_ms) {
+    _close_wait_notifier->wait_for_close_event(observed_version, timeout_ms);
+}
+
 LoadStreamMapPool::LoadStreamMapPool() = default;
 
 LoadStreamMapPool::~LoadStreamMapPool() = default;
diff --git a/be/src/exec/sink/load_stream_map_pool.h b/be/src/exec/sink/load_stream_map_pool.h
index cb210b3663c..7cd447d615f 100644
--- a/be/src/exec/sink/load_stream_map_pool.h
+++ b/be/src/exec/sink/load_stream_map_pool.h
@@ -97,6 +97,10 @@ public:
     // only call this method after release() returns true.
     void close_load(bool incremental);
 
+    int64_t close_wait_version() const;
+
+    void wait_for_close_event(int64_t observed_version, int64_t timeout_ms);
+
     std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>> get_streams_for_node() {
         decltype(_streams_for_node) snapshot;
         {
@@ -117,6 +121,7 @@ private:
     LoadStreamMapPool* _pool = nullptr;
     std::shared_ptr<IndexToTabletSchema> _tablet_schema_for_index;
     std::shared_ptr<IndexToEnableMoW> _enable_unique_mow_for_index;
+    std::shared_ptr<CloseWaitNotifier> _close_wait_notifier;
 
     std::mutex _tablets_to_commit_mutex;
     std::unordered_map<int64_t, std::unordered_map<int64_t, PTabletID>> _tablets_to_commit;
diff --git a/be/src/exec/sink/load_stream_stub.cpp b/be/src/exec/sink/load_stream_stub.cpp
index bc54ebd16fc..3e462badd4b 100644
--- a/be/src/exec/sink/load_stream_stub.cpp
+++ b/be/src/exec/sink/load_stream_stub.cpp
@@ -31,6 +31,24 @@
 namespace doris {
 #include "common/compile_check_begin.h"
 
+int64_t CloseWaitNotifier::close_wait_version() const {
+    return _close_wait_version.load(std::memory_order_acquire);
+}
+
+void CloseWaitNotifier::wait_for_close_event(int64_t observed_version, int64_t timeout_ms) {
+    std::unique_lock<bthread::Mutex> lock(_close_wait_mutex);
+    if (observed_version != close_wait_version()) {
+        return;
+    }
+    static_cast<void>(_close_wait_cv.wait_for(lock, timeout_ms * 1000));
+}
+
+void CloseWaitNotifier::notify_close_wait() {
+    _close_wait_version.fetch_add(1, std::memory_order_acq_rel);
+    std::lock_guard<bthread::Mutex> lock(_close_wait_mutex);
+    _close_wait_cv.notify_all();
+}
+
 int LoadStreamReplyHandler::on_received_messages(brpc::StreamId id, butil::IOBuf* const messages[],
                                                  size_t size) {
     auto stub = _stub.lock();
@@ -119,6 +137,7 @@ void LoadStreamReplyHandler::on_closed(brpc::StreamId id) {
         return;
     }
     stub->_is_closed.store(true);
+    stub->notify_close_wait();
 }
 
 inline std::ostream& operator<<(std::ostream& ostr, const LoadStreamReplyHandler& handler) {
@@ -129,12 +148,16 @@ inline std::ostream& operator<<(std::ostream& ostr, const LoadStreamReplyHandler
 
 LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id,
                                std::shared_ptr<IndexToTabletSchema> schema_map,
-                               std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental)
+                               std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental,
+                               std::shared_ptr<CloseWaitNotifier> close_wait_notifier)
         : _load_id(load_id),
           _src_id(src_id),
           _tablet_schema_for_index(schema_map),
           _enable_unique_mow_for_index(mow_map),
-          _is_incremental(incremental) {};
+          _is_incremental(incremental),
+          _close_wait_notifier(std::move(close_wait_notifier)) {
+    DCHECK(_close_wait_notifier != nullptr);
+};
 
 LoadStreamStub::~LoadStreamStub() {
     if (_is_open.load() && !_is_closed.load()) {
@@ -365,6 +388,10 @@ Status LoadStreamStub::close_finish_check(RuntimeState* state, bool* is_closed)
     return Status::OK();
 }
 
+void LoadStreamStub::notify_close_wait() {
+    _close_wait_notifier->notify_close_wait();
+}
+
 void LoadStreamStub::cancel(Status reason) {
     LOG(WARNING) << *this << " is cancelled because of " << reason;
     if (_is_open.load()) {
@@ -376,6 +403,7 @@ void LoadStreamStub::cancel(Status reason) {
         _is_cancelled.store(true);
     }
     _is_closed.store(true);
+    notify_close_wait();
 }
 
 Status LoadStreamStub::_encode_and_send(PStreamHeader& header, std::span<const Slice> data) {
diff --git a/be/src/exec/sink/load_stream_stub.h b/be/src/exec/sink/load_stream_stub.h
index 4dfe2e252d9..638d82882a7 100644
--- a/be/src/exec/sink/load_stream_stub.h
+++ b/be/src/exec/sink/load_stream_stub.h
@@ -82,6 +82,20 @@ using IndexToEnableMoW =
                                       std::allocator<phmap::Pair<const int64_t, bool>>, 4,
                                       std::mutex>;
 
+class CloseWaitNotifier {
+public:
+    int64_t close_wait_version() const;
+
+    void wait_for_close_event(int64_t observed_version, int64_t timeout_ms);
+
+    void notify_close_wait();
+
+private:
+    std::atomic<int64_t> _close_wait_version {0};
+    bthread::Mutex _close_wait_mutex;
+    bthread::ConditionVariable _close_wait_cv;
+};
+
 class LoadStreamReplyHandler : public brpc::StreamInputHandler {
 public:
     LoadStreamReplyHandler(PUniqueId load_id, int64_t dst_id, std::weak_ptr<LoadStreamStub> stub)
@@ -109,12 +123,17 @@ public:
     // construct new stub
     LoadStreamStub(PUniqueId load_id, int64_t src_id,
                    std::shared_ptr<IndexToTabletSchema> schema_map,
-                   std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false);
+                   std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false,
+                   std::shared_ptr<CloseWaitNotifier> close_wait_notifier =
+                           std::make_shared<CloseWaitNotifier>());
 
     LoadStreamStub(UniqueId load_id, int64_t src_id,
                    std::shared_ptr<IndexToTabletSchema> schema_map,
-                   std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false)
-            : LoadStreamStub(load_id.to_proto(), src_id, schema_map, mow_map, incremental) {};
+                   std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false,
+                   std::shared_ptr<CloseWaitNotifier> close_wait_notifier =
+                           std::make_shared<CloseWaitNotifier>())
+            : LoadStreamStub(load_id.to_proto(), src_id, schema_map, mow_map, incremental,
+                             std::move(close_wait_notifier)) {};
 
 // for mock this class in UT
 #ifdef BE_TEST
@@ -244,6 +263,8 @@ public:
                     tablet_load_infos);
 
 private:
+    void notify_close_wait();
+
     Status _encode_and_send(PStreamHeader& header, std::span<const Slice> data = {});
     Status _send_with_buffer(butil::IOBuf& buf, bool sync = false);
     Status _send_with_retry(butil::IOBuf& buf);
@@ -282,6 +303,7 @@ protected:
     std::unordered_map<int64_t, Status> _failed_tablets;
 
     bool _is_incremental = false;
+    std::shared_ptr<CloseWaitNotifier> _close_wait_notifier;
 
     bthread::Mutex _write_mutex;
     size_t _bytes_written = 0;
@@ -294,12 +316,14 @@ class LoadStreamStubs {
 public:
     LoadStreamStubs(size_t num_streams, UniqueId load_id, int64_t src_id,
                     std::shared_ptr<IndexToTabletSchema> schema_map,
-                    std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false)
+                    std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false,
+                    std::shared_ptr<CloseWaitNotifier> close_wait_notifier =
+                            std::make_shared<CloseWaitNotifier>())
             : _is_incremental(incremental) {
         _streams.reserve(num_streams);
         for (size_t i = 0; i < num_streams; i++) {
-            _streams.emplace_back(
-                    new LoadStreamStub(load_id, src_id, schema_map, mow_map, incremental));
+            _streams.emplace_back(new LoadStreamStub(load_id, src_id, schema_map, mow_map,
+                                                     incremental, close_wait_notifier));
         }
     }
 
diff --git a/be/src/exec/sink/writer/vtablet_writer.cpp b/be/src/exec/sink/writer/vtablet_writer.cpp
index 1af9e77272c..803082fa7ee 100644
--- a/be/src/exec/sink/writer/vtablet_writer.cpp
+++ b/be/src/exec/sink/writer/vtablet_writer.cpp
@@ -103,6 +103,8 @@ bvar::PerSecond<bvar::Adder<int64_t>> g_sink_write_rows_per_second("sink_through
 bvar::Adder<int64_t> g_sink_load_back_pressure_version_time_ms(
         "load_back_pressure_version_time_ms");
 
+static constexpr int64_t CLOSE_WAIT_EVENT_FALLBACK_MS = 1000;
+
 Status IndexChannel::init(RuntimeState* state, const std::vector<TTabletWithPartition>& tablets,
                           bool incremental) {
     SCOPED_CONSUME_MEM_TRACKER(_index_channel_tracker.get());
@@ -310,6 +312,20 @@ static Status cancel_channel_and_check_intolerable_failure(Status status,
     return status;
 }
 
+void IndexChannel::wait_for_close_event(int64_t observed_version, int64_t timeout_ms) {
+    std::unique_lock<bthread::Mutex> lock(_close_wait_mutex);
+    if (observed_version != close_wait_version()) {
+        return;
+    }
+    static_cast<void>(_close_wait_cv.wait_for(lock, timeout_ms * 1000));
+}
+
+void IndexChannel::notify_close_wait() {
+    _close_wait_version.fetch_add(1, std::memory_order_acq_rel);
+    std::lock_guard<bthread::Mutex> lock(_close_wait_mutex);
+    _close_wait_cv.notify_all();
+}
+
 Status IndexChannel::close_wait(
         RuntimeState* state, WriterStats* writer_stats,
         std::unordered_map<int64_t, AddBatchCounter>* node_add_batch_counter_map,
@@ -331,6 +347,7 @@ Status IndexChannel::close_wait(
         }
     }
     while (true) {
+        int64_t close_wait_version = this->close_wait_version();
         RETURN_IF_ERROR(check_each_node_channel_close(
                 &unfinished_node_channel_ids, node_add_batch_counter_map, writer_stats, status));
         bool quorum_success = _quorum_success(unfinished_node_channel_ids, need_finish_tablets);
@@ -341,7 +358,7 @@ Status IndexChannel::close_wait(
                       << ", load_id: " << print_id(_parent->_load_id);
             break;
         }
-        bthread_usleep(1000 * 10);
+        wait_for_close_event(close_wait_version, CLOSE_WAIT_EVENT_FALLBACK_MS);
     }
 
     // 2. wait for all node channel to complete as much as possible
@@ -349,6 +366,7 @@ Status IndexChannel::close_wait(
         int64_t arrival_quorum_success_time = UnixMillis();
         int64_t max_wait_time_ms = _calc_max_wait_time_ms(unfinished_node_channel_ids);
         while (true) {
+            int64_t close_wait_version = this->close_wait_version();
             RETURN_IF_ERROR(check_each_node_channel_close(&unfinished_node_channel_ids,
                                                           node_add_batch_counter_map, writer_stats,
                                                           status));
@@ -372,7 +390,8 @@ Status IndexChannel::close_wait(
                              << unfinished_node_channel_host_str.str();
                 break;
             }
-            bthread_usleep(1000 * 10);
+            wait_for_close_event(close_wait_version, std::min(CLOSE_WAIT_EVENT_FALLBACK_MS,
+                                                              max_wait_time_ms - elapsed_ms));
         }
     }
     return status;
@@ -873,6 +892,7 @@ void VNodeChannel::_cancel_with_msg(const std::string& msg) {
         }
     }
     _cancelled = true;
+    _index_channel->notify_close_wait();
 }
 
 void VNodeChannel::_refresh_back_pressure_version_wait_time(
@@ -1135,6 +1155,7 @@ void VNodeChannel::_add_block_success_callback(const PTabletWriterAddBlockResult
                 }
             }
             _add_batches_finished = true;
+            _index_channel->notify_close_wait();
         }
     } else {
         _cancel_with_msg(fmt::format("{}, add batch req success but status isn't ok, err: {}",
@@ -1185,6 +1206,7 @@ void VNodeChannel::_add_block_failed_callback(const WriteBlockCallbackContext& c
         // if this is last rpc, will must set _add_batches_finished. otherwise, node channel's close_wait
         // will be blocked.
         _add_batches_finished = true;
+        _index_channel->notify_close_wait();
     }
 }
 
diff --git a/be/src/exec/sink/writer/vtablet_writer.h b/be/src/exec/sink/writer/vtablet_writer.h
index d3e6e8da0f1..69b78fa280c 100644
--- a/be/src/exec/sink/writer/vtablet_writer.h
+++ b/be/src/exec/sink/writer/vtablet_writer.h
@@ -31,6 +31,7 @@
 #include <google/protobuf/stubs/callback.h>
 
 // IWYU pragma: no_include <bits/chrono.h>
+#include <bthread/condition_variable.h>
 #include <bthread/mutex.h>
 
 #include <atomic>
@@ -524,6 +525,14 @@ public:
                       std::unordered_set<int64_t> unfinished_node_channel_ids,
                       bool need_wait_after_quorum_success);
 
+    int64_t close_wait_version() const {
+        return _close_wait_version.load(std::memory_order_acquire);
+    }
+
+    void wait_for_close_event(int64_t observed_version, int64_t timeout_ms);
+
+    void notify_close_wait();
+
     Status check_each_node_channel_close(
             std::unordered_set<int64_t>* unfinished_node_channel_ids,
             std::unordered_map<int64_t, AddBatchCounter>* node_add_batch_counter_map,
@@ -614,6 +623,10 @@ private:
     std::map<int64_t, std::vector<std::pair<int64_t, int64_t>>> _tablets_filtered_rows;
 
     int64_t _start_time = 0;
+
+    std::atomic<int64_t> _close_wait_version {0};
+    bthread::Mutex _close_wait_mutex;
+    bthread::ConditionVariable _close_wait_cv;
 };
 } // namespace doris
 
diff --git a/be/src/exec/sink/writer/vtablet_writer_v2.cpp b/be/src/exec/sink/writer/vtablet_writer_v2.cpp
index 37c9b94ddf2..610500aeffa 100644
--- a/be/src/exec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/exec/sink/writer/vtablet_writer_v2.cpp
@@ -24,6 +24,7 @@
 #include <gen_cpp/Types_types.h>
 #include <gen_cpp/internal_service.pb.h>
 
+#include <algorithm>
 #include <cstdint>
 #include <mutex>
 #include <ranges>
@@ -59,6 +60,8 @@ namespace doris {
 
 extern bvar::Adder<int64_t> g_sink_load_back_pressure_version_time_ms;
 
+static constexpr int64_t CLOSE_WAIT_EVENT_FALLBACK_MS = 1000;
+
 VTabletWriterV2::VTabletWriterV2(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs,
                                  std::shared_ptr<Dependency> dep,
                                  std::shared_ptr<Dependency> fin_dep)
@@ -821,6 +824,7 @@ Status VTabletWriterV2::_close_wait(
         }
     }
     while (true) {
+        int64_t close_wait_version = _load_stream_map->close_wait_version();
         RETURN_IF_ERROR(_check_timeout());
         RETURN_IF_ERROR(_check_streams_finish(unfinished_streams, status, streams_for_node));
         bool quorum_success = _quorum_success(unfinished_streams, need_finish_tablets);
@@ -830,7 +834,7 @@ Status VTabletWriterV2::_close_wait(
                       << ", txn_id: " << _txn_id << ", load_id: " << print_id(_load_id);
             break;
         }
-        bthread_usleep(1000 * 10);
+        _load_stream_map->wait_for_close_event(close_wait_version, CLOSE_WAIT_EVENT_FALLBACK_MS);
     }
 
     // 2. then wait for remaining streams as much as possible
@@ -838,6 +842,7 @@ Status VTabletWriterV2::_close_wait(
         int64_t arrival_quorum_success_time = UnixMillis();
         int64_t max_wait_time_ms = _calc_max_wait_time_ms(streams_for_node, unfinished_streams);
         while (true) {
+            int64_t close_wait_version = _load_stream_map->close_wait_version();
             RETURN_IF_ERROR(_check_timeout());
             RETURN_IF_ERROR(_check_streams_finish(unfinished_streams, status, streams_for_node));
             if (unfinished_streams.empty()) {
@@ -856,7 +861,9 @@ Status VTabletWriterV2::_close_wait(
                              << ", unfinished streams: " << unfinished_streams_str.str();
                 break;
             }
-            bthread_usleep(1000 * 10);
+            _load_stream_map->wait_for_close_event(
+                    close_wait_version,
+                    std::min(CLOSE_WAIT_EVENT_FALLBACK_MS, max_wait_time_ms - elapsed_ms));
         }
     }
 
diff --git a/be/test/exec/sink/vtablet_writer_v2_test.cpp b/be/test/exec/sink/vtablet_writer_v2_test.cpp
index 2ddde5bc4b3..10814c4b9db 100644
--- a/be/test/exec/sink/vtablet_writer_v2_test.cpp
+++ b/be/test/exec/sink/vtablet_writer_v2_test.cpp
@@ -294,6 +294,27 @@ TEST_F(TestVTabletWriterV2, shared_delta_writer_should_not_access_destroyed_crea
     current_writer->_cancel(Status::Cancelled("test cleanup"));
 }
 
+TEST_F(TestVTabletWriterV2, close_wait_notifier_should_be_scoped_to_load_stream_map) {
+    UniqueId load_id1;
+    UniqueId load_id2;
+    load_id2.lo = 1;
+    std::shared_ptr<LoadStreamMap> load_stream_map1 =
+            std::make_shared<LoadStreamMap>(load_id1, src_id, 1, 1, nullptr);
+    std::shared_ptr<LoadStreamMap> load_stream_map2 =
+            std::make_shared<LoadStreamMap>(load_id2, src_id, 1, 1, nullptr);
+    auto streams1 = load_stream_map1->get_or_create(1001);
+    auto streams2 = load_stream_map2->get_or_create(1002);
+    streams1->mark_open();
+    streams2->mark_open();
+
+    int64_t version1 = load_stream_map1->close_wait_version();
+    int64_t version2 = load_stream_map2->close_wait_version();
+    streams1->select_one_stream()->cancel(Status::Cancelled("test"));
+
+    ASSERT_GT(load_stream_map1->close_wait_version(), version1);
+    ASSERT_EQ(load_stream_map2->close_wait_version(), version2);
+}
+
 TEST_F(TestVTabletWriterV2, one_replica) {
     UniqueId load_id;
     std::vector<TTabletCommitInfo> tablet_commit_infos;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to