This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 2dccf9fd1e6b40e09b7d55c83d35a28ce93b2a22
Author: Kaijie Chen <[email protected]>
AuthorDate: Wed Apr 17 10:40:17 2024 +0800

    [fix](move-memtable) close wait on all sinks (#33710)
---
 be/src/vec/sink/load_stream_map_pool.cpp     | 16 ++++++++++++----
 be/src/vec/sink/writer/vtablet_writer_v2.cpp |  8 +++++++-
 2 files changed, 19 insertions(+), 5 deletions(-)

diff --git a/be/src/vec/sink/load_stream_map_pool.cpp b/be/src/vec/sink/load_stream_map_pool.cpp
index f335f05e162..fdcfe190dbf 100644
--- a/be/src/vec/sink/load_stream_map_pool.cpp
+++ b/be/src/vec/sink/load_stream_map_pool.cpp
@@ -61,15 +61,23 @@ bool LoadStreamMap::contains(int64_t dst_id) {
 }
 
 void LoadStreamMap::for_each(std::function<void(int64_t, const Streams&)> fn) {
-    std::lock_guard<std::mutex> lock(_mutex);
-    for (auto& [dst_id, streams] : _streams_for_node) {
+    decltype(_streams_for_node) snapshot;
+    {
+        std::lock_guard<std::mutex> lock(_mutex);
+        snapshot = _streams_for_node;
+    }
+    for (auto& [dst_id, streams] : snapshot) {
         fn(dst_id, *streams);
     }
 }
 
 Status LoadStreamMap::for_each_st(std::function<Status(int64_t, const Streams&)> fn) {
-    std::lock_guard<std::mutex> lock(_mutex);
-    for (auto& [dst_id, streams] : _streams_for_node) {
+    decltype(_streams_for_node) snapshot;
+    {
+        std::lock_guard<std::mutex> lock(_mutex);
+        snapshot = _streams_for_node;
+    }
+    for (auto& [dst_id, streams] : snapshot) {
         RETURN_IF_ERROR(fn(dst_id, *streams));
     }
     return Status::OK();
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 617e96c6cc4..1f1756b5a16 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -546,9 +546,15 @@ Status VTabletWriterV2::close(Status exec_status) {
         LOG(INFO) << "sink " << _sender_id << " released streams, is_last=" << is_last_sink
                   << ", load_id=" << print_id(_load_id);
 
-        // send CLOSE_LOAD and close_wait on all streams
+        // send CLOSE_LOAD on all streams if this is the last sink
         if (is_last_sink) {
             RETURN_IF_ERROR(_load_stream_map->close_load());
+        }
+
+        // close_wait on all streams, even if this is not the last sink.
+        // because some per-instance data structures are now shared among all sinks
+        // due to sharing delta writers and load stream stubs.
+        {
             SCOPED_TIMER(_close_load_timer);
             RETURN_IF_ERROR(_load_stream_map->for_each_st([this](int64_t dst_id,
                                                                  const Streams& streams) -> Status {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to