This is an automated email from the ASF dual-hosted git repository. guangmingchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push: new 1337db03 Fix batch create stream and make SetHostSocket thread safe (#2938) 1337db03 is described below commit 1337db0331037240de6424a1f6373ceb7235bd13 Author: Jenrry You <jenrry...@gmail.com> AuthorDate: Fri Apr 18 16:30:56 2025 +0800 Fix batch create stream and make SetHostSocket thread safe (#2938) --- src/brpc/controller.cpp | 4 ++-- src/brpc/stream.cpp | 25 ++++++++++--------------- src/brpc/stream_impl.h | 3 ++- 3 files changed, 14 insertions(+), 18 deletions(-) diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index 0cb83dc5..1362d322 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -1439,12 +1439,12 @@ void Controller::HandleStreamConnection(Socket *host_socket) { Stream* s = (Stream*)ptrs[0]->conn(); s->SetConnected(_remote_stream_settings); if (stream_num > 1) { - const auto& extra_stream_ids = _remote_stream_settings->extra_stream_ids(); + auto extra_stream_ids = std::move(*_remote_stream_settings->mutable_extra_stream_ids()); _remote_stream_settings->clear_extra_stream_ids(); for (size_t i = 1; i < stream_num; ++i) { Stream* extra_stream = (Stream *) ptrs[i]->conn(); _remote_stream_settings->set_stream_id(extra_stream_ids[i - 1]); - s->ShareHostSocket(*extra_stream); + s->SetHostSocket(host_socket); extra_stream->SetConnected(_remote_stream_settings); } } diff --git a/src/brpc/stream.cpp b/src/brpc/stream.cpp index 68397b57..2a443054 100644 --- a/src/brpc/stream.cpp +++ b/src/brpc/stream.cpp @@ -640,17 +640,16 @@ void Stream::SendFeedback() { } int Stream::SetHostSocket(Socket *host_socket) { - if (_host_socket != NULL) { - CHECK(false) << "SetHostSocket has already been called"; - return -1; - } - SocketUniquePtr ptr; - host_socket->ReAddress(&ptr); - // TODO add *this to host socke - if (ptr->AddStream(id()) != 0) { - return -1; - } - _host_socket = ptr.release(); + std::call_once(_set_host_socket_flag, [this, host_socket]() { + SocketUniquePtr ptr; + host_socket->ReAddress(&ptr); + // TODO add *this to host socke + if (ptr->AddStream(id()) != 0) { + CHECK(false) << id() << " fail to add stream to host socket"; + return; + } + _host_socket = ptr.release(); + }); return 0; } @@ -710,10 +709,6 @@ void Stream::Close(int error_code, const char* reason_fmt, ...) { return TriggerOnConnectIfNeed(); } -int Stream::ShareHostSocket(Stream& other_stream) { - return other_stream.SetHostSocket(_host_socket); -} - int Stream::SetFailed(StreamId id, int error_code, const char* reason_fmt, ...) { SocketUniquePtr ptr; if (Socket::AddressFailedAsWell(id, &ptr) == -1) { diff --git a/src/brpc/stream_impl.h b/src/brpc/stream_impl.h index 66e0d719..5ff7cb04 100644 --- a/src/brpc/stream_impl.h +++ b/src/brpc/stream_impl.h @@ -19,6 +19,7 @@ #ifndef BRPC_STREAM_IMPL_H #define BRPC_STREAM_IMPL_H +#include <mutex> #include "bthread/bthread.h" #include "bthread/execution_queue.h" #include "brpc/socket.h" @@ -67,7 +68,6 @@ public: __attribute__ ((__format__ (__printf__, 3, 4))); void Close(int error_code, const char* reason_fmt, ...) __attribute__ ((__format__ (__printf__, 3, 4))); - int ShareHostSocket(Stream& other_stream); private: friend void StreamWait(StreamId stream_id, const timespec *due_time, @@ -134,6 +134,7 @@ friend struct butil::DefaultDeleter<Stream>; butil::IOBuf *_pending_buf; int64_t _start_idle_timer_us; bthread_timer_t _idle_timer; + std::once_flag _set_host_socket_flag; }; } // namespace brpc --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org For additional commands, e-mail: dev-h...@brpc.apache.org