This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 58bf79f79e6 [fix](move-memtable) pass load stream num to backends (#26198)
58bf79f79e6 is described below

commit 58bf79f79e6bccea1942d0234855f667fce8546e
Author: Kaijie Chen <[email protected]>
AuthorDate: Wed Nov 8 16:16:33 2023 +0800

    [fix](move-memtable) pass load stream num to backends (#26198)
---
 be/src/common/config.cpp                           |   2 -
 be/src/common/config.h                             |   2 -
 be/src/pipeline/pipeline_fragment_context.cpp      |   2 +
 .../pipeline_x/pipeline_x_fragment_context.cpp     |   2 +
 be/src/runtime/load_stream.cpp                     |  97 ++++++++--------
 be/src/runtime/load_stream.h                       |  26 +++--
 be/src/runtime/plan_fragment_executor.cpp          |   2 +
 be/src/runtime/runtime_state.h                     |  14 +++
 be/src/service/internal_service.cpp                |   1 -
 be/src/vec/sink/load_stream_stub.cpp               |   5 +-
 be/src/vec/sink/load_stream_stub.h                 |   3 +-
 be/src/vec/sink/load_stream_stub_pool.cpp          |   4 +-
 be/src/vec/sink/load_stream_stub_pool.h            |   3 +-
 be/src/vec/sink/vtablet_sink_v2.cpp                |  16 ++-
 be/src/vec/sink/vtablet_sink_v2.h                  |   2 +
 be/test/runtime/load_stream_test.cpp               | 127 +++++++++++++++++++--
 be/test/vec/exec/load_stream_stub_pool_test.cpp    |   6 +-
 .../apache/doris/planner/StreamLoadPlanner.java    |   4 +
 .../main/java/org/apache/doris/qe/Coordinator.java |  35 ++++++
 .../java/org/apache/doris/qe/SessionVariable.java  |  13 +++
 .../java/org/apache/doris/task/LoadTaskInfo.java   |   4 +
 .../java/org/apache/doris/task/StreamLoadTask.java |  13 +++
 gensrc/proto/internal_service.proto                |   1 +
 gensrc/thrift/FrontendService.thrift               |   1 +
 gensrc/thrift/PaloInternalService.thrift           |   8 ++
 25 files changed, 312 insertions(+), 81 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 17f69c3d360..0ccb48f116e 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -742,8 +742,6 @@ DEFINE_mDouble(tablet_version_graph_orphan_vertex_ratio, 
"0.1");
 
 // share delta writers when memtable_on_sink_node = true
 DEFINE_Bool(share_delta_writers, "true");
-// number of brpc stream per load
-DEFINE_Int32(num_streams_per_load, "5");
 // timeout for open load stream rpc in ms
 DEFINE_Int64(open_load_stream_timeout_ms, "500");
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index cfcd09c1984..f0fdf58558c 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -799,8 +799,6 @@ DECLARE_mDouble(tablet_version_graph_orphan_vertex_ratio);
 
 // share delta writers when memtable_on_sink_node = true
 DECLARE_Bool(share_delta_writers);
-// number of brpc stream per load
-DECLARE_Int32(num_streams_per_load);
 // timeout for open load stream rpc in ms
 DECLARE_Int64(open_load_stream_timeout_ms);
 
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 67d0e6045b8..62bc3ca8dec 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -318,6 +318,8 @@ Status PipelineFragmentContext::prepare(const 
doris::TPipelineFragmentParams& re
 
     _runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
     _runtime_state->set_num_per_fragment_instances(request.num_senders);
+    _runtime_state->set_load_stream_per_node(request.load_stream_per_node);
+    _runtime_state->set_total_load_streams(request.total_load_streams);
 
     if (request.fragment.__isset.output_sink) {
         RETURN_IF_ERROR_OR_CATCH_EXCEPTION(DataSink::create_data_sink(
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 95cdf47bec5..fee49621f0e 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -211,6 +211,8 @@ Status PipelineXFragmentContext::prepare(const 
doris::TPipelineFragmentParams& r
     }
     _runtime_state->set_desc_tbl(_desc_tbl);
     _runtime_state->set_num_per_fragment_instances(request.num_senders);
+    _runtime_state->set_load_stream_per_node(request.load_stream_per_node);
+    _runtime_state->set_total_load_streams(request.total_load_streams);
 
     // 2. Build pipelines with operators in this fragment.
     auto root_pipeline = add_pipeline();
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index b9cc5891b48..6f66e7239d6 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -30,6 +30,7 @@
 #include "runtime/load_channel.h"
 #include "runtime/load_stream_mgr.h"
 #include "runtime/load_stream_writer.h"
+#include "util/runtime_profile.h"
 #include "util/thrift_util.h"
 #include "util/uid_util.h"
 
@@ -251,6 +252,8 @@ LoadStream::~LoadStream() {
 
 Status LoadStream::init(const POpenLoadStreamRequest* request) {
     _txn_id = request->txn_id();
+    _total_streams = request->total_streams();
+    DCHECK(_total_streams > 0) << "total streams should be greator than 0";
 
     _schema = std::make_shared<OlapTableSchemaParam>();
     RETURN_IF_ERROR(_schema->init(request->schema()));
@@ -265,41 +268,49 @@ Status LoadStream::init(const POpenLoadStreamRequest* 
request) {
 Status LoadStream::close(int64_t src_id, const std::vector<PTabletID>& 
tablets_to_commit,
                          std::vector<int64_t>* success_tablet_ids,
                          std::vector<int64_t>* failed_tablet_ids) {
-    std::lock_guard lock_guard(_lock);
+    std::lock_guard<bthread::Mutex> lock_guard(_lock);
     SCOPED_TIMER(_close_wait_timer);
 
     // we do nothing until recv CLOSE_LOAD from all stream to ensure all data 
are handled before ack
     _open_streams[src_id]--;
-    LOG(INFO) << "received CLOSE_LOAD from sender " << src_id << ", remaining "
-              << _open_streams[src_id] << " streams";
     if (_open_streams[src_id] == 0) {
         _open_streams.erase(src_id);
     }
+    _close_load_cnt++;
+    LOG(INFO) << "received CLOSE_LOAD from sender " << src_id << ", remaining "
+              << _total_streams - _close_load_cnt << " senders";
+
+    _tablets_to_commit.insert(_tablets_to_commit.end(), 
tablets_to_commit.begin(),
+                              tablets_to_commit.end());
+
+    if (_close_load_cnt < _total_streams) {
+        // do not return commit info if there is remaining streams.
+        return Status::OK();
+    }
 
     Status st = Status::OK();
-    if (_open_streams.size() == 0) {
+    {
         bthread::Mutex mutex;
         std::unique_lock<bthread::Mutex> lock(mutex);
         bthread::ConditionVariable cond;
-        bool ret = _load_stream_mgr->heavy_work_pool()->try_offer([this, 
&success_tablet_ids,
-                                                                   
&failed_tablet_ids,
-                                                                   
&tablets_to_commit, &mutex,
-                                                                   &cond, 
&st]() {
-            signal::set_signal_task_id(_load_id);
-            for (auto& it : _index_streams_map) {
-                st = it.second->close(tablets_to_commit, success_tablet_ids, 
failed_tablet_ids);
-                if (!st.ok()) {
+        bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(
+                [this, &success_tablet_ids, &failed_tablet_ids, &mutex, &cond, 
&st]() {
+                    signal::set_signal_task_id(_load_id);
+                    for (auto& it : _index_streams_map) {
+                        st = it.second->close(_tablets_to_commit, 
success_tablet_ids,
+                                              failed_tablet_ids);
+                        if (!st.ok()) {
+                            std::unique_lock<bthread::Mutex> lock(mutex);
+                            cond.notify_one();
+                            return;
+                        }
+                    }
+                    LOG(INFO) << "close load " << *this
+                              << ", failed_tablet_num=" << 
failed_tablet_ids->size()
+                              << ", success_tablet_num=" << 
success_tablet_ids->size();
                     std::unique_lock<bthread::Mutex> lock(mutex);
                     cond.notify_one();
-                    return;
-                }
-            }
-            LOG(INFO) << "close load " << *this
-                      << ", failed_tablet_num=" << failed_tablet_ids->size()
-                      << ", success_tablet_num=" << success_tablet_ids->size();
-            std::unique_lock<bthread::Mutex> lock(mutex);
-            cond.notify_one();
-        });
+                });
         if (ret) {
             cond.wait(lock);
         } else {
@@ -307,24 +318,21 @@ Status LoadStream::close(int64_t src_id, const 
std::vector<PTabletID>& tablets_t
                     "there is not enough thread resource for close load");
         }
     }
-
-    // do not return commit info for non-last one.
     return st;
 }
 
-void LoadStream::_report_result(StreamId stream, Status& st,
-                                std::vector<int64_t>* success_tablet_ids,
-                                std::vector<int64_t>* failed_tablet_ids) {
-    LOG(INFO) << "report result, success tablet num " << 
success_tablet_ids->size()
-              << ", failed tablet num " << failed_tablet_ids->size();
+void LoadStream::_report_result(StreamId stream, const Status& st,
+                                const std::vector<int64_t>& success_tablet_ids,
+                                const std::vector<int64_t>& failed_tablet_ids) 
{
+    LOG(INFO) << "report result, success tablet num " << 
success_tablet_ids.size()
+              << ", failed tablet num " << failed_tablet_ids.size();
     butil::IOBuf buf;
     PWriteStreamSinkResponse response;
     st.to_protobuf(response.mutable_status());
-    for (auto& id : *success_tablet_ids) {
+    for (auto& id : success_tablet_ids) {
         response.add_success_tablet_ids(id);
     }
-
-    for (auto& id : *failed_tablet_ids) {
+    for (auto& id : failed_tablet_ids) {
         response.add_failed_tablet_ids(id);
     }
 
@@ -421,18 +429,20 @@ int LoadStream::on_received_messages(StreamId id, 
butil::IOBuf* const messages[]
 void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, 
butil::IOBuf* data) {
     VLOG_DEBUG << PStreamHeader_Opcode_Name(hdr.opcode()) << " from " << 
hdr.src_id()
                << " with tablet " << hdr.tablet_id();
+    if (UniqueId(hdr.load_id()) != UniqueId(_load_id)) {
+        Status st = Status::Error<ErrorCode::INVALID_ARGUMENT>("invalid load 
id {}, expected {}",
+                                                               
UniqueId(hdr.load_id()).to_string(),
+                                                               
UniqueId(_load_id).to_string());
+        _report_failure(id, st, hdr);
+        return;
+    }
 
     {
         std::lock_guard lock_guard(_lock);
         if (!_open_streams.contains(hdr.src_id())) {
-            std::vector<int64_t> success_tablet_ids;
-            std::vector<int64_t> failed_tablet_ids;
-            if (hdr.has_tablet_id()) {
-                failed_tablet_ids.push_back(hdr.tablet_id());
-            }
             Status st = Status::Error<ErrorCode::INVALID_ARGUMENT>("no open 
stream from source {}",
                                                                    
hdr.src_id());
-            _report_result(id, st, &success_tablet_ids, &failed_tablet_ids);
+            _report_failure(id, st, hdr);
             return;
         }
     }
@@ -442,10 +452,7 @@ void LoadStream::_dispatch(StreamId id, const 
PStreamHeader& hdr, butil::IOBuf*
     case PStreamHeader::APPEND_DATA: {
         auto st = _append_data(hdr, data);
         if (!st.ok()) {
-            std::vector<int64_t> success_tablet_ids;
-            std::vector<int64_t> failed_tablet_ids;
-            failed_tablet_ids.push_back(hdr.tablet_id());
-            _report_result(id, st, &success_tablet_ids, &failed_tablet_ids);
+            _report_failure(id, st, hdr);
         }
     } break;
     case PStreamHeader::CLOSE_LOAD: {
@@ -454,7 +461,7 @@ void LoadStream::_dispatch(StreamId id, const 
PStreamHeader& hdr, butil::IOBuf*
         std::vector<PTabletID> 
tablets_to_commit(hdr.tablets_to_commit().begin(),
                                                  
hdr.tablets_to_commit().end());
         auto st = close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, 
&failed_tablet_ids);
-        _report_result(id, st, &success_tablet_ids, &failed_tablet_ids);
+        _report_result(id, st, success_tablet_ids, failed_tablet_ids);
         brpc::StreamClose(id);
     } break;
     default:
@@ -468,9 +475,9 @@ void LoadStream::on_idle_timeout(StreamId id) {
 }
 
 void LoadStream::on_closed(StreamId id) {
-    auto remaining_rpc_stream = remove_rpc_stream();
-    LOG(INFO) << "stream closed " << id << ", remaining_rpc_stream=" << 
remaining_rpc_stream;
-    if (remaining_rpc_stream == 0) {
+    auto remaining_streams = _total_streams - _close_rpc_cnt.fetch_add(1) - 1;
+    LOG(INFO) << "stream " << id << " on_closed, remaining streams = " << 
remaining_streams;
+    if (remaining_streams == 0) {
         _load_stream_mgr->clear_load(_load_id);
     }
 }
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index fe7d90d502e..1c16c086e29 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -113,9 +113,6 @@ public:
         _open_streams[src_id]++;
     }
 
-    uint32_t add_rpc_stream() { return ++_num_rpc_streams; }
-    uint32_t remove_rpc_stream() { return --_num_rpc_streams; }
-
     Status close(int64_t src_id, const std::vector<PTabletID>& 
tablets_to_commit,
                  std::vector<int64_t>* success_tablet_ids, 
std::vector<int64_t>* failed_tablet_ids);
 
@@ -130,16 +127,31 @@ private:
     void _parse_header(butil::IOBuf* const message, PStreamHeader& hdr);
     void _dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* data);
     Status _append_data(const PStreamHeader& header, butil::IOBuf* data);
-    void _report_result(StreamId stream, Status& st, std::vector<int64_t>* 
success_tablet_ids,
-                        std::vector<int64_t>* failed_tablet_ids);
+
+    void _report_result(StreamId stream, const Status& st,
+                        const std::vector<int64_t>& success_tablet_ids,
+                        const std::vector<int64_t>& failed_tablet_ids);
+
+    // report failure for one message
+    void _report_failure(StreamId stream, const Status& status, const 
PStreamHeader& header) {
+        std::vector<int64_t> success; // empty
+        std::vector<int64_t> failure;
+        if (header.has_tablet_id()) {
+            failure.push_back(header.tablet_id());
+        }
+        _report_result(stream, status, success, failure);
+    }
 
 private:
     PUniqueId _load_id;
     std::unordered_map<int64_t, IndexStreamSharedPtr> _index_streams_map;
-    std::atomic<uint32_t> _num_rpc_streams;
+    int32_t _total_streams = 0;
+    int32_t _close_load_cnt = 0;
+    std::atomic<int32_t> _close_rpc_cnt = 0;
+    std::vector<PTabletID> _tablets_to_commit;
     bthread::Mutex _lock;
     std::unordered_map<int64_t, int32_t> _open_streams;
-    int64_t _txn_id;
+    int64_t _txn_id = 0;
     std::shared_ptr<OlapTableSchemaParam> _schema;
     bool _enable_profile = false;
     std::unique_ptr<RuntimeProfile> _profile;
diff --git a/be/src/runtime/plan_fragment_executor.cpp 
b/be/src/runtime/plan_fragment_executor.cpp
index 9344378e91e..39277c1a42f 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -224,6 +224,8 @@ Status PlanFragmentExecutor::prepare(const 
TExecPlanFragmentParams& request) {
 
     _runtime_state->set_per_fragment_instance_idx(params.sender_id);
     _runtime_state->set_num_per_fragment_instances(params.num_senders);
+    _runtime_state->set_load_stream_per_node(request.load_stream_per_node);
+    _runtime_state->set_total_load_streams(request.total_load_streams);
 
     // set up sink, if required
     if (request.fragment.__isset.output_sink) {
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index cba5142c154..e990540d2f4 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -309,6 +309,18 @@ public:
 
     int num_per_fragment_instances() const { return 
_num_per_fragment_instances; }
 
+    void set_load_stream_per_node(int load_stream_per_node) {
+        _load_stream_per_node = load_stream_per_node;
+    }
+
+    int load_stream_per_node() const { return _load_stream_per_node; }
+
+    void set_total_load_streams(int total_load_streams) {
+        _total_load_streams = total_load_streams;
+    }
+
+    int total_load_streams() const { return _total_load_streams; }
+
     bool disable_stream_preaggregations() const {
         return _query_options.disable_stream_preaggregations;
     }
@@ -545,6 +557,8 @@ private:
 
     int _per_fragment_instance_idx;
     int _num_per_fragment_instances = 0;
+    int _load_stream_per_node = 0;
+    int _total_load_streams = 0;
 
     // The backend id on which this fragment instance runs
     int64_t _backend_id = -1;
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 01151dcf340..d6846c07a16 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -401,7 +401,6 @@ void 
PInternalServiceImpl::open_load_stream(google::protobuf::RpcController* con
             return;
         }
 
-        load_stream->add_rpc_stream();
         VLOG_DEBUG << "get streamid =" << streamid;
         st.to_protobuf(response->mutable_status());
     });
diff --git a/be/src/vec/sink/load_stream_stub.cpp 
b/be/src/vec/sink/load_stream_stub.cpp
index fe9887b3a3e..c2f7f246f30 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -102,11 +102,11 @@ LoadStreamStub::~LoadStreamStub() {
 }
 
 // open_load_stream
-// tablets means
 Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* 
client_cache,
                             const NodeInfo& node_info, int64_t txn_id,
                             const OlapTableSchemaParam& schema,
-                            const std::vector<PTabletID>& tablets_for_schema, 
bool enable_profile) {
+                            const std::vector<PTabletID>& tablets_for_schema, 
int total_streams,
+                            bool enable_profile) {
     _num_open++;
     std::unique_lock<bthread::Mutex> lock(_mutex);
     if (_is_init.load()) {
@@ -130,6 +130,7 @@ Status 
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
     request.set_src_id(_src_id);
     request.set_txn_id(txn_id);
     request.set_enable_profile(enable_profile);
+    request.set_total_streams(total_streams);
     schema.to_protobuf(request.mutable_schema());
     for (auto& tablet : tablets_for_schema) {
         *request.add_tablets() = tablet;
diff --git a/be/src/vec/sink/load_stream_stub.h 
b/be/src/vec/sink/load_stream_stub.h
index 2db9ffcd950..3650b2aeae2 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -148,7 +148,8 @@ public:
     // open_load_stream
     Status open(BrpcClientCache<PBackendService_Stub>* client_cache, const 
NodeInfo& node_info,
                 int64_t txn_id, const OlapTableSchemaParam& schema,
-                const std::vector<PTabletID>& tablets_for_schema, bool 
enable_profile);
+                const std::vector<PTabletID>& tablets_for_schema, int 
total_streams,
+                bool enable_profile);
 
 // for mock this class in UT
 #ifdef BE_TEST
diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp 
b/be/src/vec/sink/load_stream_stub_pool.cpp
index 834c1523865..5848ca5a51a 100644
--- a/be/src/vec/sink/load_stream_stub_pool.cpp
+++ b/be/src/vec/sink/load_stream_stub_pool.cpp
@@ -28,14 +28,14 @@ LoadStreamStubPool::LoadStreamStubPool() = default;
 
 LoadStreamStubPool::~LoadStreamStubPool() = default;
 std::shared_ptr<Streams> LoadStreamStubPool::get_or_create(PUniqueId load_id, 
int64_t src_id,
-                                                           int64_t dst_id) {
+                                                           int64_t dst_id, int 
num_streams) {
     auto key = std::make_pair(UniqueId(load_id), dst_id);
     std::lock_guard<std::mutex> lock(_mutex);
     std::shared_ptr<Streams> streams = _pool[key].lock();
     if (streams) {
         return streams;
     }
-    int32_t num_streams = std::max(1, config::num_streams_per_load);
+    DCHECK(num_streams > 0) << "stream num should be greater than 0";
     auto [it, _] = _template_stubs.emplace(load_id, new LoadStreamStub 
{load_id, src_id});
     auto deleter = [this, key](Streams* s) {
         std::lock_guard<std::mutex> lock(_mutex);
diff --git a/be/src/vec/sink/load_stream_stub_pool.h 
b/be/src/vec/sink/load_stream_stub_pool.h
index ae550340d25..73b41fdd61a 100644
--- a/be/src/vec/sink/load_stream_stub_pool.h
+++ b/be/src/vec/sink/load_stream_stub_pool.h
@@ -78,7 +78,8 @@ public:
 
     ~LoadStreamStubPool();
 
-    std::shared_ptr<Streams> get_or_create(PUniqueId load_id, int64_t src_id, 
int64_t dst_id);
+    std::shared_ptr<Streams> get_or_create(PUniqueId load_id, int64_t src_id, 
int64_t dst_id,
+                                           int num_streams);
 
     size_t size() {
         std::lock_guard<std::mutex> lock(_mutex);
diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp 
b/be/src/vec/sink/vtablet_sink_v2.cpp
index 69a372e0e38..6e610ee717a 100644
--- a/be/src/vec/sink/vtablet_sink_v2.cpp
+++ b/be/src/vec/sink/vtablet_sink_v2.cpp
@@ -152,6 +152,12 @@ Status VOlapTableSinkV2::prepare(RuntimeState* state) {
 
     _sender_id = state->per_fragment_instance_idx();
     _num_senders = state->num_per_fragment_instances();
+    _stream_per_node = state->load_stream_per_node();
+    _total_streams = state->total_load_streams();
+    DCHECK(_stream_per_node > 0) << "load stream per node should be greator 
than 0";
+    DCHECK(_total_streams > 0) << "total load streams should be greator than 
0";
+    LOG(INFO) << "num senders: " << _num_senders << ", stream per node: " << 
_stream_per_node
+              << ", total_streams " << _total_streams;
     _is_high_priority =
             (state->execution_timeout() <= 
config::load_task_high_priority_threshold_second);
 
@@ -218,19 +224,19 @@ Status VOlapTableSinkV2::_open_streams(int64_t src_id) {
             return Status::InternalError("Unknown node {} in tablet location", 
dst_id);
         }
         std::shared_ptr<Streams> streams;
-        streams = 
ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(_load_id, src_id,
-                                                                               
  dst_id);
+        streams = 
ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
+                _load_id, src_id, dst_id, _stream_per_node);
         // get tablet schema from each backend only in the 1st stream
         for (auto& stream : *streams | std::ranges::views::take(1)) {
             const std::vector<PTabletID>& tablets_for_schema = 
_indexes_from_node[node_info->id];
             
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(),
                                          *node_info, _txn_id, *_schema, 
tablets_for_schema,
-                                         _state->enable_profile()));
+                                         _total_streams, 
_state->enable_profile()));
         }
         // for the rest streams, open without getting tablet schema
         for (auto& stream : *streams | std::ranges::views::drop(1)) {
             
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(),
-                                         *node_info, _txn_id, *_schema, {},
+                                         *node_info, _txn_id, *_schema, {}, 
_total_streams,
                                          _state->enable_profile()));
         }
         _streams_for_node[dst_id] = streams;
@@ -293,7 +299,7 @@ Status VOlapTableSinkV2::_select_streams(int64_t tablet_id, 
Streams& streams) {
     for (auto& node_id : location->node_ids) {
         streams.emplace_back(_streams_for_node[node_id]->at(_stream_index));
     }
-    _stream_index = (_stream_index + 1) % config::num_streams_per_load;
+    _stream_index = (_stream_index + 1) % _stream_per_node;
     return Status::OK();
 }
 
diff --git a/be/src/vec/sink/vtablet_sink_v2.h 
b/be/src/vec/sink/vtablet_sink_v2.h
index a67c4e65cd2..1a67ea581ec 100644
--- a/be/src/vec/sink/vtablet_sink_v2.h
+++ b/be/src/vec/sink/vtablet_sink_v2.h
@@ -155,6 +155,8 @@ private:
     // To support multiple senders, we maintain a channel for each sender.
     int _sender_id = -1;
     int _num_senders = -1;
+    int _stream_per_node = 0;
+    int _total_streams = 0;
     bool _is_high_priority = false;
     bool _write_file_cache = false;
 
diff --git a/be/test/runtime/load_stream_test.cpp 
b/be/test/runtime/load_stream_test.cpp
index bdd0ace9a8b..05dfa2ee1a8 100644
--- a/be/test/runtime/load_stream_test.cpp
+++ b/be/test/runtime/load_stream_test.cpp
@@ -67,6 +67,7 @@ const uint32_t ABNORMAL_SENDER_ID = 10000;
 const int64_t NORMAL_TXN_ID = 600001;
 const UniqueId NORMAL_LOAD_ID(1, 1);
 const UniqueId ABNORMAL_LOAD_ID(1, 0);
+std::string NORMAL_STRING("normal");
 std::string ABNORMAL_STRING("abnormal");
 
 void construct_schema(OlapTableSchemaParam* schema) {
@@ -374,6 +375,8 @@ public:
             }
 
             LoadStreamSharedPtr load_stream;
+            LOG(INFO) << "total streams: " << request->total_streams();
+            EXPECT_GT(request->total_streams(), 0);
             auto st = _load_stream_mgr->open_load_stream(request, load_stream);
 
             stream_options.handler = load_stream.get();
@@ -387,8 +390,6 @@ public:
                 return;
             }
 
-            load_stream->add_rpc_stream();
-
             status->set_status_code(TStatusCode::OK);
             response->set_allocated_status(status.get());
             static_cast<void>(response->release_status());
@@ -417,7 +418,7 @@ public:
             std::function<void()> _cb;
         };
 
-        Status connect_stream(int64_t sender_id = NORMAL_SENDER_ID) {
+        Status connect_stream(int64_t sender_id = NORMAL_SENDER_ID, int 
total_streams = 1) {
             brpc::Channel channel;
             std::cerr << "connect_stream" << std::endl;
             // Initialize the channel, NULL means using default options.
@@ -450,6 +451,7 @@ public:
             *request.mutable_load_id() = id;
             request.set_txn_id(NORMAL_TXN_ID);
             request.set_src_id(sender_id);
+            request.set_total_streams(total_streams);
             auto ptablet = request.add_tablets();
             ptablet->set_tablet_id(NORMAL_TABLET_ID);
             ptablet->set_index_id(NORMAL_INDEX_ID);
@@ -491,7 +493,7 @@ public:
             : _heavy_work_pool(4, 32, "load_stream_test_heavy"),
               _light_work_pool(4, 32, "load_stream_test_light") {}
 
-    void close_load(MockSinkClient& client, uint32_t sender_id) {
+    void close_load(MockSinkClient& client, uint32_t sender_id = 
NORMAL_SENDER_ID) {
         butil::IOBuf append_buf;
         PStreamHeader header;
         header.mutable_load_id()->set_hi(1);
@@ -535,6 +537,11 @@ public:
         static_cast<void>(client.send(&append_buf));
     }
 
+    void write_normal(MockSinkClient& client) {
+        write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, 
NORMAL_INDEX_ID,
+                         NORMAL_TABLET_ID, 0, NORMAL_STRING, true);
+    }
+
     void write_abnormal_load(MockSinkClient& client) {
         write_one_tablet(client, ABNORMAL_LOAD_ID, NORMAL_SENDER_ID, 
NORMAL_INDEX_ID,
                          NORMAL_TABLET_ID, 0, ABNORMAL_STRING, true);
@@ -657,25 +664,52 @@ public:
 
 // <client, index, bucket>
 // one client
-TEST_F(LoadStreamMgrTest, one_client_abnormal_load) {
+TEST_F(LoadStreamMgrTest, one_client_normal) {
     MockSinkClient client;
     auto st = client.connect_stream();
     EXPECT_TRUE(st.ok());
 
-    write_abnormal_load(client);
-    // TODO check abnormal load id
+    write_normal(client);
 
     reset_response_stat();
-    close_load(client, 1);
+    close_load(client, ABNORMAL_SENDER_ID);
     wait_for_ack(1);
     EXPECT_EQ(g_response_stat.num, 1);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
+    EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
     EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1);
 
-    close_load(client, 0);
+    close_load(client);
     wait_for_ack(2);
     EXPECT_EQ(g_response_stat.num, 2);
     EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);
+    EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
+    EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID);
+
+    // server will close stream on CLOSE_LOAD
+    wait_for_close();
+    EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
+}
+
+TEST_F(LoadStreamMgrTest, one_client_abnormal_load) {
+    MockSinkClient client;
+    auto st = client.connect_stream();
+    EXPECT_TRUE(st.ok());
+
+    reset_response_stat();
+    write_abnormal_load(client);
+    wait_for_ack(1);
+    EXPECT_EQ(g_response_stat.num, 1);
+    EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
+    EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
+    EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID);
+    EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1);
+
+    close_load(client);
+    wait_for_ack(2);
+    EXPECT_EQ(g_response_stat.num, 2);
+    EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
+    EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
     EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID);
 
     // server will close stream on CLOSE_LOAD
@@ -1063,7 +1097,7 @@ TEST_F(LoadStreamMgrTest, 
two_client_one_index_one_tablet_three_segment) {
     MockSinkClient clients[2];
 
     for (int i = 0; i < 2; i++) {
-        auto st = clients[i].connect_stream(NORMAL_SENDER_ID + i);
+        auto st = clients[i].connect_stream(NORMAL_SENDER_ID + i, 2);
         EXPECT_TRUE(st.ok());
     }
     reset_response_stat();
@@ -1132,4 +1166,77 @@ TEST_F(LoadStreamMgrTest, 
two_client_one_index_one_tablet_three_segment) {
     EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
 }
 
+// Two senders share one load stream (total_streams = 2). Sender 0 connects,
+// writes its segments and sends CLOSE_LOAD before sender 1 has even opened.
+// The load stream must stay alive until sender 1 has also connected, written
+// and closed; only then is the tablet reported as successful and the stream
+// released. Finally the written segments are read back and compared against
+// what each sender sent.
+TEST_F(LoadStreamMgrTest, two_client_one_close_before_the_other_open) {
+    MockSinkClient clients[2];
+
+    // Only the first sender connects for now; it announces 2 total streams.
+    EXPECT_TRUE(clients[0].connect_stream(NORMAL_SENDER_ID, 2).ok());
+
+    reset_response_stat();
+
+    // segment_data[sender * 3 + segid] holds the payload for (sender, segid).
+    std::vector<std::string> segment_data;
+    segment_data.resize(6);
+    for (int32_t segid = 2; segid >= 0; segid--) {
+        for (int i = 0; i < 2; i++) {
+            std::string data = "sender_id=" + std::to_string(i) + ",segid=" + std::to_string(segid);
+            segment_data[i * 3 + segid] = data;
+            LOG(INFO) << "segment_data[" << i * 3 + segid << "]" << data;
+        }
+    }
+
+    // Sender 0 writes its three segments in reverse segid order.
+    for (int32_t segid = 2; segid >= 0; segid--) {
+        int i = 0;
+        write_one_tablet(clients[i], NORMAL_LOAD_ID, NORMAL_SENDER_ID + i, NORMAL_INDEX_ID,
+                         NORMAL_TABLET_ID, segid, segment_data[i * 3 + segid], true);
+    }
+
+    EXPECT_EQ(g_response_stat.num, 0);
+    // CLOSE_LOAD
+    close_load(clients[0], 0);
+    wait_for_ack(1);
+    EXPECT_EQ(g_response_stat.num, 1);
+    EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
+    EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
+
+    // sender 0 closed, before open sender 1, load stream should still be open
+    EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1);
+
+    EXPECT_TRUE(clients[1].connect_stream(NORMAL_SENDER_ID + 1, 2).ok());
+
+    // Sender 1 writes its three segments in reverse segid order.
+    for (int32_t segid = 2; segid >= 0; segid--) {
+        int i = 1;
+        write_one_tablet(clients[i], NORMAL_LOAD_ID, NORMAL_SENDER_ID + i, NORMAL_INDEX_ID,
+                         NORMAL_TABLET_ID, segid, segment_data[i * 3 + segid], true);
+    }
+
+    close_load(clients[1], 1);
+    wait_for_ack(2);
+    EXPECT_EQ(g_response_stat.num, 2);
+    EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);
+    EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
+    EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID);
+
+    // server will close stream on CLOSE_LOAD
+    wait_for_close();
+    EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
+
+    // Segment 0 tells us which sender's data landed first: parse the number
+    // between "sender_id=" and the following ','.
+    auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, 0);
+    size_t sender_pos = written_data.find('=');
+    size_t sender_end = written_data.find(',');
+    // Abort here on malformed data instead of feeding bad offsets to substr/stoi.
+    ASSERT_NE(sender_pos, std::string::npos);
+    ASSERT_NE(sender_end, std::string::npos);
+    // Length excludes both the '=' and the ',' delimiters.
+    auto sender_str = written_data.substr(sender_pos + 1, sender_end - sender_pos - 1);
+    LOG(INFO) << "sender_str " << sender_str;
+    uint32_t sender_id = std::stoi(sender_str);
+
+    // The first three segments must all come from that sender ...
+    for (int i = 0; i < 3; i++) {
+        auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, i);
+        EXPECT_EQ(written_data, segment_data[sender_id * 3 + i]);
+    }
+    // ... and the next three from the other one.
+    sender_id = (sender_id + 1) % 2;
+    for (int i = 0; i < 3; i++) {
+        auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, i + 3);
+        EXPECT_EQ(written_data, segment_data[sender_id * 3 + i]);
+    }
+}
+
 } // namespace doris
diff --git a/be/test/vec/exec/load_stream_stub_pool_test.cpp 
b/be/test/vec/exec/load_stream_stub_pool_test.cpp
index f1ccb70beeb..929b906aab7 100644
--- a/be/test/vec/exec/load_stream_stub_pool_test.cpp
+++ b/be/test/vec/exec/load_stream_stub_pool_test.cpp
@@ -36,9 +36,9 @@ TEST_F(LoadStreamStubPoolTest, test) {
     PUniqueId load_id;
     load_id.set_hi(1);
     load_id.set_hi(2);
-    auto streams1 = pool.get_or_create(load_id, src_id, 101);
-    auto streams2 = pool.get_or_create(load_id, src_id, 102);
-    auto streams3 = pool.get_or_create(load_id, src_id, 101);
+    auto streams1 = pool.get_or_create(load_id, src_id, 101, 5);
+    auto streams2 = pool.get_or_create(load_id, src_id, 102, 5);
+    auto streams3 = pool.get_or_create(load_id, src_id, 101, 5);
     EXPECT_EQ(2, pool.size());
     EXPECT_EQ(1, pool.templates_size());
     EXPECT_EQ(streams1, streams3);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 934bca7ac0c..f4131235da3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -298,6 +298,8 @@ public class StreamLoadPlanner {
         perNodeScanRange.put(scanNode.getId().asInt(), scanRangeParams);
         execParams.setPerNodeScanRanges(perNodeScanRange);
         params.setParams(execParams);
+        params.setLoadStreamPerNode(taskInfo.getStreamPerNode());
+        params.setTotalLoadStreams(taskInfo.getStreamPerNode());
         TQueryOptions queryOptions = new TQueryOptions();
         queryOptions.setQueryType(TQueryType.LOAD);
         queryOptions.setQueryTimeout(timeout);
@@ -499,6 +501,8 @@ public class StreamLoadPlanner {
         pipParams.per_exch_num_senders = Maps.newHashMap();
         pipParams.destinations = Lists.newArrayList();
         pipParams.setNumSenders(1);
+        pipParams.setLoadStreamPerNode(taskInfo.getStreamPerNode());
+        pipParams.setTotalLoadStreams(taskInfo.getStreamPerNode());
 
         TPipelineInstanceParams localParams = new TPipelineInstanceParams();
         localParams.setFragmentInstanceId(new TUniqueId(loadId.hi, loadId.lo + 
fragmentInstanceIdIndex));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 9aaca2f8e2c..56ca362e69b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -78,6 +78,7 @@ import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.task.LoadEtlTask;
 import org.apache.doris.thrift.PaloInternalServiceVersion;
 import org.apache.doris.thrift.TBrokerScanRange;
+import org.apache.doris.thrift.TDataSinkType;
 import org.apache.doris.thrift.TDescriptorTable;
 import org.apache.doris.thrift.TDetailedReportParams;
 import org.apache.doris.thrift.TErrorTabletInfo;
@@ -688,6 +689,7 @@ public class Coordinator implements CoordInterface {
             int backendIdx = 0;
             int profileFragmentId = 0;
             long memoryLimit = queryOptions.getMemLimit();
+            Set<Long> backendsWithOlapTableSink = Sets.newHashSet();
             beToExecStates.clear();
             // If #fragments >=2, use twoPhaseExecution with 
exec_plan_fragments_prepare and exec_plan_fragments_start,
             // else use exec_plan_fragments directly.
@@ -755,8 +757,23 @@ public class Coordinator implements CoordInterface {
                         beToExecStates.putIfAbsent(execState.backend.getId(), 
states);
                     }
                     states.addState(execState);
+                    if (tParam.getFragment().getOutputSink() != null
+                            && tParam.getFragment().getOutputSink().getType() 
== TDataSinkType.OLAP_TABLE_SINK) {
+                        
backendsWithOlapTableSink.add(execState.backend.getId());
+                    }
                     ++backendIdx;
                 }
+                int loadStreamPerNode = 1;
+                if (ConnectContext.get() != null && 
ConnectContext.get().getSessionVariable() != null) {
+                    loadStreamPerNode = 
ConnectContext.get().getSessionVariable().getLoadStreamPerNode();
+                }
+                for (TExecPlanFragmentParams tParam : tParams) {
+                    if (tParam.getFragment().getOutputSink() != null
+                            && tParam.getFragment().getOutputSink().getType() 
== TDataSinkType.OLAP_TABLE_SINK) {
+                        tParam.setLoadStreamPerNode(loadStreamPerNode);
+                        
tParam.setTotalLoadStreams(backendsWithOlapTableSink.size() * 
loadStreamPerNode);
+                    }
+                }
                 profileFragmentId += 1;
             } // end for fragments
 
@@ -845,6 +862,7 @@ public class Coordinator implements CoordInterface {
                     }
                 }
 
+                Set<Long> backendsWithOlapTableSink = Sets.newHashSet();
                 // 3. group PipelineExecContext by BE.
                 // So that we can use one RPC to send all fragment instances 
of a BE.
                 for (Map.Entry<TNetworkAddress, TPipelineFragmentParams> entry 
: tParams.entrySet()) {
@@ -878,8 +896,25 @@ public class Coordinator implements CoordInterface {
                     }
                     ctxs.addContext(pipelineExecContext);
 
+                    if (entry.getValue().getFragment().getOutputSink() != null
+                            && 
entry.getValue().getFragment().getOutputSink().getType()
+                            == TDataSinkType.OLAP_TABLE_SINK) {
+                        backendsWithOlapTableSink.add(backendId);
+                    }
                     ++backendIdx;
                 }
+                int loadStreamPerNode = 1;
+                if (ConnectContext.get() != null && 
ConnectContext.get().getSessionVariable() != null) {
+                    loadStreamPerNode = 
ConnectContext.get().getSessionVariable().getLoadStreamPerNode();
+                }
+                for (Map.Entry<TNetworkAddress, TPipelineFragmentParams> entry 
: tParams.entrySet()) {
+                    if (entry.getValue().getFragment().getOutputSink() != null
+                            && 
entry.getValue().getFragment().getOutputSink().getType()
+                            == TDataSinkType.OLAP_TABLE_SINK) {
+                        
entry.getValue().setLoadStreamPerNode(loadStreamPerNode);
+                        
entry.getValue().setTotalLoadStreams(backendsWithOlapTableSink.size() * 
loadStreamPerNode);
+                    }
+                }
 
                 profileFragmentId += 1;
             } // end for fragments
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 36980570039..955450a15c4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -411,6 +411,8 @@ public class SessionVariable implements Serializable, 
Writable {
     public static final String ENABLE_MEMTABLE_ON_SINK_NODE =
             "enable_memtable_on_sink_node";
 
+    public static final String LOAD_STREAM_PER_NODE = "load_stream_per_node";
+
     public static final String ENABLE_UNIQUE_KEY_PARTIAL_UPDATE = 
"enable_unique_key_partial_update";
 
     public static final String INVERTED_INDEX_CONJUNCTION_OPT_THRESHOLD = 
"inverted_index_conjunction_opt_threshold";
@@ -1236,6 +1238,9 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = ENABLE_MEMTABLE_ON_SINK_NODE, needForward = 
true)
     public boolean enableMemtableOnSinkNode = false;
 
+    @VariableMgr.VarAttr(name = LOAD_STREAM_PER_NODE)
+    public int loadStreamPerNode = 20;
+
     @VariableMgr.VarAttr(name = ENABLE_INSERT_GROUP_COMMIT)
     public boolean enableInsertGroupCommit = false;
 
@@ -2405,6 +2410,14 @@ public class SessionVariable implements Serializable, 
Writable {
         this.enableUniqueKeyPartialUpdate = enableUniqueKeyPartialUpdate;
     }
 
+    public int getLoadStreamPerNode() { // current value of the loadStreamPerNode session field
+        return loadStreamPerNode;
+    }
+
+    public void setLoadStreamPerNode(int loadStreamPerNode) { // stores the value as-is; no range check here
+        this.loadStreamPerNode = loadStreamPerNode;
+    }
+
     /**
      * Serialize to thrift object.
      * Used for rest api.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
index 3174e4d5c6b..610e243cd92 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
@@ -125,6 +125,10 @@ public interface LoadTaskInfo {
         return false;
     }
 
+    default int getStreamPerNode() { // interface default; implementations may override
+        return 20;
+    }
+
     class ImportColumnDescs {
         public List<ImportColumnDesc> descs = Lists.newArrayList();
         public boolean isColumnDescsRewrited = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
index 485a3599b38..53a47d385b5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
@@ -89,6 +89,7 @@ public class StreamLoadTask implements LoadTaskInfo {
     private boolean enableProfile = false;
 
     private boolean memtableOnSinkNode = false;
+    private int streamPerNode = 20;
 
     private byte enclose = 0;
 
@@ -309,6 +310,15 @@ public class StreamLoadTask implements LoadTaskInfo {
         this.memtableOnSinkNode = memtableOnSinkNode;
     }
 
+    @Override
+    public int getStreamPerNode() { // returns this task's streamPerNode field
+        return streamPerNode;
+    }
+
+    public void setStreamPerNode(int streamPerNode) { // stores the value as-is; no range check here
+        this.streamPerNode = streamPerNode;
+    }
+
     public static StreamLoadTask 
fromTStreamLoadPutRequest(TStreamLoadPutRequest request) throws UserException {
         StreamLoadTask streamLoadTask = new 
StreamLoadTask(request.getLoadId(), request.getTxnId(),
                 request.getFileType(), request.getFormatType(),
@@ -447,6 +457,9 @@ public class StreamLoadTask implements LoadTaskInfo {
         if (request.isSetMemtableOnSinkNode()) {
             this.memtableOnSinkNode = request.isMemtableOnSinkNode();
         }
+        if (request.isSetStreamPerNode()) {
+            this.streamPerNode = request.getStreamPerNode();
+        }
     }
 
     // used for stream load
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 9a36916317f..f9c2603cb98 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -749,6 +749,7 @@ message POpenLoadStreamRequest {
     optional POlapTableSchemaParam schema = 4;
     repeated PTabletID tablets = 5;
     optional bool enable_profile = 6 [default = false];
+    optional int64 total_streams = 7;
 }
 
 message PTabletSchemaWithIndex {
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 1e0ad2d4624..04eaa578241 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -641,6 +641,7 @@ struct TStreamLoadPutRequest {
     52: optional i8 escape
     53: optional bool memtable_on_sink_node;
     54: optional bool group_commit
+    55: optional i32 stream_per_node;
 }
 
 struct TStreamLoadPutResult {
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index cf38f51c054..e55ab89e328 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -455,6 +455,12 @@ struct TExecPlanFragmentParams {
   24: optional map<Types.TPlanNodeId, PlanNodes.TFileScanRangeParams> 
file_scan_params
 
   25: optional i64 wal_id
+
+  // num load stream for each sink backend
+  26: optional i32 load_stream_per_node
+
+  // total num of load streams the downstream backend will see
+  27: optional i32 total_load_streams
 }
 
 struct TExecPlanFragmentParamsList {
@@ -670,6 +676,8 @@ struct TPipelineFragmentParams {
   // scan node id -> scan range params, only for external file scan
   29: optional map<Types.TPlanNodeId, PlanNodes.TFileScanRangeParams> 
file_scan_params
   30: optional bool group_commit = false;
+  31: optional i32 load_stream_per_node // num load stream for each sink 
backend
+  32: optional i32 total_load_streams // total num of load streams the 
downstream backend will see
 }
 
 struct TPipelineFragmentParamsList {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to