This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 2dccf9fd1e6b40e09b7d55c83d35a28ce93b2a22 Author: Kaijie Chen <[email protected]> AuthorDate: Wed Apr 17 10:40:17 2024 +0800 [fix](move-memtable) close wait on all sinks (#33710) --- be/src/vec/sink/load_stream_map_pool.cpp | 16 ++++++++++++---- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 8 +++++++- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/be/src/vec/sink/load_stream_map_pool.cpp b/be/src/vec/sink/load_stream_map_pool.cpp index f335f05e162..fdcfe190dbf 100644 --- a/be/src/vec/sink/load_stream_map_pool.cpp +++ b/be/src/vec/sink/load_stream_map_pool.cpp @@ -61,15 +61,23 @@ bool LoadStreamMap::contains(int64_t dst_id) { } void LoadStreamMap::for_each(std::function<void(int64_t, const Streams&)> fn) { - std::lock_guard<std::mutex> lock(_mutex); - for (auto& [dst_id, streams] : _streams_for_node) { + decltype(_streams_for_node) snapshot; + { + std::lock_guard<std::mutex> lock(_mutex); + snapshot = _streams_for_node; + } + for (auto& [dst_id, streams] : snapshot) { fn(dst_id, *streams); } } Status LoadStreamMap::for_each_st(std::function<Status(int64_t, const Streams&)> fn) { - std::lock_guard<std::mutex> lock(_mutex); - for (auto& [dst_id, streams] : _streams_for_node) { + decltype(_streams_for_node) snapshot; + { + std::lock_guard<std::mutex> lock(_mutex); + snapshot = _streams_for_node; + } + for (auto& [dst_id, streams] : snapshot) { RETURN_IF_ERROR(fn(dst_id, *streams)); } return Status::OK(); diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 617e96c6cc4..1f1756b5a16 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -546,9 +546,15 @@ Status VTabletWriterV2::close(Status exec_status) { LOG(INFO) << "sink " << _sender_id << " released streams, is_last=" << is_last_sink << ", load_id=" << print_id(_load_id); - // send CLOSE_LOAD and close_wait on all streams + // send CLOSE_LOAD on all streams if this is the last sink if (is_last_sink) { RETURN_IF_ERROR(_load_stream_map->close_load()); + } + + // close_wait on all streams, even if this is not the last sink. + // because some per-instance data structures are now shared among all sinks + // due to sharing delta writers and load stream stubs. + { SCOPED_TIMER(_close_load_timer); RETURN_IF_ERROR(_load_stream_map->for_each_st([this](int64_t dst_id, const Streams& streams) -> Status { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
