This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 37443aa7e1e [improve](move-memtable) reuse connection in load_stream_stub (#39231) (#39762)
37443aa7e1e is described below
commit 37443aa7e1ebb1a109f0826c80476e2d2f646ece
Author: Kaijie Chen <[email protected]>
AuthorDate: Fri Aug 23 22:46:28 2024 +0800
[improve](move-memtable) reuse connection in load_stream_stub (#39231) (#39762)
backport #39231
---
be/src/runtime/exec_env.h | 4 ++++
be/src/runtime/exec_env_init.cpp | 6 +++++-
be/src/util/brpc_client_cache.cpp | 22 +++++++++++++++++++---
be/src/util/brpc_client_cache.h | 24 ++++++++++++++++--------
be/src/util/doris_metrics.h | 1 +
be/src/vec/sink/load_stream_stub.cpp | 3 +--
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 4 ++--
7 files changed, 48 insertions(+), 16 deletions(-)
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index c404c73a07a..4b2478ccf99 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -215,6 +215,9 @@ public:
BrpcClientCache<PBackendService_Stub>* brpc_internal_client_cache() const {
return _internal_client_cache;
}
+ BrpcClientCache<PBackendService_Stub>* brpc_streaming_client_cache() const
{
+ return _streaming_client_cache;
+ }
BrpcClientCache<PFunctionService_Stub>* brpc_function_client_cache() const
{
return _function_client_cache;
}
@@ -392,6 +395,7 @@ private:
// TODO(zhiqiang): Do not use shared_ptr in exec_env, we can not control its life cycle.
std::shared_ptr<NewLoadStreamMgr> _new_load_stream_mgr;
BrpcClientCache<PBackendService_Stub>* _internal_client_cache = nullptr;
+ BrpcClientCache<PBackendService_Stub>* _streaming_client_cache = nullptr;
BrpcClientCache<PFunctionService_Stub>* _function_client_cache = nullptr;
std::shared_ptr<StreamLoadExecutor> _stream_load_executor;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 30098c9b613..bbd6bbc9447 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -233,7 +233,10 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
_load_stream_mgr = std::make_unique<LoadStreamMgr>(num_flush_threads);
_new_load_stream_mgr = NewLoadStreamMgr::create_shared();
_internal_client_cache = new BrpcClientCache<PBackendService_Stub>();
- _function_client_cache = new BrpcClientCache<PFunctionService_Stub>();
+ _streaming_client_cache =
+ new BrpcClientCache<PBackendService_Stub>("baidu_std", "single",
"streaming");
+ _function_client_cache =
+ new
BrpcClientCache<PFunctionService_Stub>(config::function_service_protocol);
_stream_load_executor = StreamLoadExecutor::create_shared(this);
_routine_load_task_executor = new RoutineLoadTaskExecutor(this);
RETURN_IF_ERROR(_routine_load_task_executor->init());
@@ -631,6 +634,7 @@ void ExecEnv::destroy() {
SAFE_DELETE(_routine_load_task_executor);
// _stream_load_executor
SAFE_DELETE(_function_client_cache);
+ SAFE_DELETE(_streaming_client_cache);
SAFE_DELETE(_internal_client_cache);
SAFE_DELETE(_bfd_parser);
diff --git a/be/src/util/brpc_client_cache.cpp b/be/src/util/brpc_client_cache.cpp
index b9135e8014d..c5a64887878 100644
--- a/be/src/util/brpc_client_cache.cpp
+++ b/be/src/util/brpc_client_cache.cpp
@@ -25,12 +25,23 @@
namespace doris {
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(brpc_endpoint_stub_count,
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(brpc_stream_endpoint_stub_count,
MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(brpc_function_endpoint_stub_count,
MetricUnit::NOUNIT);
template <>
-BrpcClientCache<PBackendService_Stub>::BrpcClientCache() {
- REGISTER_HOOK_METRIC(brpc_endpoint_stub_count, [this]() { return
_stub_map.size(); });
+BrpcClientCache<PBackendService_Stub>::BrpcClientCache(std::string protocol,
+ std::string
connection_type,
+ std::string
connection_group)
+ : _protocol(protocol),
+ _connection_type(connection_type),
+ _connection_group(connection_group) {
+ if (connection_group == "streaming") {
+ REGISTER_HOOK_METRIC(brpc_stream_endpoint_stub_count,
+ [this]() { return _stub_map.size(); });
+ } else {
+ REGISTER_HOOK_METRIC(brpc_endpoint_stub_count, [this]() { return
_stub_map.size(); });
+ }
}
template <>
@@ -39,7 +50,12 @@ BrpcClientCache<PBackendService_Stub>::~BrpcClientCache() {
}
template <>
-BrpcClientCache<PFunctionService_Stub>::BrpcClientCache() {
+BrpcClientCache<PFunctionService_Stub>::BrpcClientCache(std::string protocol,
+ std::string
connection_type,
+ std::string
connection_group)
+ : _protocol(protocol),
+ _connection_type(connection_type),
+ _connection_group(connection_group) {
REGISTER_HOOK_METRIC(brpc_function_endpoint_stub_count, [this]() { return
_stub_map.size(); });
}
diff --git a/be/src/util/brpc_client_cache.h b/be/src/util/brpc_client_cache.h
index ebef80f4a6b..09c92fb398e 100644
--- a/be/src/util/brpc_client_cache.h
+++ b/be/src/util/brpc_client_cache.h
@@ -59,7 +59,8 @@ namespace doris {
template <class T>
class BrpcClientCache {
public:
- BrpcClientCache();
+ BrpcClientCache(std::string protocol = "baidu_std", std::string
connection_type = "",
+ std::string connection_group = "");
virtual ~BrpcClientCache();
std::shared_ptr<T> get_client(const butil::EndPoint& endpoint) {
@@ -110,20 +111,24 @@ public:
}
std::shared_ptr<T> get_new_client_no_cache(const std::string& host_port,
- const std::string& protocol =
"baidu_std",
- const std::string& connect_type
= "",
+ const std::string& protocol =
"",
+ const std::string&
connection_type = "",
const std::string&
connection_group = "") {
brpc::ChannelOptions options;
- if constexpr (std::is_same_v<T, PFunctionService_Stub>) {
- options.protocol = config::function_service_protocol;
- } else {
+ if (protocol != "") {
options.protocol = protocol;
+ } else if (_protocol != "") {
+ options.protocol = _protocol;
}
- if (connect_type != "") {
- options.connection_type = connect_type;
+ if (connection_type != "") {
+ options.connection_type = connection_type;
+ } else if (_connection_type != "") {
+ options.connection_type = _connection_type;
}
if (connection_group != "") {
options.connection_group = connection_group;
+ } else if (_connection_group != "") {
+ options.connection_group = _connection_group;
}
options.connect_timeout_ms = 2000;
options.timeout_ms = 2000;
@@ -204,6 +209,9 @@ public:
private:
StubMap<T> _stub_map;
+ const std::string _protocol;
+ const std::string _connection_type;
+ const std::string _connection_group;
};
using InternalServiceClientCache = BrpcClientCache<PBackendService_Stub>;
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 513ef91723f..567efdc9ae5 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -176,6 +176,7 @@ public:
UIntGauge* stream_load_pipe_count = nullptr;
UIntGauge* new_stream_load_pipe_count = nullptr;
UIntGauge* brpc_endpoint_stub_count = nullptr;
+ UIntGauge* brpc_stream_endpoint_stub_count = nullptr;
UIntGauge* brpc_function_endpoint_stub_count = nullptr;
UIntGauge* tablet_writer_count = nullptr;
diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index dc34b13e0ac..c535f03214f 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -183,8 +183,7 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
}
POpenLoadStreamResponse response;
// set connection_group "streaming" to distinguish with non-streaming connections
- const auto& stub =
- client_cache->get_new_client_no_cache(host_port, "baidu_std",
"single", "streaming");
+ const auto& stub = client_cache->get_client(host_port);
stub->open_load_stream(&cntl, &request, &response, nullptr);
for (const auto& resp : response.tablet_schemas()) {
auto tablet_schema = std::make_unique<TabletSchema>();
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index fecbd324c57..6013e31609f 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -281,13 +281,13 @@ Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, Streams& stream
// get tablet schema from each backend only in the 1st stream
for (auto& stream : streams | std::ranges::views::take(1)) {
const std::vector<PTabletID>& tablets_for_schema =
_indexes_from_node[node_info->id];
-
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(),
*node_info,
+
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_streaming_client_cache(),
*node_info,
_txn_id, *_schema, tablets_for_schema,
_total_streams,
idle_timeout_ms,
_state->enable_profile()));
}
// for the rest streams, open without getting tablet schema
for (auto& stream : streams | std::ranges::views::drop(1)) {
-
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(),
*node_info,
+
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_streaming_client_cache(),
*node_info,
_txn_id, *_schema, {}, _total_streams,
idle_timeout_ms,
_state->enable_profile()));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]