This is an automated email from the ASF dual-hosted git repository.

guangmingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 1337db03 Fix batch create stream and make SetHostSocket thread safe (#2938)
1337db03 is described below

commit 1337db0331037240de6424a1f6373ceb7235bd13
Author: Jenrry You <jenrry...@gmail.com>
AuthorDate: Fri Apr 18 16:30:56 2025 +0800

    Fix batch create stream and make SetHostSocket thread safe (#2938)
---
 src/brpc/controller.cpp |  4 ++--
 src/brpc/stream.cpp     | 25 ++++++++++---------------
 src/brpc/stream_impl.h  |  3 ++-
 3 files changed, 14 insertions(+), 18 deletions(-)

diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp
index 0cb83dc5..1362d322 100644
--- a/src/brpc/controller.cpp
+++ b/src/brpc/controller.cpp
@@ -1439,12 +1439,12 @@ void Controller::HandleStreamConnection(Socket *host_socket) {
     Stream* s = (Stream*)ptrs[0]->conn();
     s->SetConnected(_remote_stream_settings);
     if (stream_num > 1) {
-        const auto& extra_stream_ids = _remote_stream_settings->extra_stream_ids();
+        auto extra_stream_ids = std::move(*_remote_stream_settings->mutable_extra_stream_ids());
         _remote_stream_settings->clear_extra_stream_ids();
         for (size_t i = 1; i < stream_num; ++i) {
             Stream* extra_stream = (Stream *) ptrs[i]->conn();
             _remote_stream_settings->set_stream_id(extra_stream_ids[i - 1]);
-            s->ShareHostSocket(*extra_stream);
+            s->SetHostSocket(host_socket);
             extra_stream->SetConnected(_remote_stream_settings);
         }
     }
diff --git a/src/brpc/stream.cpp b/src/brpc/stream.cpp
index 68397b57..2a443054 100644
--- a/src/brpc/stream.cpp
+++ b/src/brpc/stream.cpp
@@ -640,17 +640,16 @@ void Stream::SendFeedback() {
 }
 
 int Stream::SetHostSocket(Socket *host_socket) {
-    if (_host_socket != NULL) {
-        CHECK(false) << "SetHostSocket has already been called";
-        return -1;
-    }
-    SocketUniquePtr ptr;
-    host_socket->ReAddress(&ptr);
-    // TODO add *this to host socke
-    if (ptr->AddStream(id()) != 0) {
-        return -1;
-    }
-    _host_socket = ptr.release();
+    std::call_once(_set_host_socket_flag, [this, host_socket]() {
+        SocketUniquePtr ptr;
+        host_socket->ReAddress(&ptr);
+        // TODO add *this to host socke
+        if (ptr->AddStream(id()) != 0) {
+            CHECK(false) << id() << " fail to add stream to host socket";
+            return;
+        }
+        _host_socket = ptr.release();
+    });
     return 0;
 }
 
@@ -710,10 +709,6 @@ void Stream::Close(int error_code, const char* reason_fmt, ...) {
     return TriggerOnConnectIfNeed();
 }
 
-int Stream::ShareHostSocket(Stream& other_stream) {
-    return other_stream.SetHostSocket(_host_socket);
-}
-
 int Stream::SetFailed(StreamId id, int error_code, const char* reason_fmt, ...) {
     SocketUniquePtr ptr;
     if (Socket::AddressFailedAsWell(id, &ptr) == -1) {
diff --git a/src/brpc/stream_impl.h b/src/brpc/stream_impl.h
index 66e0d719..5ff7cb04 100644
--- a/src/brpc/stream_impl.h
+++ b/src/brpc/stream_impl.h
@@ -19,6 +19,7 @@
 #ifndef  BRPC_STREAM_IMPL_H
 #define  BRPC_STREAM_IMPL_H
 
+#include <mutex>
 #include "bthread/bthread.h"
 #include "bthread/execution_queue.h"
 #include "brpc/socket.h"
@@ -67,7 +68,6 @@ public:
     __attribute__ ((__format__ (__printf__, 3, 4)));
     void Close(int error_code, const char* reason_fmt, ...)
         __attribute__ ((__format__ (__printf__, 3, 4)));
-    int ShareHostSocket(Stream& other_stream);
 
 private:
 friend void StreamWait(StreamId stream_id, const timespec *due_time,
@@ -134,6 +134,7 @@ friend struct butil::DefaultDeleter<Stream>;
     butil::IOBuf *_pending_buf;
     int64_t _start_idle_timer_us;
     bthread_timer_t _idle_timer;
+    std::once_flag _set_host_socket_flag;
 };
 
 } // namespace brpc


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org

Reply via email to