This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 9d556d08bfb branch-3.1: [enhancement](brpc) remove client from brpc
cache if the underlying channel has error #47487 (#53743)
9d556d08bfb is described below
commit 9d556d08bfbf70fa6af415fdd02903d6963b19ea
Author: yiguolei <[email protected]>
AuthorDate: Fri Jul 25 17:37:54 2025 +0800
branch-3.1: [enhancement](brpc) remove client from brpc cache if the
underlying channel has error #47487 (#53743)
picked from #47487
---
be/src/util/brpc_client_cache.h | 87 ++++++++++++++++++++++++++++++++-
be/test/util/brpc_client_cache_test.cpp | 47 ++++++++++++++++++
2 files changed, 132 insertions(+), 2 deletions(-)
diff --git a/be/src/util/brpc_client_cache.h b/be/src/util/brpc_client_cache.h
index 24bd284f302..58d544dd5a9 100644
--- a/be/src/util/brpc_client_cache.h
+++ b/be/src/util/brpc_client_cache.h
@@ -40,7 +40,9 @@
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
+#include "common/status.h"
#include "runtime/exec_env.h"
+#include "service/backend_options.h"
#include "util/dns_cache.h"
#include "util/network_util.h"
@@ -56,6 +58,80 @@ using StubMap = phmap::parallel_flat_hash_map<
namespace doris {
+class FailureDetectClosure : public ::google::protobuf::Closure {
+public:
+ FailureDetectClosure(std::shared_ptr<AtomicStatus>& channel_st,
+ ::google::protobuf::RpcController* controller,
+ ::google::protobuf::Closure* done)
+ : _channel_st(channel_st), _controller(controller), _done(done) {}
+
+ void Run() override {
+ Defer defer {[&]() { delete this; }}; // the closure frees itself once Run() is left
+ // Every brpc API hands us a brpc::Controller as the RpcController, so this
+ // downcast is safe (static_cast, no runtime type check is performed).
+ auto* cntl = static_cast<brpc::Controller*>(_controller);
+ if (cntl->Failed() && cntl->ErrorCode() == EHOSTDOWN) { // only EHOSTDOWN is recorded
+ Status error_st = Status::NetworkError(
+ "Failed to send brpc, error={}, error_text={}, client: {},
latency = {}",
+ berror(cntl->ErrorCode()), cntl->ErrorText(),
BackendOptions::get_localhost(),
+ cntl->latency_us());
+ LOG(WARNING) << error_st;
+ _channel_st->update(error_st); // store the failure in the shared status object
+ }
+ // Guard the wrapped callback: done may be nullptr, for example in the hand_shake API.
+ if (_done != nullptr) {
+ _done->Run();
+ }
+ // _done->Run() may throw, so `delete this` was moved into the Defer at the top of Run().
+ // delete this;
+ }
+
+private:
+ std::shared_ptr<AtomicStatus> _channel_st;
+ ::google::protobuf::RpcController* _controller;
+ ::google::protobuf::Closure* _done;
+};
+
+// This channel will use FailureDetectClosure to wrap the original closure
+// If some non-recoverable rpc failure happens, it will save the error status
in
+// _channel_st.
+// The brpc client cache depends on it to detect whether the client is healthy.
+class FailureDetectChannel : public ::brpc::Channel {
+public:
+ FailureDetectChannel() : ::brpc::Channel() {
+ _channel_st = std::make_shared<AtomicStatus>(); // default OK
+ }
+ void CallMethod(const google::protobuf::MethodDescriptor* method,
+ google::protobuf::RpcController* controller,
+ const google::protobuf::Message* request,
google::protobuf::Message* response,
+ google::protobuf::Closure* done) override {
+ FailureDetectClosure* failure_detect_closure = nullptr;
+ if (done != nullptr) {
+ // If done == nullptr, it means the call is a sync call, so
that should not
+ // generate a failure detect closure for it, or it will core dump.
+ failure_detect_closure = new FailureDetectClosure(_channel_st,
controller, done);
+ }
+ ::brpc::Channel::CallMethod(method, controller, request, response,
failure_detect_closure);
+ // done == nullptr means a sync call; it should also deal with the bad
channel.
+ if (done == nullptr) {
+ auto* cntl = static_cast<brpc::Controller*>(controller);
+ if (cntl->Failed() && cntl->ErrorCode() == EHOSTDOWN) { // same check as the async closure
+ Status error_st = Status::NetworkError(
+ "Failed to send brpc, error={}, error_text={}, client:
{}, latency = {}",
+ berror(cntl->ErrorCode()), cntl->ErrorText(),
+ BackendOptions::get_localhost(), cntl->latency_us());
+ LOG(WARNING) << error_st;
+ _channel_st->update(error_st);
+ }
+ }
+ }
+
+ std::shared_ptr<AtomicStatus> channel_status() { return _channel_st; } // same object handed to each closure
+
+private:
+ std::shared_ptr<AtomicStatus> _channel_st;
+};
+
template <class T>
class BrpcClientCache {
public:
@@ -99,7 +175,14 @@ public:
auto get_value = [&stub_ptr](const auto& v) { stub_ptr = v.second; };
if (LIKELY(_stub_map.if_contains(host_port, get_value))) {
DCHECK(stub_ptr != nullptr);
- return stub_ptr;
+ // All clients created from this cache will use
FailureDetectChannel, so it is
+ // safe to do static cast here.
+ // Check if the base channel is OK, if not ignore the stub and
create new one.
+ if
(static_cast<FailureDetectChannel*>(stub_ptr->channel())->channel_status()->ok())
{
+ return stub_ptr;
+ } else {
+ _stub_map.erase(host_port);
+ }
}
// new one stub and insert into map
@@ -148,7 +231,7 @@ public:
options.timeout_ms = 2000;
options.max_retry = 10;
- std::unique_ptr<brpc::Channel> channel(new brpc::Channel());
+ std::unique_ptr<FailureDetectChannel> channel(new
FailureDetectChannel());
int ret_code = 0;
if (host_port.find("://") == std::string::npos) {
ret_code = channel->Init(host_port.c_str(), &options);
diff --git a/be/test/util/brpc_client_cache_test.cpp
b/be/test/util/brpc_client_cache_test.cpp
index 5377ce7eeb9..5ae2aa0e7f9 100644
--- a/be/test/util/brpc_client_cache_test.cpp
+++ b/be/test/util/brpc_client_cache_test.cpp
@@ -56,4 +56,51 @@ TEST_F(BrpcClientCacheTest, invalid) {
EXPECT_EQ(nullptr, stub1);
}
+TEST_F(BrpcClientCacheTest, failure) {
+ BrpcClientCache<PBackendService_Stub> cache;
+ TNetworkAddress address;
+ address.hostname = "127.0.0.1";
+ address.port = 123;
+ std::shared_ptr<PBackendService_Stub> stub1 = cache.get_client(address);
+ EXPECT_NE(nullptr, stub1);
+ std::shared_ptr<PBackendService_Stub> stub2 = cache.get_client(address);
+ EXPECT_NE(nullptr, stub2);
+ // The channel is OK, so the cache returns the same stub.
+ EXPECT_EQ(stub1, stub2);
+
EXPECT_TRUE(static_cast<FailureDetectChannel*>(stub1->channel())->channel_status()->ok());
+
+ // Mark the channel status as failed; the next get_client() must return a new stub.
+ static_cast<FailureDetectChannel*>(stub1->channel())
+ ->channel_status()
+ ->update(Status::NetworkError("test brpc error"));
+ std::shared_ptr<PBackendService_Stub> stub3 = cache.get_client(address);
+ EXPECT_NE(nullptr, stub3);
+
EXPECT_TRUE(static_cast<FailureDetectChannel*>(stub3->channel())->channel_status()->ok());
+ // The new stub must differ from the previous one.
+ EXPECT_NE(stub2, stub3);
+ // The previous channel is still not OK.
+
EXPECT_FALSE(static_cast<FailureDetectChannel*>(stub2->channel())->channel_status()->ok());
+
+ // Call handshake method; it will trigger a host-is-down error. It is a sync
call, not use closure.
+ cache.available(stub3, address.hostname, address.port);
+
EXPECT_FALSE(static_cast<FailureDetectChannel*>(stub3->channel())->channel_status()->ok());
+
+ std::shared_ptr<PBackendService_Stub> stub4 = cache.get_client(address);
+ EXPECT_NE(nullptr, stub4);
+
EXPECT_TRUE(static_cast<FailureDetectChannel*>(stub4->channel())->channel_status()->ok());
+
+ // Call handshake method; it will trigger a host-is-down error. It is a
async all, will use closure.
+ std::string message = "hello doris!";
+ PHandShakeRequest request;
+ request.set_hello(message);
+ PHandShakeResponse response;
+ brpc::Controller cntl4;
+ stub4->hand_shake(&cntl4, &request, &response, brpc::DoNothing());
+ brpc::Join(cntl4.call_id());
+
EXPECT_FALSE(static_cast<FailureDetectChannel*>(stub4->channel())->channel_status()->ok());
+
+ // Only one entry should remain in the cache map.
+ EXPECT_EQ(1, cache.size());
+}
+
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]