This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 8d87015cab9 branch-3.1: [fix](mtls)brpc SetFailed handle error_code=0 pick brpc #2560 #58394 (#58518)
8d87015cab9 is described below

commit 8d87015cab994b9041c39bba52f3d9cb0305d87c
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Dec 2 11:05:52 2025 +0800

    branch-3.1: [fix](mtls)brpc SetFailed handle error_code=0 pick brpc #2560 #58394 (#58518)
    
    Cherry-picked from #58394
    
    Co-authored-by: koarz <[email protected]>
---
 ...SetFailed-to-handle-error_code-0-properly.patch | 303 +++++++++++++++++++++
 1 file changed, 303 insertions(+)

diff --git a/thirdparty/patches/brpc-2560-Refactor-Socket-SetFailed-to-handle-error_code-0-properly.patch b/thirdparty/patches/brpc-2560-Refactor-Socket-SetFailed-to-handle-error_code-0-properly.patch
new file mode 100644
index 00000000000..79527fe61e0
--- /dev/null
+++ b/thirdparty/patches/brpc-2560-Refactor-Socket-SetFailed-to-handle-error_code-0-properly.patch
@@ -0,0 +1,303 @@
+From 9bd2d3112a30d2059b7f2713c626b6c412b6e2fc Mon Sep 17 00:00:00 2001
+From: koarz <[email protected]>
+Date: Thu, 13 Nov 2025 16:34:53 +0800
+Subject: [PATCH] Refactor Socket::SetFailed to handle error_code=0 properly
+
+---
+ src/brpc/socket.cpp         | 117 ++++++++++++++++++------------------
+ src/brpc/socket.h           |  17 ++++++
+ src/butil/string_printf.cpp |  49 ++++++++++-----
+ src/butil/string_printf.h   |   4 ++
+ 4 files changed, 112 insertions(+), 75 deletions(-)
+
+diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp
+index ae3bb76..da42442 100644
+--- a/src/brpc/socket.cpp
++++ b/src/brpc/socket.cpp
+@@ -862,82 +862,81 @@ int Socket::isolated_times() const {
+ }
+ 
+ int Socket::SetFailed(int error_code, const char* error_fmt, ...) {
+-    if (error_code == 0) {
+-        CHECK(false) << "error_code is 0";
+-        error_code = EFAILEDSOCKET;
+-    }
+-    const uint32_t id_ver = VersionOfSocketId(_this_id);
++    std::string error_text;
++    if (error_fmt != NULL) {
++        va_list ap;
++        va_start(ap, error_fmt);
++        butil::string_vprintf(&error_text, error_fmt, ap);
++        va_end(ap);
++    }
++    const uint32_t id_ver = VersionOfVRefId(_this_id);
+     uint64_t vref = _versioned_ref.load(butil::memory_order_relaxed);
+-    for (;;) {  // need iteration to retry compare_exchange_strong
++    for (;;) {
+         if (VersionOfVRef(vref) != id_ver) {
+             return -1;
+         }
+-        // Try to set version=id_ver+1 (to make later Address() return NULL),
++        // Try to set version=id_ver+1 (to make later address() return NULL),
+         // retry on fail.
+         if (_versioned_ref.compare_exchange_strong(
+                 vref, MakeVRef(id_ver + 1, NRefOfVRef(vref)),
+                 butil::memory_order_release,
+                 butil::memory_order_relaxed)) {
+-            // Update _error_text
+-            std::string error_text;
+-            if (error_fmt != NULL) {
+-                va_list ap;
+-                va_start(ap, error_fmt);
+-                butil::string_vprintf(&error_text, error_fmt, ap);
+-                va_end(ap);
+-            }
+-            pthread_mutex_lock(&_id_wait_list_mutex);
+-            _error_code = error_code;
+-            _error_text = error_text;
+-            pthread_mutex_unlock(&_id_wait_list_mutex);
+-            
+-            // Do health-checking even if we're not connected before, needed
+-            // by Channel to revive never-connected socket when server side
+-            // comes online.
+-            if (_health_check_interval_s > 0) {
+-                bool expect = false;
+-                if (_hc_started.compare_exchange_strong(expect,
+-                                                        true,
+-                                                        butil::memory_order_relaxed,
+-                                                        butil::memory_order_relaxed)) {
+-                    GetOrNewSharedPart()->circuit_breaker.MarkAsBroken();
+-                    StartHealthCheck(id(),
+-                        GetOrNewSharedPart()->circuit_breaker.isolation_duration_ms());
+-                } else {
+-                    // No need to run 2 health checking at the same time.
+-                    RPC_VLOG << "There is already a health checking running "
+-                                "for SocketId=" << _this_id;
+-                }
+-            }
+-            // Wake up all threads waiting on EPOLLOUT when closing fd
+-            _epollout_butex->fetch_add(1, butil::memory_order_relaxed);
+-            bthread::butex_wake_all(_epollout_butex);
+-
+-            // Wake up all unresponded RPC.
+-            CHECK_EQ(0, bthread_id_list_reset2_pthreadsafe(
+-                         &_id_wait_list, error_code, error_text,
+-                         &_id_wait_list_mutex));
+-
+-            ResetAllStreams();
+-            // _app_connect shouldn't be set to NULL in SetFailed otherwise
+-            // HC is always not supported.
+-            // FIXME: Design a better interface for AppConnect
+-            // if (_app_connect) {
+-            //     AppConnect* const saved_app_connect = _app_connect;
+-            //     _app_connect = NULL;
+-            //     saved_app_connect->StopConnect(this);
+-            // }
+-
++            // Call OnFailed to handle the failure of Socket.
++            OnFailed(error_code, error_text);
+             // Deref additionally which is added at creation so that this
+-            // Socket's reference will hit 0(recycle) when no one addresses it.
++            // queue's reference will hit 0(recycle) when no one addresses it.
+             ReleaseAdditionalReference();
+-            // NOTE: This Socket may be recycled at this point, don't
++            // NOTE: This object may be recycled at this point, don't
+             // touch anything.
+             return 0;
+         }
+     }
+ }
+ 
++void Socket::OnFailed(int error_code, const std::string& error_text) {
++    // Update _error_text
++    pthread_mutex_lock(&_id_wait_list_mutex);
++    _error_code = error_code;
++    _error_text = error_text;
++    pthread_mutex_unlock(&_id_wait_list_mutex);
++
++    // Do health-checking even if we're not connected before, needed
++    // by Channel to revive never-connected socket when server side
++    // comes online.
++    if (HCEnabled()) {
++        bool expect = false;
++        if (_hc_started.compare_exchange_strong(expect,
++            true,
++            butil::memory_order_relaxed,
++            butil::memory_order_relaxed)) {
++            GetOrNewSharedPart()->circuit_breaker.MarkAsBroken();
++            StartHealthCheck(id(),
++                GetOrNewSharedPart()->circuit_breaker.isolation_duration_ms());
++        } else {
++            // No need to run 2 health checking at the same time.
++            RPC_VLOG << "There is already a health checking running "
++                        "for SocketId=" << id();
++        }
++    }
++    // Wake up all threads waiting on EPOLLOUT when closing fd
++    _epollout_butex->fetch_add(1, butil::memory_order_relaxed);
++    bthread::butex_wake_all(_epollout_butex);
++
++    // Wake up all unresponded RPC.
++    CHECK_EQ(0, bthread_id_list_reset2_pthreadsafe(
++        &_id_wait_list, error_code, error_text,
++        &_id_wait_list_mutex));
++    ResetAllStreams();
++    // _app_connect shouldn't be set to NULL in SetFailed otherwise
++    // HC is always not supported.
++    // FIXME: Design a better interface for AppConnect
++    // if (_app_connect) {
++    //     AppConnect* const saved_app_connect = _app_connect;
++    //     _app_connect = NULL;
++    //     saved_app_connect->StopConnect(this);
++    // }
++}
++
+ int Socket::SetFailed() {
+     return SetFailed(EFAILEDSOCKET, NULL);
+ }
+diff --git a/src/brpc/socket.h b/src/brpc/socket.h
+index 58ccd2f..c535669 100644
+--- a/src/brpc/socket.h
++++ b/src/brpc/socket.h
+@@ -202,6 +202,12 @@ struct SocketOptions {
+     Destroyable* initial_parsing_context;
+ };
+ 
++typedef uint64_t VRefId;
++
++BUTIL_FORCE_INLINE uint32_t VersionOfVRefId(VRefId vref_id) {
++    return (uint32_t)(vref_id >> 32);
++}
++
+ // Abstractions on reading from and writing into file descriptors.
+ // NOTE: accessed by multiple threads(frequently), align it by cacheline.
+ class BAIDU_CACHELINE_ALIGNMENT/*note*/ Socket {
+@@ -305,6 +311,14 @@ public:
+     // Initialized by SocketOptions.health_check_interval_s.
+     int health_check_interval() const { return _health_check_interval_s; }
+ 
++    // True if health checking is enabled.
++    bool HCEnabled() const {
++        // This fence makes sure that we see change of
++        // `_is_hc_related_ref_held' before changing `_versioned_ref.
++        butil::atomic_thread_fence(butil::memory_order_acquire);
++        return _health_check_interval_s > 0 && _is_hc_related_ref_held;
++    }
++
+     // When someone holds a health-checking-related reference,
+     // this function need to be called to make health checking run normally.
+     void SetHCRelatedRefHeld() { _is_hc_related_ref_held = true; }
+@@ -559,6 +573,9 @@ public:
+ 
+     bthread_keytable_pool_t* keytable_pool() const { return _keytable_pool; }
+ 
++    // Called by SetFailed to handle failure logic
++    void OnFailed(int error_code, const std::string& error_text);
++
+ private:
+     DISALLOW_COPY_AND_ASSIGN(Socket);
+ 
+diff --git a/src/butil/string_printf.cpp b/src/butil/string_printf.cpp
+index 981420e..cebaf13 100644
+--- a/src/butil/string_printf.cpp
++++ b/src/butil/string_printf.cpp
+@@ -67,24 +67,49 @@ inline int string_printf_impl(std::string& output, const char* format,
+     }
+     return 0;
+ }
++
++inline int string_printf_impl(std::string& output,  size_t hint,
++                              const char* format, va_list args) {
++    if (hint > output.capacity()) {
++        output.reserve(hint);
++    }
++    return string_printf_impl(output,  format, args);
++}
+ }  // end anonymous namespace
+ 
+ std::string string_printf(const char* format, ...) {
+     // snprintf will tell us how large the output buffer should be, but
+-    // we then have to call it a second time, which is costly.  By
++    // we then have to call it a second time, which is costly. By
+     // guestimating the final size, we avoid the double snprintf in many
+-    // cases, resulting in a performance win.  We use this constructor
++    // cases, resulting in a performance win. We use this constructor
+     // of std::string to avoid a double allocation, though it does pad
+-    // the resulting string with nul bytes.  Our guestimation is twice
++    // the resulting string with nul bytes. Our guestimation is twice
+     // the format string size, or 32 bytes, whichever is larger.  This
+-    // is a hueristic that doesn't affect correctness but attempts to be
++    // is a heuristic that doesn't affect correctness but attempts to be
+     // reasonably fast for the most common cases.
+     std::string ret;
+-    ret.reserve(std::max(32UL, strlen(format) * 2));
++    va_list ap;
++    va_start(ap, format);
++    if (string_printf_impl(ret, std::max(32UL, strlen(format) * 2),
++                           format, ap) != 0) {
++        ret.clear();
++    }
++    va_end(ap);
++    return ret;
++}
+ 
++std::string string_printf(size_t hint_size, const char* format, ...) {
++    // snprintf will tell us how large the output buffer should be, but
++    // we then have to call it a second time, which is costly. By
++    // passing the hint size of formatted string, we avoid the double
++    // snprintf in many cases, resulting in a performance win. We use
++    // this constructor  of std::string to avoid a double allocation,
++    // though it does pad the resulting string with nul bytes.
++    std::string ret;
+     va_list ap;
+     va_start(ap, format);
+-    if (string_printf_impl(ret, format, ap) != 0) {
++    if (string_printf_impl(ret, std::max(hint_size, strlen(format) * 2),
++                           format, ap) != 0) {
+         ret.clear();
+     }
+     va_end(ap);
+@@ -96,11 +121,7 @@ std::string string_printf(const char* format, ...) {
+ int string_appendf(std::string* output, const char* format, ...) {
+     va_list ap;
+     va_start(ap, format);
+-    const size_t old_size = output->size();
+-    const int rc = string_printf_impl(*output, format, ap);
+-    if (rc != 0) {
+-        output->resize(old_size);
+-    }
++    const int rc = string_vappendf(output, format, ap);
+     va_end(ap);
+     return rc;
+ }
+@@ -117,11 +138,7 @@ int string_vappendf(std::string* output, const char* format, va_list args) {
+ int string_printf(std::string* output, const char* format, ...) {
+     va_list ap;
+     va_start(ap, format);
+-    output->clear();
+-    const int rc = string_printf_impl(*output, format, ap);
+-    if (rc != 0) {
+-        output->clear();
+-    }
++    const int rc = string_vprintf(output, format, ap);
+     va_end(ap);
+     return rc;
+ };
+diff --git a/src/butil/string_printf.h b/src/butil/string_printf.h
+index 64eae84..7f9e3dd 100644
+--- a/src/butil/string_printf.h
++++ b/src/butil/string_printf.h
+@@ -27,6 +27,10 @@ namespace butil {
+ std::string string_printf(const char* format, ...)
+     __attribute__ ((format (printf, 1, 2)));
+ 
++// Hint size of formatted string.
++std::string string_printf(size_t hint_size, const char* format, ...)
++    __attribute__ ((format (printf, 2, 3)));
++
+ // Write |format| and associated arguments into |output|
+ // Returns 0 on success, -1 otherwise.
+ int string_printf(std::string* output, const char* fmt, ...)
+-- 
+2.43.5
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to