This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 8d87015cab9 branch-3.1: [fix](mtls)brpc SetFailed handle error_code=0
pick brpc #2560 #58394 (#58518)
8d87015cab9 is described below
commit 8d87015cab994b9041c39bba52f3d9cb0305d87c
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Dec 2 11:05:52 2025 +0800
branch-3.1: [fix](mtls)brpc SetFailed handle error_code=0 pick brpc #2560
#58394 (#58518)
Cherry-picked from #58394
Co-authored-by: koarz <[email protected]>
---
...SetFailed-to-handle-error_code-0-properly.patch | 303 +++++++++++++++++++++
1 file changed, 303 insertions(+)
diff --git a/thirdparty/patches/brpc-2560-Refactor-Socket-SetFailed-to-handle-error_code-0-properly.patch b/thirdparty/patches/brpc-2560-Refactor-Socket-SetFailed-to-handle-error_code-0-properly.patch
new file mode 100644
index 00000000000..79527fe61e0
--- /dev/null
+++ b/thirdparty/patches/brpc-2560-Refactor-Socket-SetFailed-to-handle-error_code-0-properly.patch
@@ -0,0 +1,303 @@
+From 9bd2d3112a30d2059b7f2713c626b6c412b6e2fc Mon Sep 17 00:00:00 2001
+From: koarz <[email protected]>
+Date: Thu, 13 Nov 2025 16:34:53 +0800
+Subject: [PATCH] Refactor Socket::SetFailed to handle error_code=0 properly
+
+---
+ src/brpc/socket.cpp | 117 ++++++++++++++++++------------------
+ src/brpc/socket.h | 17 ++++++
+ src/butil/string_printf.cpp | 49 ++++++++++-----
+ src/butil/string_printf.h | 4 ++
+ 4 files changed, 112 insertions(+), 75 deletions(-)
+
+diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp
+index ae3bb76..da42442 100644
+--- a/src/brpc/socket.cpp
++++ b/src/brpc/socket.cpp
+@@ -862,82 +862,81 @@ int Socket::isolated_times() const {
+ }
+
+ int Socket::SetFailed(int error_code, const char* error_fmt, ...) {
+- if (error_code == 0) {
+- CHECK(false) << "error_code is 0";
+- error_code = EFAILEDSOCKET;
+- }
+- const uint32_t id_ver = VersionOfSocketId(_this_id);
++ std::string error_text;
++ if (error_fmt != NULL) {
++ va_list ap;
++ va_start(ap, error_fmt);
++ butil::string_vprintf(&error_text, error_fmt, ap);
++ va_end(ap);
++ }
++ const uint32_t id_ver = VersionOfVRefId(_this_id);
+ uint64_t vref = _versioned_ref.load(butil::memory_order_relaxed);
+- for (;;) { // need iteration to retry compare_exchange_strong
++ for (;;) {
+ if (VersionOfVRef(vref) != id_ver) {
+ return -1;
+ }
+- // Try to set version=id_ver+1 (to make later Address() return NULL),
++ // Try to set version=id_ver+1 (to make later address() return NULL),
+ // retry on fail.
+ if (_versioned_ref.compare_exchange_strong(
+ vref, MakeVRef(id_ver + 1, NRefOfVRef(vref)),
+ butil::memory_order_release,
+ butil::memory_order_relaxed)) {
+- // Update _error_text
+- std::string error_text;
+- if (error_fmt != NULL) {
+- va_list ap;
+- va_start(ap, error_fmt);
+- butil::string_vprintf(&error_text, error_fmt, ap);
+- va_end(ap);
+- }
+- pthread_mutex_lock(&_id_wait_list_mutex);
+- _error_code = error_code;
+- _error_text = error_text;
+- pthread_mutex_unlock(&_id_wait_list_mutex);
+-
+- // Do health-checking even if we're not connected before, needed
+- // by Channel to revive never-connected socket when server side
+- // comes online.
+- if (_health_check_interval_s > 0) {
+- bool expect = false;
+- if (_hc_started.compare_exchange_strong(expect,
+- true,
+- butil::memory_order_relaxed,
+- butil::memory_order_relaxed)) {
+- GetOrNewSharedPart()->circuit_breaker.MarkAsBroken();
+- StartHealthCheck(id(),
+- GetOrNewSharedPart()->circuit_breaker.isolation_duration_ms());
+- } else {
+- // No need to run 2 health checking at the same time.
+- RPC_VLOG << "There is already a health checking running "
+- "for SocketId=" << _this_id;
+- }
+- }
+- // Wake up all threads waiting on EPOLLOUT when closing fd
+- _epollout_butex->fetch_add(1, butil::memory_order_relaxed);
+- bthread::butex_wake_all(_epollout_butex);
+-
+- // Wake up all unresponded RPC.
+- CHECK_EQ(0, bthread_id_list_reset2_pthreadsafe(
+- &_id_wait_list, error_code, error_text,
+- &_id_wait_list_mutex));
+-
+- ResetAllStreams();
+- // _app_connect shouldn't be set to NULL in SetFailed otherwise
+- // HC is always not supported.
+- // FIXME: Design a better interface for AppConnect
+- // if (_app_connect) {
+- // AppConnect* const saved_app_connect = _app_connect;
+- // _app_connect = NULL;
+- // saved_app_connect->StopConnect(this);
+- // }
+-
++ // Call OnFailed to handle the failure of Socket.
++ OnFailed(error_code, error_text);
+ // Deref additionally which is added at creation so that this
+- // Socket's reference will hit 0(recycle) when no one addresses it.
++ // queue's reference will hit 0(recycle) when no one addresses it.
+ ReleaseAdditionalReference();
+- // NOTE: This Socket may be recycled at this point, don't
++ // NOTE: This object may be recycled at this point, don't
+ // touch anything.
+ return 0;
+ }
+ }
+ }
+
++void Socket::OnFailed(int error_code, const std::string& error_text) {
++ // Update _error_text
++ pthread_mutex_lock(&_id_wait_list_mutex);
++ _error_code = error_code;
++ _error_text = error_text;
++ pthread_mutex_unlock(&_id_wait_list_mutex);
++
++ // Do health-checking even if we're not connected before, needed
++ // by Channel to revive never-connected socket when server side
++ // comes online.
++ if (HCEnabled()) {
++ bool expect = false;
++ if (_hc_started.compare_exchange_strong(expect,
++ true,
++ butil::memory_order_relaxed,
++ butil::memory_order_relaxed)) {
++ GetOrNewSharedPart()->circuit_breaker.MarkAsBroken();
++ StartHealthCheck(id(),
++ GetOrNewSharedPart()->circuit_breaker.isolation_duration_ms());
++ } else {
++ // No need to run 2 health checking at the same time.
++ RPC_VLOG << "There is already a health checking running "
++ "for SocketId=" << id();
++ }
++ }
++ // Wake up all threads waiting on EPOLLOUT when closing fd
++ _epollout_butex->fetch_add(1, butil::memory_order_relaxed);
++ bthread::butex_wake_all(_epollout_butex);
++
++ // Wake up all unresponded RPC.
++ CHECK_EQ(0, bthread_id_list_reset2_pthreadsafe(
++ &_id_wait_list, error_code, error_text,
++ &_id_wait_list_mutex));
++ ResetAllStreams();
++ // _app_connect shouldn't be set to NULL in SetFailed otherwise
++ // HC is always not supported.
++ // FIXME: Design a better interface for AppConnect
++ // if (_app_connect) {
++ // AppConnect* const saved_app_connect = _app_connect;
++ // _app_connect = NULL;
++ // saved_app_connect->StopConnect(this);
++ // }
++}
++
+ int Socket::SetFailed() {
+ return SetFailed(EFAILEDSOCKET, NULL);
+ }
+diff --git a/src/brpc/socket.h b/src/brpc/socket.h
+index 58ccd2f..c535669 100644
+--- a/src/brpc/socket.h
++++ b/src/brpc/socket.h
+@@ -202,6 +202,12 @@ struct SocketOptions {
+ Destroyable* initial_parsing_context;
+ };
+
++typedef uint64_t VRefId;
++
++BUTIL_FORCE_INLINE uint32_t VersionOfVRefId(VRefId vref_id) {
++ return (uint32_t)(vref_id >> 32);
++}
++
+ // Abstractions on reading from and writing into file descriptors.
+ // NOTE: accessed by multiple threads(frequently), align it by cacheline.
+ class BAIDU_CACHELINE_ALIGNMENT/*note*/ Socket {
+@@ -305,6 +311,14 @@ public:
+ // Initialized by SocketOptions.health_check_interval_s.
+ int health_check_interval() const { return _health_check_interval_s; }
+
++ // True if health checking is enabled.
++ bool HCEnabled() const {
++ // This fence makes sure that we see change of
++ // `_is_hc_related_ref_held' before changing `_versioned_ref'.
++ butil::atomic_thread_fence(butil::memory_order_acquire);
++ return _health_check_interval_s > 0 && _is_hc_related_ref_held;
++ }
++
+ // When someone holds a health-checking-related reference,
+ // this function need to be called to make health checking run normally.
+ void SetHCRelatedRefHeld() { _is_hc_related_ref_held = true; }
+@@ -559,6 +573,9 @@ public:
+
+ bthread_keytable_pool_t* keytable_pool() const { return _keytable_pool; }
+
++ // Called by SetFailed to handle failure logic
++ void OnFailed(int error_code, const std::string& error_text);
++
+ private:
+ DISALLOW_COPY_AND_ASSIGN(Socket);
+
+diff --git a/src/butil/string_printf.cpp b/src/butil/string_printf.cpp
+index 981420e..cebaf13 100644
+--- a/src/butil/string_printf.cpp
++++ b/src/butil/string_printf.cpp
+@@ -67,24 +67,49 @@ inline int string_printf_impl(std::string& output, const char* format,
+ }
+ return 0;
+ }
++
++inline int string_printf_impl(std::string& output, size_t hint,
++ const char* format, va_list args) {
++ if (hint > output.capacity()) {
++ output.reserve(hint);
++ }
++ return string_printf_impl(output, format, args);
++}
+ } // end anonymous namespace
+
+ std::string string_printf(const char* format, ...) {
+ // snprintf will tell us how large the output buffer should be, but
+- // we then have to call it a second time, which is costly. By
++ // we then have to call it a second time, which is costly. By
+ // guestimating the final size, we avoid the double snprintf in many
+- // cases, resulting in a performance win. We use this constructor
++ // cases, resulting in a performance win. We use this constructor
+ // of std::string to avoid a double allocation, though it does pad
+- // the resulting string with nul bytes. Our guestimation is twice
++ // the resulting string with nul bytes. Our guestimation is twice
+ // the format string size, or 32 bytes, whichever is larger. This
+- // is a hueristic that doesn't affect correctness but attempts to be
++ // is a heuristic that doesn't affect correctness but attempts to be
+ // reasonably fast for the most common cases.
+ std::string ret;
+- ret.reserve(std::max(32UL, strlen(format) * 2));
++ va_list ap;
++ va_start(ap, format);
++ if (string_printf_impl(ret, std::max(32UL, strlen(format) * 2),
++ format, ap) != 0) {
++ ret.clear();
++ }
++ va_end(ap);
++ return ret;
++}
+
++std::string string_printf(size_t hint_size, const char* format, ...) {
++ // snprintf will tell us how large the output buffer should be, but
++ // we then have to call it a second time, which is costly. By
++ // passing the hint size of formatted string, we avoid the double
++ // snprintf in many cases, resulting in a performance win. We use
++ // this constructor of std::string to avoid a double allocation,
++ // though it does pad the resulting string with nul bytes.
++ std::string ret;
+ va_list ap;
+ va_start(ap, format);
+- if (string_printf_impl(ret, format, ap) != 0) {
++ if (string_printf_impl(ret, std::max(hint_size, strlen(format) * 2),
++ format, ap) != 0) {
+ ret.clear();
+ }
+ va_end(ap);
+@@ -96,11 +121,7 @@ std::string string_printf(const char* format, ...) {
+ int string_appendf(std::string* output, const char* format, ...) {
+ va_list ap;
+ va_start(ap, format);
+- const size_t old_size = output->size();
+- const int rc = string_printf_impl(*output, format, ap);
+- if (rc != 0) {
+- output->resize(old_size);
+- }
++ const int rc = string_vappendf(output, format, ap);
+ va_end(ap);
+ return rc;
+ }
+@@ -117,11 +138,7 @@ int string_vappendf(std::string* output, const char* format, va_list args) {
+ int string_printf(std::string* output, const char* format, ...) {
+ va_list ap;
+ va_start(ap, format);
+- output->clear();
+- const int rc = string_printf_impl(*output, format, ap);
+- if (rc != 0) {
+- output->clear();
+- }
++ const int rc = string_vprintf(output, format, ap);
+ va_end(ap);
+ return rc;
+ };
+diff --git a/src/butil/string_printf.h b/src/butil/string_printf.h
+index 64eae84..7f9e3dd 100644
+--- a/src/butil/string_printf.h
++++ b/src/butil/string_printf.h
+@@ -27,6 +27,10 @@ namespace butil {
+ std::string string_printf(const char* format, ...)
+ __attribute__ ((format (printf, 1, 2)));
+
++// Hint size of formatted string.
++std::string string_printf(size_t hint_size, const char* format, ...)
++ __attribute__ ((format (printf, 2, 3)));
++
+ // Write |format| and associated arguments into |output|
+ // Returns 0 on success, -1 otherwise.
+ int string_printf(std::string* output, const char* fmt, ...)
+--
+2.43.5
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]