This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b19abac5e2e [fix](move-memtable) pass num local sink to backends (#26897)
b19abac5e2e is described below

commit b19abac5e2ebee26f667b094e77189127c7ebfc6
Author: Kaijie Chen <[email protected]>
AuthorDate: Tue Nov 14 08:28:49 2023 +0800

    [fix](move-memtable) pass num local sink to backends (#26897)
---
 be/src/olap/delta_writer_v2.cpp                    |  3 ++
 be/src/pipeline/pipeline_fragment_context.cpp      |  1 +
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  1 +
 be/src/runtime/plan_fragment_executor.cpp          |  1 +
 be/src/runtime/runtime_state.h                     |  5 +++
 be/src/vec/sink/delta_writer_v2_pool.cpp           | 35 ++++++++++++------
 be/src/vec/sink/delta_writer_v2_pool.h             | 17 ++++-----
 be/src/vec/sink/load_stream_stub.cpp               | 23 ++++++++----
 be/src/vec/sink/load_stream_stub.h                 |  9 +++--
 be/src/vec/sink/load_stream_stub_pool.cpp          | 41 +++++++++++++++-------
 be/src/vec/sink/load_stream_stub_pool.h            | 26 ++++++++++++--
 be/src/vec/sink/vtablet_sink_v2.cpp                | 33 ++++++++++-------
 be/src/vec/sink/vtablet_sink_v2.h                  | 13 +++++--
 be/test/io/fs/stream_sink_file_writer_test.cpp     |  2 +-
 be/test/vec/exec/delta_writer_v2_pool_test.cpp     |  8 ++---
 be/test/vec/exec/load_stream_stub_pool_test.cpp    | 12 +++----
 .../apache/doris/planner/StreamLoadPlanner.java    |  2 ++
 .../main/java/org/apache/doris/qe/Coordinator.java | 18 ++++++----
 gensrc/thrift/PaloInternalService.thrift           |  3 ++
 19 files changed, 177 insertions(+), 76 deletions(-)

diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index c87cf7510a4..0a4108970a6 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -100,6 +100,9 @@ Status DeltaWriterV2::init() {
         return Status::OK();
     }
     // build tablet schema in request level
+    if (_streams.size() == 0 || _streams[0]->tablet_schema(_req.index_id) == 
nullptr) {
+        return Status::InternalError("failed to find tablet schema for {}", 
_req.index_id);
+    }
     _build_current_tablet_schema(_req.index_id, _req.table_schema_param,
                                  *_streams[0]->tablet_schema(_req.index_id));
     RowsetWriterContext context;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 02b991a7aa6..9610122bc02 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -310,6 +310,7 @@ Status PipelineFragmentContext::prepare(const 
doris::TPipelineFragmentParams& re
     _runtime_state->set_num_per_fragment_instances(request.num_senders);
     _runtime_state->set_load_stream_per_node(request.load_stream_per_node);
     _runtime_state->set_total_load_streams(request.total_load_streams);
+    _runtime_state->set_num_local_sink(request.num_local_sink);
 
     if (request.fragment.__isset.output_sink) {
         RETURN_IF_ERROR_OR_CATCH_EXCEPTION(DataSink::create_data_sink(
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index aa029709717..47805a04772 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -203,6 +203,7 @@ Status PipelineXFragmentContext::prepare(const 
doris::TPipelineFragmentParams& r
     _runtime_state->set_num_per_fragment_instances(request.num_senders);
     _runtime_state->set_load_stream_per_node(request.load_stream_per_node);
     _runtime_state->set_total_load_streams(request.total_load_streams);
+    _runtime_state->set_num_local_sink(request.num_local_sink);
 
     // 2. Build pipelines with operators in this fragment.
     auto root_pipeline = add_pipeline();
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 9327f96a5d5..3af870c7f07 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -214,6 +214,7 @@ Status PlanFragmentExecutor::prepare(const 
TExecPlanFragmentParams& request) {
     _runtime_state->set_num_per_fragment_instances(params.num_senders);
     _runtime_state->set_load_stream_per_node(request.load_stream_per_node);
     _runtime_state->set_total_load_streams(request.total_load_streams);
+    _runtime_state->set_num_local_sink(request.num_local_sink);
 
     // set up sink, if required
     if (request.fragment.__isset.output_sink) {
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index aae2fb3cce4..3b420511fa0 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -319,6 +319,10 @@ public:
 
     int total_load_streams() const { return _total_load_streams; }
 
+    void set_num_local_sink(int num_local_sink) { _num_local_sink = 
num_local_sink; }
+
+    int num_local_sink() const { return _num_local_sink; }
+
     bool disable_stream_preaggregations() const {
         return _query_options.disable_stream_preaggregations;
     }
@@ -553,6 +557,7 @@ private:
     int _num_per_fragment_instances = 0;
     int _load_stream_per_node = 0;
     int _total_load_streams = 0;
+    int _num_local_sink = 0;
 
     // The backend id on which this fragment instance runs
     int64_t _backend_id = -1;
diff --git a/be/src/vec/sink/delta_writer_v2_pool.cpp b/be/src/vec/sink/delta_writer_v2_pool.cpp
index b9057136d97..dc9a2765a55 100644
--- a/be/src/vec/sink/delta_writer_v2_pool.cpp
+++ b/be/src/vec/sink/delta_writer_v2_pool.cpp
@@ -25,7 +25,8 @@ class TExpr;
 
 namespace vectorized {
 
-DeltaWriterV2Map::DeltaWriterV2Map(UniqueId load_id) : _load_id(load_id), 
_use_cnt(1) {}
+DeltaWriterV2Map::DeltaWriterV2Map(UniqueId load_id, int num_use, 
DeltaWriterV2Pool* pool)
+        : _load_id(load_id), _use_cnt(num_use), _pool(pool) {}
 
 DeltaWriterV2Map::~DeltaWriterV2Map() = default;
 
@@ -38,9 +39,15 @@ DeltaWriterV2* DeltaWriterV2Map::get_or_create(
 }
 
 Status DeltaWriterV2Map::close(RuntimeProfile* profile) {
-    if (--_use_cnt > 0) {
+    int num_use = --_use_cnt;
+    if (num_use > 0) {
+        LOG(INFO) << "not closing DeltaWriterV2Map << " << _load_id << " , 
use_cnt = " << num_use;
         return Status::OK();
     }
+    LOG(INFO) << "closing DeltaWriterV2Map " << _load_id;
+    if (_pool != nullptr) {
+        _pool->erase(_load_id);
+    }
     Status status = Status::OK();
     _map.for_each([&status](auto& entry) {
         if (status.ok()) {
@@ -59,6 +66,11 @@ Status DeltaWriterV2Map::close(RuntimeProfile* profile) {
 }
 
 void DeltaWriterV2Map::cancel(Status status) {
+    int num_use = --_use_cnt;
+    LOG(INFO) << "cancelling DeltaWriterV2Map " << _load_id << ", use_cnt = " 
<< num_use;
+    if (num_use == 0 && _pool != nullptr) {
+        _pool->erase(_load_id);
+    }
     _map.for_each([&status](auto& entry) {
         static_cast<void>(entry.second->cancel_with_status(status));
     });
@@ -68,23 +80,24 @@ DeltaWriterV2Pool::DeltaWriterV2Pool() = default;
 
 DeltaWriterV2Pool::~DeltaWriterV2Pool() = default;
 
-std::shared_ptr<DeltaWriterV2Map> DeltaWriterV2Pool::get_or_create(PUniqueId 
load_id) {
+std::shared_ptr<DeltaWriterV2Map> DeltaWriterV2Pool::get_or_create(PUniqueId 
load_id,
+                                                                   int 
num_sink) {
     UniqueId id {load_id};
     std::lock_guard<std::mutex> lock(_mutex);
-    std::shared_ptr<DeltaWriterV2Map> map = _pool[id].lock();
+    std::shared_ptr<DeltaWriterV2Map> map = _pool[id];
     if (map) {
-        map->grab();
         return map;
     }
-    auto deleter = [this](DeltaWriterV2Map* m) {
-        std::lock_guard<std::mutex> lock(_mutex);
-        _pool.erase(m->unique_id());
-        delete m;
-    };
-    map = std::shared_ptr<DeltaWriterV2Map>(new DeltaWriterV2Map(id), deleter);
+    map = std::make_shared<DeltaWriterV2Map>(id, num_sink, this);
     _pool[id] = map;
     return map;
 }
 
+void DeltaWriterV2Pool::erase(UniqueId load_id) {
+    std::lock_guard<std::mutex> lock(_mutex);
+    LOG(INFO) << "erasing DeltaWriterV2Map, load_id = " << load_id;
+    _pool.erase(load_id);
+}
+
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/sink/delta_writer_v2_pool.h b/be/src/vec/sink/delta_writer_v2_pool.h
index 8439062440a..f05b144200f 100644
--- a/be/src/vec/sink/delta_writer_v2_pool.h
+++ b/be/src/vec/sink/delta_writer_v2_pool.h
@@ -58,26 +58,24 @@ class RuntimeProfile;
 
 namespace vectorized {
 
+class DeltaWriterV2Pool;
+
 class DeltaWriterV2Map {
 public:
-    DeltaWriterV2Map(UniqueId load_id);
+    DeltaWriterV2Map(UniqueId load_id, int num_use = 1, DeltaWriterV2Pool* 
pool = nullptr);
 
     ~DeltaWriterV2Map();
 
-    void grab() { ++_use_cnt; }
-
     // get or create delta writer for the given tablet, memory is managed by 
DeltaWriterV2Map
     DeltaWriterV2* get_or_create(int64_t tablet_id,
                                  
std::function<std::unique_ptr<DeltaWriterV2>()> creator);
 
     // close all delta writers in this DeltaWriterV2Map if there is no other 
users
-    Status close(RuntimeProfile* profile);
+    Status close(RuntimeProfile* profile = nullptr);
 
     // cancel all delta writers in this DeltaWriterV2Map
     void cancel(Status status);
 
-    UniqueId unique_id() const { return _load_id; }
-
     size_t size() const { return _map.size(); }
 
 private:
@@ -89,6 +87,7 @@ private:
     UniqueId _load_id;
     TabletToDeltaWriterV2Map _map;
     std::atomic<int> _use_cnt;
+    DeltaWriterV2Pool* _pool;
 };
 
 class DeltaWriterV2Pool {
@@ -97,7 +96,9 @@ public:
 
     ~DeltaWriterV2Pool();
 
-    std::shared_ptr<DeltaWriterV2Map> get_or_create(PUniqueId load_id);
+    std::shared_ptr<DeltaWriterV2Map> get_or_create(PUniqueId load_id, int 
num_sink = 1);
+
+    void erase(UniqueId load_id);
 
     size_t size() {
         std::lock_guard<std::mutex> lock(_mutex);
@@ -106,7 +107,7 @@ public:
 
 private:
     std::mutex _mutex;
-    std::unordered_map<UniqueId, std::weak_ptr<DeltaWriterV2Map>> _pool;
+    std::unordered_map<UniqueId, std::shared_ptr<DeltaWriterV2Map>> _pool;
 };
 
 } // namespace vectorized
diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index c2f7f246f30..75b814dc23f 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -83,14 +83,16 @@ void 
LoadStreamStub::LoadStreamReplyHandler::on_closed(brpc::StreamId id) {
     _close_cv.notify_all();
 }
 
-LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id)
-        : _load_id(load_id),
+LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use)
+        : _use_cnt(num_use),
+          _load_id(load_id),
           _src_id(src_id),
           _tablet_schema_for_index(std::make_shared<IndexToTabletSchema>()),
           _enable_unique_mow_for_index(std::make_shared<IndexToEnableMoW>()) 
{};
 
 LoadStreamStub::LoadStreamStub(LoadStreamStub& stub)
-        : _load_id(stub._load_id),
+        : _use_cnt(stub._use_cnt.load()),
+          _load_id(stub._load_id),
           _src_id(stub._src_id),
           _tablet_schema_for_index(stub._tablet_schema_for_index),
           _enable_unique_mow_for_index(stub._enable_unique_mow_for_index) {};
@@ -107,7 +109,6 @@ Status 
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
                             const OlapTableSchemaParam& schema,
                             const std::vector<PTabletID>& tablets_for_schema, 
int total_streams,
                             bool enable_profile) {
-    _num_open++;
     std::unique_lock<bthread::Mutex> lock(_mutex);
     if (_is_init.load()) {
         return Status::OK();
@@ -190,15 +191,23 @@ Status LoadStreamStub::add_segment(int64_t partition_id, 
int64_t index_id, int64
 
 // CLOSE_LOAD
 Status LoadStreamStub::close_load(const std::vector<PTabletID>& 
tablets_to_commit) {
-    if (--_num_open > 0) {
+    {
+        std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex);
+        _tablets_to_commit.insert(_tablets_to_commit.end(), 
tablets_to_commit.begin(),
+                                  tablets_to_commit.end());
+    }
+    if (--_use_cnt > 0) {
         return Status::OK();
     }
     PStreamHeader header;
     *header.mutable_load_id() = _load_id;
     header.set_src_id(_src_id);
     header.set_opcode(doris::PStreamHeader::CLOSE_LOAD);
-    for (const auto& tablet : tablets_to_commit) {
-        *header.add_tablets_to_commit() = tablet;
+    {
+        std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex);
+        for (const auto& tablet : _tablets_to_commit) {
+            *header.add_tablets_to_commit() = tablet;
+        }
     }
     return _encode_and_send(header);
 }
diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h
index 3650b2aeae2..b17acea4cb3 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -134,7 +134,7 @@ private:
 
 public:
     // construct new stub
-    LoadStreamStub(PUniqueId load_id, int64_t src_id);
+    LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use);
 
     // copy constructor, shared_ptr members are shared
     LoadStreamStub(LoadStreamStub& stub);
@@ -177,7 +177,7 @@ public:
     }
 
     std::shared_ptr<TabletSchema> tablet_schema(int64_t index_id) const {
-        return _tablet_schema_for_index->at(index_id);
+        return (*_tablet_schema_for_index)[index_id];
     }
 
     bool enable_unique_mow(int64_t index_id) const {
@@ -203,7 +203,10 @@ protected:
     std::atomic<bool> _is_init;
     bthread::Mutex _mutex;
 
-    std::atomic<int> _num_open;
+    std::atomic<int> _use_cnt;
+
+    std::mutex _tablets_to_commit_mutex;
+    std::vector<PTabletID> _tablets_to_commit;
 
     std::mutex _buffer_mutex;
     std::mutex _send_mutex;
diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp b/be/src/vec/sink/load_stream_stub_pool.cpp
index 5848ca5a51a..ec7e53211fb 100644
--- a/be/src/vec/sink/load_stream_stub_pool.cpp
+++ b/be/src/vec/sink/load_stream_stub_pool.cpp
@@ -24,33 +24,50 @@ class TExpr;
 
 namespace stream_load {
 
+LoadStreams::LoadStreams(UniqueId load_id, int64_t dst_id, int num_use, 
LoadStreamStubPool* pool)
+        : _load_id(load_id), _dst_id(dst_id), _use_cnt(num_use), _pool(pool) {}
+
+void LoadStreams::release() {
+    int num_use = --_use_cnt;
+    if (num_use == 0) {
+        LOG(INFO) << "releasing streams for load_id = " << _load_id << ", 
dst_id = " << _dst_id;
+        _pool->erase(_load_id, _dst_id);
+    } else {
+        LOG(INFO) << "no releasing streams for load_id = " << _load_id << ", 
dst_id = " << _dst_id
+                  << ", use_cnt = " << num_use;
+    }
+}
+
 LoadStreamStubPool::LoadStreamStubPool() = default;
 
 LoadStreamStubPool::~LoadStreamStubPool() = default;
-std::shared_ptr<Streams> LoadStreamStubPool::get_or_create(PUniqueId load_id, 
int64_t src_id,
-                                                           int64_t dst_id, int 
num_streams) {
+
+std::shared_ptr<LoadStreams> LoadStreamStubPool::get_or_create(PUniqueId 
load_id, int64_t src_id,
+                                                               int64_t dst_id, 
int num_streams,
+                                                               int num_sink) {
     auto key = std::make_pair(UniqueId(load_id), dst_id);
     std::lock_guard<std::mutex> lock(_mutex);
-    std::shared_ptr<Streams> streams = _pool[key].lock();
+    std::shared_ptr<LoadStreams> streams = _pool[key];
     if (streams) {
         return streams;
     }
     DCHECK(num_streams > 0) << "stream num should be greater than 0";
-    auto [it, _] = _template_stubs.emplace(load_id, new LoadStreamStub 
{load_id, src_id});
-    auto deleter = [this, key](Streams* s) {
-        std::lock_guard<std::mutex> lock(_mutex);
-        _pool.erase(key);
-        _template_stubs.erase(key.first);
-        delete s;
-    };
-    streams = std::shared_ptr<Streams>(new Streams(), deleter);
+    DCHECK(num_sink > 0) << "sink num should be greater than 0";
+    auto [it, _] = _template_stubs.emplace(load_id, new LoadStreamStub 
{load_id, src_id, num_sink});
+    streams = std::make_shared<LoadStreams>(load_id, dst_id, num_sink, this);
     for (int32_t i = 0; i < num_streams; i++) {
         // copy construct, internal tablet schema map will be shared among all 
stubs
-        streams->emplace_back(new LoadStreamStub {*it->second});
+        streams->streams().emplace_back(new LoadStreamStub {*it->second});
     }
     _pool[key] = streams;
     return streams;
 }
 
+void LoadStreamStubPool::erase(UniqueId load_id, int64_t dst_id) {
+    std::lock_guard<std::mutex> lock(_mutex);
+    _pool.erase(std::make_pair(load_id, dst_id));
+    _template_stubs.erase(load_id);
+}
+
 } // namespace stream_load
 } // namespace doris
diff --git a/be/src/vec/sink/load_stream_stub_pool.h b/be/src/vec/sink/load_stream_stub_pool.h
index 73b41fdd61a..2cf55be4915 100644
--- a/be/src/vec/sink/load_stream_stub_pool.h
+++ b/be/src/vec/sink/load_stream_stub_pool.h
@@ -70,16 +70,36 @@ class LoadStreamStub;
 
 namespace stream_load {
 
+class LoadStreamStubPool;
+
 using Streams = std::vector<std::shared_ptr<LoadStreamStub>>;
 
+class LoadStreams {
+public:
+    LoadStreams(UniqueId load_id, int64_t dst_id, int num_use, 
LoadStreamStubPool* pool);
+
+    void release();
+
+    Streams& streams() { return _streams; }
+
+private:
+    Streams _streams;
+    UniqueId _load_id;
+    int64_t _dst_id;
+    std::atomic<int> _use_cnt;
+    LoadStreamStubPool* _pool;
+};
+
 class LoadStreamStubPool {
 public:
     LoadStreamStubPool();
 
     ~LoadStreamStubPool();
 
-    std::shared_ptr<Streams> get_or_create(PUniqueId load_id, int64_t src_id, 
int64_t dst_id,
-                                           int num_streams);
+    std::shared_ptr<LoadStreams> get_or_create(PUniqueId load_id, int64_t 
src_id, int64_t dst_id,
+                                               int num_streams, int num_sink);
+
+    void erase(UniqueId load_id, int64_t dst_id);
 
     size_t size() {
         std::lock_guard<std::mutex> lock(_mutex);
@@ -95,7 +115,7 @@ public:
 private:
     std::mutex _mutex;
     std::unordered_map<UniqueId, std::unique_ptr<LoadStreamStub>> 
_template_stubs;
-    std::unordered_map<std::pair<UniqueId, int64_t>, std::weak_ptr<Streams>> 
_pool;
+    std::unordered_map<std::pair<UniqueId, int64_t>, 
std::shared_ptr<LoadStreams>> _pool;
 };
 
 } // namespace stream_load
diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp b/be/src/vec/sink/vtablet_sink_v2.cpp
index 3aa23e10595..4e93d11e2e8 100644
--- a/be/src/vec/sink/vtablet_sink_v2.cpp
+++ b/be/src/vec/sink/vtablet_sink_v2.cpp
@@ -155,10 +155,12 @@ Status VOlapTableSinkV2::prepare(RuntimeState* state) {
     _num_senders = state->num_per_fragment_instances();
     _stream_per_node = state->load_stream_per_node();
     _total_streams = state->total_load_streams();
+    _num_local_sink = state->num_local_sink();
     DCHECK(_stream_per_node > 0) << "load stream per node should be greator 
than 0";
     DCHECK(_total_streams > 0) << "total load streams should be greator than 
0";
+    DCHECK(_num_local_sink > 0) << "num local sink should be greator than 0";
     LOG(INFO) << "num senders: " << _num_senders << ", stream per node: " << 
_stream_per_node
-              << ", total_streams " << _total_streams;
+              << ", total_streams " << _total_streams << ", num_local_sink: " 
<< _num_local_sink;
     _is_high_priority =
             (state->execution_timeout() <= 
config::load_task_high_priority_threshold_second);
 
@@ -197,8 +199,8 @@ Status VOlapTableSinkV2::prepare(RuntimeState* state) {
     // Prepare the exprs to run.
     RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, 
_row_desc));
     if (config::share_delta_writers) {
-        _delta_writer_for_tablet =
-                
ExecEnv::GetInstance()->delta_writer_v2_pool()->get_or_create(_load_id);
+        _delta_writer_for_tablet = 
ExecEnv::GetInstance()->delta_writer_v2_pool()->get_or_create(
+                _load_id, _num_local_sink);
     } else {
         _delta_writer_for_tablet = 
std::make_shared<DeltaWriterV2Map>(_load_id);
     }
@@ -226,18 +228,17 @@ Status VOlapTableSinkV2::_open_streams(int64_t src_id) {
         if (node_info == nullptr) {
             return Status::InternalError("Unknown node {} in tablet location", 
dst_id);
         }
-        std::shared_ptr<Streams> streams;
-        streams = 
ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
-                _load_id, src_id, dst_id, _stream_per_node);
+        auto streams = 
ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
+                _load_id, src_id, dst_id, _stream_per_node, _num_local_sink);
         // get tablet schema from each backend only in the 1st stream
-        for (auto& stream : *streams | std::ranges::views::take(1)) {
+        for (auto& stream : streams->streams() | std::ranges::views::take(1)) {
             const std::vector<PTabletID>& tablets_for_schema = 
_indexes_from_node[node_info->id];
             
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(),
                                          *node_info, _txn_id, *_schema, 
tablets_for_schema,
                                          _total_streams, 
_state->enable_profile()));
         }
         // for the rest streams, open without getting tablet schema
-        for (auto& stream : *streams | std::ranges::views::drop(1)) {
+        for (auto& stream : streams->streams() | std::ranges::views::drop(1)) {
             
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(),
                                          *node_info, _txn_id, *_schema, {}, 
_total_streams,
                                          _state->enable_profile()));
@@ -300,7 +301,7 @@ Status VOlapTableSinkV2::_select_streams(int64_t tablet_id, 
Streams& streams) {
         return Status::InternalError("unknown tablet location, tablet id = 
{}", tablet_id);
     }
     for (auto& node_id : location->node_ids) {
-        streams.emplace_back(_streams_for_node[node_id]->at(_stream_index));
+        
streams.emplace_back(_streams_for_node[node_id]->streams().at(_stream_index));
     }
     _stream_index = (_stream_index + 1) % _stream_per_node;
     return Status::OK();
@@ -393,6 +394,9 @@ Status VOlapTableSinkV2::_cancel(Status status) {
         _delta_writer_for_tablet->cancel(status);
         _delta_writer_for_tablet.reset();
     }
+    for (const auto& [_, streams] : _streams_for_node) {
+        streams->release();
+    }
     return Status::OK();
 }
 
@@ -415,6 +419,11 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status 
exec_status) {
         COUNTER_SET(_row_distribution_timer, 
(int64_t)_row_distribution_watch.elapsed_time());
         COUNTER_SET(_validate_data_timer, 
_block_convertor->validate_data_ns());
 
+        // release streams from the pool first, to prevent memory leak
+        for (const auto& [_, streams] : _streams_for_node) {
+            streams->release();
+        }
+
         {
             SCOPED_TIMER(_close_writer_timer);
             // close all delta writers if this is the last user
@@ -425,14 +434,14 @@ Status VOlapTableSinkV2::close(RuntimeState* state, 
Status exec_status) {
         {
             // send CLOSE_LOAD to all streams, return ERROR if any
             for (const auto& [_, streams] : _streams_for_node) {
-                RETURN_IF_ERROR(_close_load(*streams));
+                RETURN_IF_ERROR(_close_load(streams->streams()));
             }
         }
 
         {
             SCOPED_TIMER(_close_load_timer);
             for (const auto& [_, streams] : _streams_for_node) {
-                for (const auto& stream : *streams) {
+                for (const auto& stream : streams->streams()) {
                     RETURN_IF_ERROR(stream->close_wait());
                 }
             }
@@ -440,7 +449,7 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status 
exec_status) {
 
         std::vector<TTabletCommitInfo> tablet_commit_infos;
         for (const auto& [node_id, streams] : _streams_for_node) {
-            for (const auto& stream : *streams) {
+            for (const auto& stream : streams->streams()) {
                 for (auto tablet_id : stream->success_tablets()) {
                     TTabletCommitInfo commit_info;
                     commit_info.tabletId = tablet_id;
diff --git a/be/src/vec/sink/vtablet_sink_v2.h b/be/src/vec/sink/vtablet_sink_v2.h
index 6f369286677..71f85c8156c 100644
--- a/be/src/vec/sink/vtablet_sink_v2.h
+++ b/be/src/vec/sink/vtablet_sink_v2.h
@@ -77,6 +77,10 @@ class TExpr;
 class TabletSchema;
 class TupleDescriptor;
 
+namespace stream_load {
+class LoadStreams;
+}
+
 namespace vectorized {
 
 class OlapTableBlockConvertor;
@@ -156,8 +160,9 @@ private:
     // To support multiple senders, we maintain a channel for each sender.
     int _sender_id = -1;
     int _num_senders = -1;
-    int _stream_per_node = 0;
-    int _total_streams = 0;
+    int _stream_per_node = -1;
+    int _total_streams = -1;
+    int _num_local_sink = -1;
     bool _is_high_priority = false;
     bool _write_file_cache = false;
 
@@ -204,7 +209,9 @@ private:
     std::unordered_map<int64_t, std::vector<PTabletID>> _tablets_for_node;
     std::unordered_map<int64_t, std::vector<PTabletID>> _indexes_from_node;
 
-    std::unordered_map<int64_t, std::shared_ptr<Streams>> _streams_for_node;
+    std::unordered_map<int64_t, 
std::shared_ptr<::doris::stream_load::LoadStreams>>
+            _streams_for_node;
+
     size_t _stream_index = 0;
     std::shared_ptr<DeltaWriterV2Map> _delta_writer_for_tablet;
 
diff --git a/be/test/io/fs/stream_sink_file_writer_test.cpp b/be/test/io/fs/stream_sink_file_writer_test.cpp
index c6ac4a3f501..c52b59f01e4 100644
--- a/be/test/io/fs/stream_sink_file_writer_test.cpp
+++ b/be/test/io/fs/stream_sink_file_writer_test.cpp
@@ -51,7 +51,7 @@ static std::atomic<int64_t> g_num_request;
 class StreamSinkFileWriterTest : public testing::Test {
     class MockStreamStub : public LoadStreamStub {
     public:
-        MockStreamStub(PUniqueId load_id, int64_t src_id) : 
LoadStreamStub(load_id, src_id) {};
+        MockStreamStub(PUniqueId load_id, int64_t src_id) : 
LoadStreamStub(load_id, src_id, 1) {};
 
         virtual ~MockStreamStub() = default;
 
diff --git a/be/test/vec/exec/delta_writer_v2_pool_test.cpp b/be/test/vec/exec/delta_writer_v2_pool_test.cpp
index 30b56b65d11..d44fd17a761 100644
--- a/be/test/vec/exec/delta_writer_v2_pool_test.cpp
+++ b/be/test/vec/exec/delta_writer_v2_pool_test.cpp
@@ -42,9 +42,9 @@ TEST_F(DeltaWriterV2PoolTest, test_pool) {
     EXPECT_EQ(2, pool.size());
     EXPECT_EQ(map, map3);
     EXPECT_NE(map, map2);
-    map.reset();
-    map2.reset();
-    map3.reset();
+    EXPECT_TRUE(map->close().ok());
+    EXPECT_TRUE(map2->close().ok());
+    EXPECT_TRUE(map3->close().ok());
     EXPECT_EQ(0, pool.size());
 }
 
@@ -62,7 +62,7 @@ TEST_F(DeltaWriterV2PoolTest, test_map) {
     EXPECT_EQ(2, map->size());
     EXPECT_EQ(writer, writer3);
     EXPECT_NE(writer, writer2);
-    map.reset();
+    static_cast<void>(map->close());
     EXPECT_EQ(0, pool.size());
 }
 
diff --git a/be/test/vec/exec/load_stream_stub_pool_test.cpp b/be/test/vec/exec/load_stream_stub_pool_test.cpp
index 929b906aab7..73f09508896 100644
--- a/be/test/vec/exec/load_stream_stub_pool_test.cpp
+++ b/be/test/vec/exec/load_stream_stub_pool_test.cpp
@@ -36,16 +36,16 @@ TEST_F(LoadStreamStubPoolTest, test) {
     PUniqueId load_id;
     load_id.set_hi(1);
     load_id.set_hi(2);
-    auto streams1 = pool.get_or_create(load_id, src_id, 101, 5);
-    auto streams2 = pool.get_or_create(load_id, src_id, 102, 5);
-    auto streams3 = pool.get_or_create(load_id, src_id, 101, 5);
+    auto streams1 = pool.get_or_create(load_id, src_id, 101, 5, 1);
+    auto streams2 = pool.get_or_create(load_id, src_id, 102, 5, 1);
+    auto streams3 = pool.get_or_create(load_id, src_id, 101, 5, 1);
     EXPECT_EQ(2, pool.size());
     EXPECT_EQ(1, pool.templates_size());
     EXPECT_EQ(streams1, streams3);
     EXPECT_NE(streams1, streams2);
-    streams1.reset();
-    streams2.reset();
-    streams3.reset();
+    streams1->release();
+    streams2->release();
+    streams3->release();
     EXPECT_EQ(0, pool.size());
     EXPECT_EQ(0, pool.templates_size());
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index f4131235da3..ce8b08086ab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -300,6 +300,7 @@ public class StreamLoadPlanner {
         params.setParams(execParams);
         params.setLoadStreamPerNode(taskInfo.getStreamPerNode());
         params.setTotalLoadStreams(taskInfo.getStreamPerNode());
+        params.setNumLocalSink(1);
         TQueryOptions queryOptions = new TQueryOptions();
         queryOptions.setQueryType(TQueryType.LOAD);
         queryOptions.setQueryTimeout(timeout);
@@ -503,6 +504,7 @@ public class StreamLoadPlanner {
         pipParams.setNumSenders(1);
         pipParams.setLoadStreamPerNode(taskInfo.getStreamPerNode());
         pipParams.setTotalLoadStreams(taskInfo.getStreamPerNode());
+        pipParams.setNumLocalSink(1);
 
         TPipelineInstanceParams localParams = new TPipelineInstanceParams();
         localParams.setFragmentInstanceId(new TUniqueId(loadId.hi, loadId.lo + 
fragmentInstanceIdIndex));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 90fb91304c2..868a9928220 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -683,7 +683,7 @@ public class Coordinator implements CoordInterface {
             int backendIdx = 0;
             int profileFragmentId = 0;
             long memoryLimit = queryOptions.getMemLimit();
-            Set<Long> backendsWithOlapTableSink = Sets.newHashSet();
+            Map<Long, Integer> numSinkOnBackend = Maps.newHashMap();
             beToExecStates.clear();
             // If #fragments >=2, use twoPhaseExecution with 
exec_plan_fragments_prepare and exec_plan_fragments_start,
             // else use exec_plan_fragments directly.
@@ -753,7 +753,7 @@ public class Coordinator implements CoordInterface {
                     states.addState(execState);
                     if (tParam.getFragment().getOutputSink() != null
                             && tParam.getFragment().getOutputSink().getType() 
== TDataSinkType.OLAP_TABLE_SINK) {
-                        
backendsWithOlapTableSink.add(execState.backend.getId());
+                        numSinkOnBackend.merge(execState.backend.getId(), 1, 
Integer::sum);
                     }
                     ++backendIdx;
                 }
@@ -765,7 +765,10 @@ public class Coordinator implements CoordInterface {
                     if (tParam.getFragment().getOutputSink() != null
                             && tParam.getFragment().getOutputSink().getType() 
== TDataSinkType.OLAP_TABLE_SINK) {
                         tParam.setLoadStreamPerNode(loadStreamPerNode);
-                        
tParam.setTotalLoadStreams(backendsWithOlapTableSink.size() * 
loadStreamPerNode);
+                        tParam.setTotalLoadStreams(numSinkOnBackend.size() * 
loadStreamPerNode);
+                        
tParam.setNumLocalSink(numSinkOnBackend.get(tParam.getBackendId()));
+                        LOG.info("num local sink for backend {} is {}", 
tParam.getBackendId(),
+                                numSinkOnBackend.get(tParam.getBackendId()));
                     }
                 }
                 profileFragmentId += 1;
@@ -844,7 +847,7 @@ public class Coordinator implements CoordInterface {
                     }
                 }
 
-                Set<Long> backendsWithOlapTableSink = Sets.newHashSet();
+                int numBackendsWithSink = 0;
                 // 3. group PipelineExecContext by BE.
                 // So that we can use one RPC to send all fragment instances 
of a BE.
                 for (Map.Entry<TNetworkAddress, TPipelineFragmentParams> entry 
: tParams.entrySet()) {
@@ -881,7 +884,7 @@ public class Coordinator implements CoordInterface {
                     if (entry.getValue().getFragment().getOutputSink() != null
                             && 
entry.getValue().getFragment().getOutputSink().getType()
                             == TDataSinkType.OLAP_TABLE_SINK) {
-                        backendsWithOlapTableSink.add(backendId);
+                        numBackendsWithSink++;
                     }
                     ++backendIdx;
                 }
@@ -894,7 +897,10 @@ public class Coordinator implements CoordInterface {
                             && 
entry.getValue().getFragment().getOutputSink().getType()
                             == TDataSinkType.OLAP_TABLE_SINK) {
                         
entry.getValue().setLoadStreamPerNode(loadStreamPerNode);
-                        
entry.getValue().setTotalLoadStreams(backendsWithOlapTableSink.size() * 
loadStreamPerNode);
+                        
entry.getValue().setTotalLoadStreams(numBackendsWithSink * loadStreamPerNode);
+                        
entry.getValue().setNumLocalSink(entry.getValue().getLocalParams().size());
+                        LOG.info("num local sink for backend {} is {}", 
entry.getValue().getBackendId(),
+                                entry.getValue().getNumLocalSink());
                     }
                 }
 
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index e55ab89e328..7c55842735c 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -461,6 +461,8 @@ struct TExecPlanFragmentParams {
 
   // total num of load streams the downstream backend will see
   27: optional i32 total_load_streams
+
+  28: optional i32 num_local_sink
 }
 
 struct TExecPlanFragmentParamsList {
@@ -678,6 +680,7 @@ struct TPipelineFragmentParams {
   30: optional bool group_commit = false;
   31: optional i32 load_stream_per_node // num load stream for each sink 
backend
   32: optional i32 total_load_streams // total num of load streams the 
downstream backend will see
+  33: optional i32 num_local_sink
 }
 
 struct TPipelineFragmentParamsList {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to