This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c34b52c02a0 [fix](move-memtable) fix initial use count of streams for auto partition (#33165)
c34b52c02a0 is described below

commit c34b52c02a0547b665496bc02d775017d67d7fb3
Author: Kaijie Chen <[email protected]>
AuthorDate: Wed Apr 3 19:44:52 2024 +0800

    [fix](move-memtable) fix initial use count of streams for auto partition (#33165)
---
 be/src/vec/sink/load_stream_stub.cpp            |  33 +---
 be/src/vec/sink/load_stream_stub.h              |  14 +-
 be/src/vec/sink/load_stream_stub_pool.cpp       | 110 +++++++++----
 be/src/vec/sink/load_stream_stub_pool.h         |  50 +++---
 be/src/vec/sink/writer/vtablet_writer_v2.cpp    | 200 ++++++++++++------------
 be/src/vec/sink/writer/vtablet_writer_v2.h      |  10 +-
 be/test/io/fs/stream_sink_file_writer_test.cpp  |   4 +-
 be/test/vec/exec/load_stream_stub_pool_test.cpp |  31 ++--
 8 files changed, 254 insertions(+), 198 deletions(-)

diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index 1b19cd1dd52..2e118dce5c1 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -125,19 +125,13 @@ inline std::ostream& operator<<(std::ostream& ostr, const LoadStreamReplyHandler
     return ostr;
 }
 
-LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use)
-        : _use_cnt(num_use),
-          _load_id(load_id),
+LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id,
+                               std::shared_ptr<IndexToTabletSchema> schema_map,
+                               std::shared_ptr<IndexToEnableMoW> mow_map)
+        : _load_id(load_id),
           _src_id(src_id),
-          _tablet_schema_for_index(std::make_shared<IndexToTabletSchema>()),
-          _enable_unique_mow_for_index(std::make_shared<IndexToEnableMoW>()) {};
-
-LoadStreamStub::LoadStreamStub(LoadStreamStub& stub)
-        : _use_cnt(stub._use_cnt.load()),
-          _load_id(stub._load_id),
-          _src_id(stub._src_id),
-          _tablet_schema_for_index(stub._tablet_schema_for_index),
-          _enable_unique_mow_for_index(stub._enable_unique_mow_for_index) {};
+          _tablet_schema_for_index(schema_map),
+          _enable_unique_mow_for_index(mow_map) {};
 
 LoadStreamStub::~LoadStreamStub() {
     if (_is_init.load() && !_is_closed.load()) {
@@ -241,23 +235,12 @@ Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64
 
 // CLOSE_LOAD
 Status LoadStreamStub::close_load(const std::vector<PTabletID>& tablets_to_commit) {
-    {
-        std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex);
-        _tablets_to_commit.insert(_tablets_to_commit.end(), tablets_to_commit.begin(),
-                                  tablets_to_commit.end());
-    }
-    if (--_use_cnt > 0) {
-        return Status::OK();
-    }
     PStreamHeader header;
     *header.mutable_load_id() = _load_id;
     header.set_src_id(_src_id);
     header.set_opcode(doris::PStreamHeader::CLOSE_LOAD);
-    {
-        std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex);
-        for (const auto& tablet : _tablets_to_commit) {
-            *header.add_tablets() = tablet;
-        }
+    for (const auto& tablet : tablets_to_commit) {
+        *header.add_tablets() = tablet;
     }
     return _encode_and_send(header);
 }
diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h
index f20b0e6ea3d..aa8b850760e 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -109,10 +109,14 @@ class LoadStreamStub {
 
 public:
     // construct new stub
-    LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use);
+    LoadStreamStub(PUniqueId load_id, int64_t src_id,
+                   std::shared_ptr<IndexToTabletSchema> schema_map,
+                   std::shared_ptr<IndexToEnableMoW> mow_map);
 
-    // copy constructor, shared_ptr members are shared
-    LoadStreamStub(LoadStreamStub& stub);
+    LoadStreamStub(UniqueId load_id, int64_t src_id,
+                   std::shared_ptr<IndexToTabletSchema> schema_map,
+                   std::shared_ptr<IndexToEnableMoW> mow_map)
+            : LoadStreamStub(load_id.to_proto(), src_id, schema_map, mow_map) {};
 
 // for mock this class in UT
 #ifdef BE_TEST
@@ -213,7 +217,6 @@ protected:
     std::atomic<bool> _is_closed;
     std::atomic<bool> _is_cancelled;
     std::atomic<bool> _is_eos;
-    std::atomic<int> _use_cnt;
 
     PUniqueId _load_id;
     brpc::StreamId _stream_id;
@@ -226,9 +229,6 @@ protected:
     bthread::Mutex _cancel_mutex;
     bthread::ConditionVariable _close_cv;
 
-    std::mutex _tablets_to_commit_mutex;
-    std::vector<PTabletID> _tablets_to_commit;
-
     std::mutex _buffer_mutex;
     std::mutex _send_mutex;
     butil::IOBuf _buffer;
diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp b/be/src/vec/sink/load_stream_stub_pool.cpp
index d76402b57d5..3eae49aff77 100644
--- a/be/src/vec/sink/load_stream_stub_pool.cpp
+++ b/be/src/vec/sink/load_stream_stub_pool.cpp
@@ -23,50 +23,104 @@
 namespace doris {
 class TExpr;
 
-LoadStreams::LoadStreams(UniqueId load_id, int64_t dst_id, int num_use, LoadStreamStubPool* pool)
-        : _load_id(load_id), _dst_id(dst_id), _use_cnt(num_use), _pool(pool) {}
+LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int num_use,
+                             LoadStreamStubPool* pool)
+        : _load_id(load_id),
+          _src_id(src_id),
+          _num_streams(num_streams),
+          _use_cnt(num_use),
+          _pool(pool) {
+    DCHECK(num_streams > 0) << "stream num should be greater than 0";
+    DCHECK(num_use > 0) << "use num should be greater than 0";
+}
+
+std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t dst_id) {
+    std::lock_guard<std::mutex> lock(_mutex);
+    std::shared_ptr<Streams> streams = _streams_for_node[dst_id];
+    if (streams != nullptr) {
+        return streams;
+    }
+    streams = std::make_shared<Streams>();
+    auto schema_map = std::make_shared<IndexToTabletSchema>();
+    auto mow_map = std::make_shared<IndexToEnableMoW>();
+    for (int i = 0; i < _num_streams; i++) {
+        streams->emplace_back(new LoadStreamStub(_load_id, _src_id, schema_map, mow_map));
+    }
+    _streams_for_node[dst_id] = streams;
+    return streams;
+}
+
+std::shared_ptr<Streams> LoadStreamMap::at(int64_t dst_id) {
+    std::lock_guard<std::mutex> lock(_mutex);
+    return _streams_for_node.at(dst_id);
+}
+
+bool LoadStreamMap::contains(int64_t dst_id) {
+    std::lock_guard<std::mutex> lock(_mutex);
+    return _streams_for_node.contains(dst_id);
+}
 
-void LoadStreams::release() {
+void LoadStreamMap::for_each(std::function<void(int64_t, const Streams&)> fn) {
+    std::lock_guard<std::mutex> lock(_mutex);
+    for (auto& [dst_id, streams] : _streams_for_node) {
+        fn(dst_id, *streams);
+    }
+}
+
+Status LoadStreamMap::for_each_st(std::function<Status(int64_t, const Streams&)> fn) {
+    std::lock_guard<std::mutex> lock(_mutex);
+    for (auto& [dst_id, streams] : _streams_for_node) {
+        RETURN_IF_ERROR(fn(dst_id, *streams));
+    }
+    return Status::OK();
+}
+
+void LoadStreamMap::save_tablets_to_commit(int64_t dst_id,
+                                           const std::vector<PTabletID>& tablets_to_commit) {
+    std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex);
+    auto& tablets = _tablets_to_commit[dst_id];
+    tablets.insert(tablets.end(), tablets_to_commit.begin(), tablets_to_commit.end());
+}
+
+bool LoadStreamMap::release() {
     int num_use = --_use_cnt;
-    DBUG_EXECUTE_IF("LoadStreams.release.keeping_streams", { num_use = 1; });
     if (num_use == 0) {
-        LOG(INFO) << "releasing streams, load_id=" << _load_id << ", dst_id=" << _dst_id;
-        _pool->erase(_load_id, _dst_id);
-    } else {
-        LOG(INFO) << "keeping streams, load_id=" << _load_id << ", dst_id=" << _dst_id
-                  << ", use_cnt=" << num_use;
+        LOG(INFO) << "releasing streams, load_id=" << _load_id;
+        _pool->erase(_load_id);
+        return true;
     }
+    LOG(INFO) << "keeping streams, load_id=" << _load_id << ", use_cnt=" << num_use;
+    return false;
+}
+
+Status LoadStreamMap::close_load() {
+    return for_each_st([this](int64_t dst_id, const Streams& streams) -> Status {
+        const auto& tablets = _tablets_to_commit[dst_id];
+        for (auto& stream : streams) {
+            RETURN_IF_ERROR(stream->close_load(tablets));
+        }
+        return Status::OK();
+    });
 }
 
 LoadStreamStubPool::LoadStreamStubPool() = default;
 
 LoadStreamStubPool::~LoadStreamStubPool() = default;
-
-std::shared_ptr<LoadStreams> LoadStreamStubPool::get_or_create(PUniqueId load_id, int64_t src_id,
-                                                               int64_t dst_id, int num_streams,
-                                                               int num_sink) {
-    auto key = std::make_pair(UniqueId(load_id), dst_id);
+std::shared_ptr<LoadStreamMap> LoadStreamStubPool::get_or_create(UniqueId load_id, int64_t src_id,
+                                                                 int num_streams, int num_use) {
     std::lock_guard<std::mutex> lock(_mutex);
-    std::shared_ptr<LoadStreams> streams = _pool[key];
-    if (streams) {
+    std::shared_ptr<LoadStreamMap> streams = _pool[load_id];
+    if (streams != nullptr) {
         return streams;
     }
-    DCHECK(num_streams > 0) << "stream num should be greater than 0";
-    DCHECK(num_sink > 0) << "sink num should be greater than 0";
-    auto [it, _] = _template_stubs.emplace(load_id, new LoadStreamStub {load_id, src_id, num_sink});
-    streams = std::make_shared<LoadStreams>(load_id, dst_id, num_sink, this);
-    for (int32_t i = 0; i < num_streams; i++) {
-        // copy construct, internal tablet schema map will be shared among all stubs
-        streams->streams().emplace_back(new LoadStreamStub {*it->second});
-    }
-    _pool[key] = streams;
+    streams = std::make_shared<LoadStreamMap>(load_id, src_id, num_streams, num_use, this);
+    _pool[load_id] = streams;
     return streams;
 }
 
-void LoadStreamStubPool::erase(UniqueId load_id, int64_t dst_id) {
+void LoadStreamStubPool::erase(UniqueId load_id) {
     std::lock_guard<std::mutex> lock(_mutex);
-    _pool.erase(std::make_pair(load_id, dst_id));
-    _template_stubs.erase(load_id);
+    _pool.erase(load_id);
 }
 
 } // namespace doris
diff --git a/be/src/vec/sink/load_stream_stub_pool.h b/be/src/vec/sink/load_stream_stub_pool.h
index 662fc5bc1a1..65f3bb66cd2 100644
--- a/be/src/vec/sink/load_stream_stub_pool.h
+++ b/be/src/vec/sink/load_stream_stub_pool.h
@@ -72,20 +72,41 @@ class LoadStreamStubPool;
 
 using Streams = std::vector<std::shared_ptr<LoadStreamStub>>;
 
-class LoadStreams {
+class LoadStreamMap {
 public:
-    LoadStreams(UniqueId load_id, int64_t dst_id, int num_use, LoadStreamStubPool* pool);
+    LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int num_use,
+                  LoadStreamStubPool* pool);
 
-    void release();
+    std::shared_ptr<Streams> get_or_create(int64_t dst_id);
 
-    Streams& streams() { return _streams; }
+    std::shared_ptr<Streams> at(int64_t dst_id);
+
+    bool contains(int64_t dst_id);
+
+    void for_each(std::function<void(int64_t, const Streams&)> fn);
+
+    Status for_each_st(std::function<Status(int64_t, const Streams&)> fn);
+
+    void save_tablets_to_commit(int64_t dst_id, const std::vector<PTabletID>& tablets_to_commit);
+
+    // Return true if the last instance is just released.
+    bool release();
+
+    // send CLOSE_LOAD to all streams, return ERROR if any.
+    // only call this method after release() returns true.
+    Status close_load();
 
 private:
-    Streams _streams;
-    UniqueId _load_id;
-    int64_t _dst_id;
+    const UniqueId _load_id;
+    const int64_t _src_id;
+    const int _num_streams;
     std::atomic<int> _use_cnt;
+    std::mutex _mutex;
+    std::unordered_map<int64_t, std::shared_ptr<Streams>> _streams_for_node;
     LoadStreamStubPool* _pool = nullptr;
+
+    std::mutex _tablets_to_commit_mutex;
+    std::unordered_map<int64_t, std::vector<PTabletID>> _tablets_to_commit;
 };
 
 class LoadStreamStubPool {
@@ -94,26 +115,19 @@ public:
 
     ~LoadStreamStubPool();
 
-    std::shared_ptr<LoadStreams> get_or_create(PUniqueId load_id, int64_t src_id, int64_t dst_id,
-                                               int num_streams, int num_sink);
+    std::shared_ptr<LoadStreamMap> get_or_create(UniqueId load_id, int64_t src_id, int num_streams,
+                                                 int num_use);
 
-    void erase(UniqueId load_id, int64_t dst_id);
+    void erase(UniqueId load_id);
 
     size_t size() {
         std::lock_guard<std::mutex> lock(_mutex);
         return _pool.size();
     }
 
-    // for UT only
-    size_t templates_size() {
-        std::lock_guard<std::mutex> lock(_mutex);
-        return _template_stubs.size();
-    }
-
 private:
     std::mutex _mutex;
-    std::unordered_map<UniqueId, std::unique_ptr<LoadStreamStub>> _template_stubs;
-    std::unordered_map<std::pair<UniqueId, int64_t>, std::shared_ptr<LoadStreams>> _pool;
+    std::unordered_map<UniqueId, std::shared_ptr<LoadStreamMap>> _pool;
 };
 
 } // namespace doris
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 52a2f75e263..42594f13d99 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -96,7 +96,7 @@ Status VTabletWriterV2::_incremental_open_streams(
                     tablet.set_partition_id(partition->id);
                     tablet.set_index_id(index.index_id);
                     tablet.set_tablet_id(tablet_id);
-                    if (!_streams_for_node.contains(node)) {
+                    if (!_streams_for_node->contains(node)) {
                         new_backends.insert(node);
                     }
                     _tablets_for_node[node].emplace(tablet_id, tablet);
@@ -111,11 +111,9 @@ Status VTabletWriterV2::_incremental_open_streams(
             }
         }
     }
-    for (int64_t node_id : new_backends) {
-        auto load_streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
-                _load_id, _backend_id, node_id, _stream_per_node, _num_local_sink);
-        RETURN_IF_ERROR(_open_streams_to_backend(node_id, *load_streams));
-        _streams_for_node[node_id] = load_streams;
+    for (int64_t dst_id : new_backends) {
+        auto streams = _streams_for_node->get_or_create(dst_id);
+        RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams));
     }
     return Status::OK();
 }
@@ -244,6 +242,8 @@ Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) {
     } else {
         _delta_writer_for_tablet = std::make_shared<DeltaWriterV2Map>(_load_id);
     }
+    _streams_for_node = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
+            _load_id, _backend_id, _stream_per_node, _num_local_sink);
     return Status::OK();
 }
 
@@ -255,23 +255,21 @@ Status VTabletWriterV2::open(RuntimeState* state, RuntimeProfile* profile) {
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
 
     RETURN_IF_ERROR(_build_tablet_node_mapping());
-    RETURN_IF_ERROR(_open_streams(_backend_id));
+    RETURN_IF_ERROR(_open_streams());
     RETURN_IF_ERROR(_init_row_distribution());
 
     return Status::OK();
 }
 
-Status VTabletWriterV2::_open_streams(int64_t src_id) {
+Status VTabletWriterV2::_open_streams() {
     for (auto& [dst_id, _] : _tablets_for_node) {
-        auto streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
-                _load_id, src_id, dst_id, _stream_per_node, _num_local_sink);
+        auto streams = _streams_for_node->get_or_create(dst_id);
         RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams));
-        _streams_for_node[dst_id] = streams;
     }
     return Status::OK();
 }
 
-Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, LoadStreams& streams) {
+Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, Streams& streams) {
     const auto* node_info = _nodes_info->find_node(dst_id);
     DBUG_EXECUTE_IF("VTabletWriterV2._open_streams_to_backend.node_info_null",
                     { node_info = nullptr; });
@@ -280,14 +278,14 @@ Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, LoadStreams& st
     }
     auto idle_timeout_ms = _state->execution_timeout() * 1000;
     // get tablet schema from each backend only in the 1st stream
-    for (auto& stream : streams.streams() | std::ranges::views::take(1)) {
+    for (auto& stream : streams | std::ranges::views::take(1)) {
         const std::vector<PTabletID>& tablets_for_schema = _indexes_from_node[node_info->id];
         RETURN_IF_ERROR(stream->open(stream, _state->exec_env()->brpc_internal_client_cache(),
                                      *node_info, _txn_id, *_schema, tablets_for_schema,
                                      _total_streams, idle_timeout_ms, _state->enable_profile()));
     }
     // for the rest streams, open without getting tablet schema
-    for (auto& stream : streams.streams() | std::ranges::views::drop(1)) {
+    for (auto& stream : streams | std::ranges::views::drop(1)) {
         RETURN_IF_ERROR(stream->open(stream, _state->exec_env()->brpc_internal_client_cache(),
                                      *node_info, _txn_id, *_schema, {}, _total_streams,
                                      idle_timeout_ms, _state->enable_profile()));
@@ -363,7 +361,7 @@ Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t partition_id,
         tablet.set_tablet_id(tablet_id);
         VLOG_DEBUG << fmt::format("_select_streams P{} I{} T{}", partition_id, index_id, tablet_id);
         _tablets_for_node[node_id].emplace(tablet_id, tablet);
-        streams.emplace_back(_streams_for_node.at(node_id)->streams().at(_stream_index));
+        streams.emplace_back(_streams_for_node->at(node_id)->at(_stream_index));
         RETURN_IF_ERROR(streams[0]->wait_for_schema(partition_id, index_id, tablet_id));
     }
     _stream_index = (_stream_index + 1) % _stream_per_node;
@@ -472,11 +470,13 @@ Status VTabletWriterV2::_cancel(Status status) {
         _delta_writer_for_tablet->cancel(status);
         _delta_writer_for_tablet.reset();
     }
-    for (const auto& [_, streams] : _streams_for_node) {
-        for (const auto& stream : streams->streams()) {
-            stream->cancel(status);
-        }
-        streams->release();
+    if (_streams_for_node) {
+        _streams_for_node->for_each([status](int64_t dst_id, const Streams& streams) {
+            for (auto& stream : streams) {
+                stream->cancel(status);
+            }
+        });
+        _streams_for_node->release();
     }
     return Status::OK();
 }
@@ -530,87 +530,83 @@ Status VTabletWriterV2::close(Status exec_status) {
         COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time());
         COUNTER_SET(_validate_data_timer, _block_convertor->validate_data_ns());
 
-        // defer stream release to prevent memory leak
-        Defer defer([&] {
-            for (const auto& [_, streams] : _streams_for_node) {
-                streams->release();
-            }
-            _streams_for_node.clear();
-        });
-
+        // close DeltaWriters
         {
             SCOPED_TIMER(_close_writer_timer);
             // close all delta writers if this is the last user
-            RETURN_IF_ERROR(_delta_writer_for_tablet->close(_profile));
+            auto st = _delta_writer_for_tablet->close(_profile);
             _delta_writer_for_tablet.reset();
-        }
-
-        {
-            // send CLOSE_LOAD to all streams, return ERROR if any
-            for (const auto& [_, streams] : _streams_for_node) {
-                RETURN_IF_ERROR(_close_load(streams->streams()));
+            if (!st.ok()) {
+                RETURN_IF_ERROR(_cancel(st));
             }
         }
 
-        {
+        _calc_tablets_to_commit();
+        const bool is_last_sink = _streams_for_node->release();
+        LOG(INFO) << "sink " << _sender_id << " released streams, is_last=" << is_last_sink
+                  << ", load_id=" << print_id(_load_id);
+
+        // send CLOSE_LOAD and close_wait on all streams
+        if (is_last_sink) {
+            RETURN_IF_ERROR(_streams_for_node->close_load());
             SCOPED_TIMER(_close_load_timer);
-            for (const auto& [_, streams] : _streams_for_node) {
-                for (const auto& stream : streams->streams()) {
-                    int64_t remain_ms = static_cast<int64_t>(_state->execution_timeout()) * 1000 -
-                                        _timeout_watch.elapsed_time() / 1000 / 1000;
-                    if (remain_ms <= 0) {
-                        LOG(WARNING) << "load timed out before close waiting, load_id="
-                                     << print_id(_load_id);
-                        return Status::TimedOut("load timed out before close waiting");
-                    }
-                    RETURN_IF_ERROR(stream->close_wait(_state, remain_ms));
-                }
-            }
+            RETURN_IF_ERROR(_streams_for_node->for_each_st(
+                    [this](int64_t dst_id, const Streams& streams) -> Status {
+                        for (auto& stream : streams) {
+                            int64_t remain_ms =
+                                    static_cast<int64_t>(_state->execution_timeout()) * 1000 -
+                                    _timeout_watch.elapsed_time() / 1000 / 1000;
+                            if (remain_ms <= 0) {
+                                LOG(WARNING) << "load timed out before close waiting, load_id="
+                                             << print_id(_load_id);
+                                return Status::TimedOut("load timed out before close waiting");
+                            }
+                            RETURN_IF_ERROR(stream->close_wait(_state, remain_ms));
+                        }
+                        return Status::OK();
+                    }));
         }
 
-        std::unordered_map<int64_t, int> failed_tablets;
+        // calculate and submit commit info
+        if (is_last_sink) {
+            std::unordered_map<int64_t, int> failed_tablets;
+            std::unordered_map<int64_t, Status> failed_reason;
+            std::vector<TTabletCommitInfo> tablet_commit_infos;
 
-        std::vector<TTabletCommitInfo> tablet_commit_infos;
-        for (const auto& [node_id, streams] : _streams_for_node) {
-            for (const auto& stream : streams->streams()) {
+            _streams_for_node->for_each([&](int64_t dst_id, const Streams& streams) {
                 std::unordered_set<int64_t> known_tablets;
-                for (auto [tablet_id, _] : stream->failed_tablets()) {
-                    if (known_tablets.contains(tablet_id)) {
-                        continue;
+                for (const auto& stream : streams) {
+                    for (auto [tablet_id, reason] : stream->failed_tablets()) {
+                        if (known_tablets.contains(tablet_id)) {
+                            continue;
+                        }
+                        known_tablets.insert(tablet_id);
+                        failed_tablets[tablet_id]++;
+                        failed_reason[tablet_id] = reason;
                     }
-                    known_tablets.insert(tablet_id);
-                    failed_tablets[tablet_id]++;
-                }
-                for (auto tablet_id : stream->success_tablets()) {
-                    if (known_tablets.contains(tablet_id)) {
-                        continue;
+                    for (auto tablet_id : stream->success_tablets()) {
+                        if (known_tablets.contains(tablet_id)) {
+                            continue;
+                        }
+                        known_tablets.insert(tablet_id);
+                        TTabletCommitInfo commit_info;
+                        commit_info.tabletId = tablet_id;
+                        commit_info.backendId = dst_id;
+                        tablet_commit_infos.emplace_back(std::move(commit_info));
                     }
-                    known_tablets.insert(tablet_id);
-                    TTabletCommitInfo commit_info;
-                    commit_info.tabletId = tablet_id;
-                    commit_info.backendId = node_id;
-                    tablet_commit_infos.emplace_back(std::move(commit_info));
                 }
-            }
-        }
-        for (auto [tablet_id, replicas] : failed_tablets) {
-            if (replicas <= (_num_replicas - 1) / 2) {
-                continue;
-            }
-            auto backends = _location->find_tablet(tablet_id)->node_ids;
-            for (auto& backend_id : backends) {
-                for (const auto& stream : _streams_for_node[backend_id]->streams()) {
-                    const auto& failed_tablets = stream->failed_tablets();
-                    if (failed_tablets.contains(tablet_id)) {
-                        return failed_tablets.at(tablet_id);
-                    }
+            });
+
+            for (auto [tablet_id, replicas] : failed_tablets) {
+                if (replicas > (_num_replicas - 1) / 2) {
+                    return failed_reason.at(tablet_id);
                 }
             }
-            DCHECK(false) << "failed tablet " << tablet_id << " should have failed reason";
+            _state->tablet_commit_infos().insert(
+                    _state->tablet_commit_infos().end(),
+                    std::make_move_iterator(tablet_commit_infos.begin()),
+                    std::make_move_iterator(tablet_commit_infos.end()));
         }
-        _state->tablet_commit_infos().insert(_state->tablet_commit_infos().end(),
-                                             std::make_move_iterator(tablet_commit_infos.begin()),
-                                             std::make_move_iterator(tablet_commit_infos.end()));
 
         // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node
         int64_t num_rows_load_total = _number_input_rows + _state->num_rows_load_filtered() +
@@ -632,30 +628,28 @@ Status VTabletWriterV2::close(Status exec_status) {
     return status;
 }
 
-Status VTabletWriterV2::_close_load(const Streams& streams) {
-    auto node_id = streams[0]->dst_id();
-    std::vector<PTabletID> tablets_to_commit;
-    std::vector<int64_t> partition_ids;
-    for (auto [tablet_id, tablet] : _tablets_for_node[node_id]) {
-        if (_tablet_finder->partition_ids().contains(tablet.partition_id())) {
-            if (VLOG_DEBUG_IS_ON) {
-                partition_ids.push_back(tablet.partition_id());
+void VTabletWriterV2::_calc_tablets_to_commit() {
+    for (const auto& [dst_id, tablets] : _tablets_for_node) {
+        std::vector<PTabletID> tablets_to_commit;
+        std::vector<int64_t> partition_ids;
+        for (const auto& [tablet_id, tablet] : tablets) {
+            if (_tablet_finder->partition_ids().contains(tablet.partition_id())) {
+                if (VLOG_DEBUG_IS_ON) {
+                    partition_ids.push_back(tablet.partition_id());
+                }
+                tablets_to_commit.push_back(tablet);
             }
-            tablets_to_commit.push_back(tablet);
         }
-    }
-    if (VLOG_DEBUG_IS_ON) {
-        std::string msg("close load partitions: ");
-        msg.reserve(partition_ids.size() * 7);
-        for (auto v : partition_ids) {
-            msg.append(std::to_string(v) + ", ");
+        if (VLOG_DEBUG_IS_ON) {
+            std::string msg("close load partitions: ");
+            msg.reserve(partition_ids.size() * 7);
+            for (auto v : partition_ids) {
+                msg.append(std::to_string(v) + ", ");
+            }
+            LOG(WARNING) << msg;
         }
-        LOG(WARNING) << msg;
-    }
-    for (const auto& stream : streams) {
-        RETURN_IF_ERROR(stream->close_load(tablets_to_commit));
+        _streams_for_node->save_tablets_to_commit(dst_id, tablets_to_commit);
     }
-    return Status::OK();
 }
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h
index 460b3acc33f..7785733bf4a 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -69,7 +69,7 @@
 namespace doris {
 class DeltaWriterV2;
 class LoadStreamStub;
-class LoadStreams;
+class LoadStreamMap;
 class ObjectPool;
 class RowDescriptor;
 class RuntimeState;
@@ -121,9 +121,9 @@ private:
 
     Status _init(RuntimeState* state, RuntimeProfile* profile);
 
-    Status _open_streams(int64_t src_id);
+    Status _open_streams();
 
-    Status _open_streams_to_backend(int64_t dst_id, LoadStreams& streams);
+    Status _open_streams_to_backend(int64_t dst_id, Streams& streams);
 
     Status _incremental_open_streams(const std::vector<TOlapTablePartition>& partitions);
 
@@ -140,7 +140,7 @@ private:
     Status _select_streams(int64_t tablet_id, int64_t partition_id, int64_t index_id,
                            Streams& streams);
 
-    Status _close_load(const Streams& streams);
+    void _calc_tablets_to_commit();
 
     Status _cancel(Status status);
 
@@ -217,7 +217,7 @@ private:
     std::unordered_map<int64_t, std::unordered_map<int64_t, PTabletID>> _tablets_for_node;
     std::unordered_map<int64_t, std::vector<PTabletID>> _indexes_from_node;
 
-    std::unordered_map<int64_t, std::shared_ptr<LoadStreams>> _streams_for_node;
+    std::shared_ptr<LoadStreamMap> _streams_for_node;
 
     size_t _stream_index = 0;
     std::shared_ptr<DeltaWriterV2Map> _delta_writer_for_tablet;
diff --git a/be/test/io/fs/stream_sink_file_writer_test.cpp b/be/test/io/fs/stream_sink_file_writer_test.cpp
index 7e5bdd350f5..ad6e496c56f 100644
--- a/be/test/io/fs/stream_sink_file_writer_test.cpp
+++ b/be/test/io/fs/stream_sink_file_writer_test.cpp
@@ -51,7 +51,9 @@ static std::atomic<int64_t> g_num_request;
 class StreamSinkFileWriterTest : public testing::Test {
     class MockStreamStub : public LoadStreamStub {
     public:
-        MockStreamStub(PUniqueId load_id, int64_t src_id) : LoadStreamStub(load_id, src_id, 1) {};
+        MockStreamStub(PUniqueId load_id, int64_t src_id)
+                : LoadStreamStub(load_id, src_id, std::make_shared<IndexToTabletSchema>(),
+                                 std::make_shared<IndexToEnableMoW>()) {};
 
         virtual ~MockStreamStub() = default;
 
diff --git a/be/test/vec/exec/load_stream_stub_pool_test.cpp b/be/test/vec/exec/load_stream_stub_pool_test.cpp
index 24da3bb6999..e576db3bdaa 100644
--- a/be/test/vec/exec/load_stream_stub_pool_test.cpp
+++ b/be/test/vec/exec/load_stream_stub_pool_test.cpp
@@ -32,20 +32,29 @@ TEST_F(LoadStreamStubPoolTest, test) {
     LoadStreamStubPool pool;
     int64_t src_id = 100;
     PUniqueId load_id;
-    load_id.set_hi(1);
+    load_id.set_lo(1);
     load_id.set_hi(2);
-    auto streams1 = pool.get_or_create(load_id, src_id, 101, 5, 1);
-    auto streams2 = pool.get_or_create(load_id, src_id, 102, 5, 1);
-    auto streams3 = pool.get_or_create(load_id, src_id, 101, 5, 1);
+    PUniqueId load_id2;
+    load_id2.set_lo(2);
+    load_id2.set_hi(1);
+    auto streams_for_node1 = pool.get_or_create(load_id, src_id, 5, 2);
+    auto streams_for_node2 = pool.get_or_create(load_id, src_id, 5, 2);
+    EXPECT_EQ(1, pool.size());
+    auto streams_for_node3 = pool.get_or_create(load_id2, src_id, 8, 1);
     EXPECT_EQ(2, pool.size());
-    EXPECT_EQ(1, pool.templates_size());
-    EXPECT_EQ(streams1, streams3);
-    EXPECT_NE(streams1, streams2);
-    streams1->release();
-    streams2->release();
-    streams3->release();
+    EXPECT_EQ(streams_for_node1, streams_for_node2);
+    EXPECT_NE(streams_for_node1, streams_for_node3);
+
+    EXPECT_EQ(5, streams_for_node1->get_or_create(101)->size());
+    EXPECT_EQ(5, streams_for_node2->get_or_create(102)->size());
+    EXPECT_EQ(8, streams_for_node3->get_or_create(101)->size());
+
+    EXPECT_TRUE(streams_for_node3->release());
+    EXPECT_EQ(1, pool.size());
+    EXPECT_FALSE(streams_for_node1->release());
+    EXPECT_EQ(1, pool.size());
+    EXPECT_TRUE(streams_for_node2->release());
     EXPECT_EQ(0, pool.size());
-    EXPECT_EQ(0, pool.templates_size());
 }
 
 } // namespace doris
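
A minimal usage sketch (not part of the patch) may help when reading the refactor above: it strings together the new interfaces exactly as they appear in the diff (LoadStreamStubPool::get_or_create, LoadStreamMap::get_or_create, save_tablets_to_commit, release, close_load), while the wrapper function, include paths, and the literal num_streams/num_use values are illustrative assumptions only.

    // Sketch only: assumes the Doris BE headers touched by this commit; the
    // wrapper function and the literal values below are illustrative.
    #include <vector>

    #include "common/status.h"
    #include "runtime/exec_env.h"
    #include "vec/sink/load_stream_stub_pool.h"

    namespace doris {

    Status sink_stream_lifecycle_sketch(UniqueId load_id, int64_t src_id,
                                        const std::vector<int64_t>& dst_ids,
                                        const std::vector<PTabletID>& tablets) {
        // One LoadStreamMap per load, shared by all local sinks; num_use counts the
        // local sinks and no longer depends on how many backends end up with streams.
        auto stream_map = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
                load_id, src_id, /*num_streams=*/2, /*num_use=*/1);

        // Streams to each backend are created lazily, so backends discovered later
        // by auto partition take the same path as the initial ones.
        for (int64_t dst_id : dst_ids) {
            std::shared_ptr<Streams> streams = stream_map->get_or_create(dst_id);
            // ... open each stream to the backend here ...
        }

        // At close time every sink records its tablets, then drops its use count.
        for (int64_t dst_id : dst_ids) {
            stream_map->save_tablets_to_commit(dst_id, tablets);
        }
        if (stream_map->release()) {
            // Only the last sink to release sends CLOSE_LOAD, once per backend.
            RETURN_IF_ERROR(stream_map->close_load());
        }
        return Status::OK();
    }

    } // namespace doris

The design change this reflects: the pool is now keyed by load_id alone and the use count lives in the per-load LoadStreamMap rather than in each LoadStreamStub, so streams opened incrementally for auto partition no longer need an initial use count of their own.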


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
