This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 37443aa7e1e [improve](move-memtable) reuse connection in load_stream_stub (#39231) (#39762)
37443aa7e1e is described below

commit 37443aa7e1ebb1a109f0826c80476e2d2f646ece
Author: Kaijie Chen <[email protected]>
AuthorDate: Fri Aug 23 22:46:28 2024 +0800

    [improve](move-memtable) reuse connection in load_stream_stub (#39231) (#39762)
    
    backport #39231
---
 be/src/runtime/exec_env.h                    |  4 ++++
 be/src/runtime/exec_env_init.cpp             |  6 +++++-
 be/src/util/brpc_client_cache.cpp            | 22 +++++++++++++++++++---
 be/src/util/brpc_client_cache.h              | 24 ++++++++++++++++--------
 be/src/util/doris_metrics.h                  |  1 +
 be/src/vec/sink/load_stream_stub.cpp         |  3 +--
 be/src/vec/sink/writer/vtablet_writer_v2.cpp |  4 ++--
 7 files changed, 48 insertions(+), 16 deletions(-)

diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index c404c73a07a..4b2478ccf99 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -215,6 +215,9 @@ public:
     BrpcClientCache<PBackendService_Stub>* brpc_internal_client_cache() const {
         return _internal_client_cache;
     }
+    BrpcClientCache<PBackendService_Stub>* brpc_streaming_client_cache() const 
{
+        return _streaming_client_cache;
+    }
     BrpcClientCache<PFunctionService_Stub>* brpc_function_client_cache() const 
{
         return _function_client_cache;
     }
@@ -392,6 +395,7 @@ private:
     // TODO(zhiqiang): Do not use shared_ptr in exec_env, we can not control 
its life cycle.
     std::shared_ptr<NewLoadStreamMgr> _new_load_stream_mgr;
     BrpcClientCache<PBackendService_Stub>* _internal_client_cache = nullptr;
+    BrpcClientCache<PBackendService_Stub>* _streaming_client_cache = nullptr;
     BrpcClientCache<PFunctionService_Stub>* _function_client_cache = nullptr;
 
     std::shared_ptr<StreamLoadExecutor> _stream_load_executor;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 30098c9b613..bbd6bbc9447 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -233,7 +233,10 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths,
     _load_stream_mgr = std::make_unique<LoadStreamMgr>(num_flush_threads);
     _new_load_stream_mgr = NewLoadStreamMgr::create_shared();
     _internal_client_cache = new BrpcClientCache<PBackendService_Stub>();
-    _function_client_cache = new BrpcClientCache<PFunctionService_Stub>();
+    _streaming_client_cache =
+            new BrpcClientCache<PBackendService_Stub>("baidu_std", "single", 
"streaming");
+    _function_client_cache =
+            new 
BrpcClientCache<PFunctionService_Stub>(config::function_service_protocol);
     _stream_load_executor = StreamLoadExecutor::create_shared(this);
     _routine_load_task_executor = new RoutineLoadTaskExecutor(this);
     RETURN_IF_ERROR(_routine_load_task_executor->init());
@@ -631,6 +634,7 @@ void ExecEnv::destroy() {
     SAFE_DELETE(_routine_load_task_executor);
     // _stream_load_executor
     SAFE_DELETE(_function_client_cache);
+    SAFE_DELETE(_streaming_client_cache);
     SAFE_DELETE(_internal_client_cache);
 
     SAFE_DELETE(_bfd_parser);
diff --git a/be/src/util/brpc_client_cache.cpp 
b/be/src/util/brpc_client_cache.cpp
index b9135e8014d..c5a64887878 100644
--- a/be/src/util/brpc_client_cache.cpp
+++ b/be/src/util/brpc_client_cache.cpp
@@ -25,12 +25,23 @@
 
 namespace doris {
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(brpc_endpoint_stub_count, 
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(brpc_stream_endpoint_stub_count, 
MetricUnit::NOUNIT);
 
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(brpc_function_endpoint_stub_count, 
MetricUnit::NOUNIT);
 
 template <>
-BrpcClientCache<PBackendService_Stub>::BrpcClientCache() {
-    REGISTER_HOOK_METRIC(brpc_endpoint_stub_count, [this]() { return 
_stub_map.size(); });
+BrpcClientCache<PBackendService_Stub>::BrpcClientCache(std::string protocol,
+                                                       std::string 
connection_type,
+                                                       std::string 
connection_group)
+        : _protocol(protocol),
+          _connection_type(connection_type),
+          _connection_group(connection_group) {
+    if (connection_group == "streaming") {
+        REGISTER_HOOK_METRIC(brpc_stream_endpoint_stub_count,
+                             [this]() { return _stub_map.size(); });
+    } else {
+        REGISTER_HOOK_METRIC(brpc_endpoint_stub_count, [this]() { return 
_stub_map.size(); });
+    }
 }
 
 template <>
@@ -39,7 +50,12 @@ BrpcClientCache<PBackendService_Stub>::~BrpcClientCache() {
 }
 
 template <>
-BrpcClientCache<PFunctionService_Stub>::BrpcClientCache() {
+BrpcClientCache<PFunctionService_Stub>::BrpcClientCache(std::string protocol,
+                                                        std::string 
connection_type,
+                                                        std::string 
connection_group)
+        : _protocol(protocol),
+          _connection_type(connection_type),
+          _connection_group(connection_group) {
     REGISTER_HOOK_METRIC(brpc_function_endpoint_stub_count, [this]() { return 
_stub_map.size(); });
 }
 
diff --git a/be/src/util/brpc_client_cache.h b/be/src/util/brpc_client_cache.h
index ebef80f4a6b..09c92fb398e 100644
--- a/be/src/util/brpc_client_cache.h
+++ b/be/src/util/brpc_client_cache.h
@@ -59,7 +59,8 @@ namespace doris {
 template <class T>
 class BrpcClientCache {
 public:
-    BrpcClientCache();
+    BrpcClientCache(std::string protocol = "baidu_std", std::string 
connection_type = "",
+                    std::string connection_group = "");
     virtual ~BrpcClientCache();
 
     std::shared_ptr<T> get_client(const butil::EndPoint& endpoint) {
@@ -110,20 +111,24 @@ public:
     }
 
     std::shared_ptr<T> get_new_client_no_cache(const std::string& host_port,
-                                               const std::string& protocol = 
"baidu_std",
-                                               const std::string& connect_type 
= "",
+                                               const std::string& protocol = 
"",
+                                               const std::string& 
connection_type = "",
                                                const std::string& 
connection_group = "") {
         brpc::ChannelOptions options;
-        if constexpr (std::is_same_v<T, PFunctionService_Stub>) {
-            options.protocol = config::function_service_protocol;
-        } else {
+        if (protocol != "") {
             options.protocol = protocol;
+        } else if (_protocol != "") {
+            options.protocol = _protocol;
         }
-        if (connect_type != "") {
-            options.connection_type = connect_type;
+        if (connection_type != "") {
+            options.connection_type = connection_type;
+        } else if (_connection_type != "") {
+            options.connection_type = _connection_type;
         }
         if (connection_group != "") {
             options.connection_group = connection_group;
+        } else if (_connection_group != "") {
+            options.connection_group = _connection_group;
         }
         options.connect_timeout_ms = 2000;
         options.timeout_ms = 2000;
@@ -204,6 +209,9 @@ public:
 
 private:
     StubMap<T> _stub_map;
+    const std::string _protocol;
+    const std::string _connection_type;
+    const std::string _connection_group;
 };
 
 using InternalServiceClientCache = BrpcClientCache<PBackendService_Stub>;
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 513ef91723f..567efdc9ae5 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -176,6 +176,7 @@ public:
     UIntGauge* stream_load_pipe_count = nullptr;
     UIntGauge* new_stream_load_pipe_count = nullptr;
     UIntGauge* brpc_endpoint_stub_count = nullptr;
+    UIntGauge* brpc_stream_endpoint_stub_count = nullptr;
     UIntGauge* brpc_function_endpoint_stub_count = nullptr;
     UIntGauge* tablet_writer_count = nullptr;
 
diff --git a/be/src/vec/sink/load_stream_stub.cpp 
b/be/src/vec/sink/load_stream_stub.cpp
index dc34b13e0ac..c535f03214f 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -183,8 +183,7 @@ Status 
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
     }
     POpenLoadStreamResponse response;
     // set connection_group "streaming" to distinguish with non-streaming 
connections
-    const auto& stub =
-            client_cache->get_new_client_no_cache(host_port, "baidu_std", 
"single", "streaming");
+    const auto& stub = client_cache->get_client(host_port);
     stub->open_load_stream(&cntl, &request, &response, nullptr);
     for (const auto& resp : response.tablet_schemas()) {
         auto tablet_schema = std::make_unique<TabletSchema>();
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp 
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index fecbd324c57..6013e31609f 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -281,13 +281,13 @@ Status VTabletWriterV2::_open_streams_to_backend(int64_t 
dst_id, Streams& stream
     // get tablet schema from each backend only in the 1st stream
     for (auto& stream : streams | std::ranges::views::take(1)) {
         const std::vector<PTabletID>& tablets_for_schema = 
_indexes_from_node[node_info->id];
-        
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), 
*node_info,
+        
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_streaming_client_cache(), 
*node_info,
                                      _txn_id, *_schema, tablets_for_schema, 
_total_streams,
                                      idle_timeout_ms, 
_state->enable_profile()));
     }
     // for the rest streams, open without getting tablet schema
     for (auto& stream : streams | std::ranges::views::drop(1)) {
-        
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), 
*node_info,
+        
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_streaming_client_cache(), 
*node_info,
                                      _txn_id, *_schema, {}, _total_streams, 
idle_timeout_ms,
                                      _state->enable_profile()));
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to