This is an automated email from the ASF dual-hosted git repository. wwbmmm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push: new 15469ed3 Support connect on socket create (#2574) 15469ed3 is described below commit 15469ed314f22e4cc598e0f0abdde64463e0b768 Author: Bright Chen <chenguangmin...@foxmail.com> AuthorDate: Thu Jun 13 12:23:28 2024 +0800 Support connect on socket create (#2574) --- src/brpc/rtmp.cpp | 2 +- src/brpc/selective_channel.cpp | 20 +++++--- src/brpc/socket.cpp | 65 +++++++++++++++++++------ src/brpc/socket.h | 27 ++++++++--- src/brpc/socket_inl.h | 2 + src/brpc/socket_map.cpp | 17 ++++--- src/brpc/versioned_ref_with_id.h | 5 ++ test/brpc_socket_unittest.cpp | 21 ++++---- test/brpc_ssl_unittest.cpp | 102 ++++++++++++++++++++++++++++++++++++--- 9 files changed, 207 insertions(+), 54 deletions(-) diff --git a/src/brpc/rtmp.cpp b/src/brpc/rtmp.cpp index ae6eb6ad..4913881c 100644 --- a/src/brpc/rtmp.cpp +++ b/src/brpc/rtmp.cpp @@ -1087,7 +1087,7 @@ public: : _connect_options(connect_options) { } - int CreateSocket(const SocketOptions& opt, SocketId* id) { + int CreateSocket(const SocketOptions& opt, SocketId* id) override { SocketOptions sock_opt = opt; sock_opt.app_connect = std::make_shared<RtmpConnect>(); sock_opt.initial_parsing_context = new policy::RtmpContext(&_connect_options, NULL); diff --git a/src/brpc/selective_channel.cpp b/src/brpc/selective_channel.cpp index 9ad5f9a0..5a815821 100644 --- a/src/brpc/selective_channel.cpp +++ b/src/brpc/selective_channel.cpp @@ -158,8 +158,8 @@ private: ChannelBalancer::~ChannelBalancer() { for (ChannelToIdMap::iterator it = _chan_map.begin(); it != _chan_map.end(); ++it) { - SocketUniquePtr ptr(it->second); // Dereference it->second->ReleaseAdditionalReference(); + it->second->ReleaseHCRelatedReference(); } _chan_map.clear(); } @@ -196,15 +196,21 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel, return -1; } SocketUniquePtr ptr; - CHECK_EQ(0, Socket::Address(sock_id, &ptr)); + int rc = Socket::AddressFailedAsWell(sock_id, &ptr); + if (rc < 0 || (rc > 0 && !ptr->HCEnabled())) { + LOG(FATAL) << "Fail to address SocketId=" << sock_id; + return -1; + } if (!AddServer(ServerId(sock_id))) { LOG(ERROR) << "Duplicated sub_channel=" << sub_channel; // sub_chan will be deleted when the socket is recycled. ptr->SetFailed(); + // Cancel health checking. + ptr->ReleaseHCRelatedReference(); return -1; } - ptr->SetHCRelatedRefHeld(); // set held status - _chan_map[sub_channel]= ptr.release(); // Add reference. + // The health-check-related reference has been held on created. + _chan_map[sub_channel]= ptr.get(); if (handle) { *handle = sock_id; } @@ -223,13 +229,11 @@ void ChannelBalancer::RemoveAndDestroyChannel(SelectiveChannel::ChannelHandle ha BAIDU_SCOPED_LOCK(_mutex); CHECK_EQ(1UL, _chan_map.erase(sub->chan)); } - { - ptr->SetHCRelatedRefReleased(); // set released status to cancel health checking - SocketUniquePtr ptr2(ptr.get()); // Dereference. - } if (rc == 0) { ptr->ReleaseAdditionalReference(); } + // Cancel health checking. + ptr->ReleaseHCRelatedReference(); } } diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index 85aa1507..ac1c37ae 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -577,6 +577,10 @@ int Socket::ResetFileDescriptor(int fd) { if (!ValidFileDescriptor(fd)) { return 0; } + if (_remote_side == butil::EndPoint()) { + // OK to fail, non-socket fd does not support this. + butil::get_remote_side(fd, &_remote_side); + } // OK to fail, non-socket fd does not support this. if (butil::get_local_side(fd, &_local_side) != 0) { _local_side = butil::EndPoint(); @@ -781,6 +785,19 @@ int Socket::OnCreated(const SocketOptions& options) { _keepalive_options = options.keepalive_options; CHECK(NULL == _write_head.load(butil::memory_order_relaxed)); _is_write_shutdown = false; + int fd = options.fd; + if (!ValidFileDescriptor(fd) && options.connect_on_create) { + // Connect on create. + fd = DoConnect(options.connect_abstime, NULL, NULL); + if (fd < 0) { + PLOG(ERROR) << "Fail to connect to " << options.remote_side; + int error_code = errno != 0 ? errno : EHOSTDOWN; + SetFailed(error_code, "Fail to connect to %s: %s", + butil::endpoint2str(options.remote_side).c_str(), + berror(error_code)); + return -1; + } + } // Must be the last one! Internal fields of this Socket may be accessed // just after calling ResetFileDescriptor. if (ResetFileDescriptor(options.fd) != 0) { @@ -790,6 +807,7 @@ int Socket::OnCreated(const SocketOptions& options) { berror(saved_errno)); return -1; } + HoldHCRelatedRef(); guard.dismiss(); return 0; @@ -940,6 +958,20 @@ std::string Socket::OnDescription() const { return result; } +void Socket::HoldHCRelatedRef() { + if (_health_check_interval_s > 0) { + _is_hc_related_ref_held = true; + AddReference(); + } +} + +void Socket::ReleaseHCRelatedReference() { + if (_health_check_interval_s > 0) { + _is_hc_related_ref_held = false; + Dereference(); + } +} + int Socket::WaitAndReset(int32_t expected_nref) { const uint32_t id_ver = VersionOfVRefId(id()); uint64_t vref; @@ -1350,16 +1382,27 @@ int Socket::CheckConnected(int sockfd) { return -1; } - butil::EndPoint local_point; - CHECK_EQ(0, butil::get_local_side(sockfd, &local_point)); - LOG_IF(INFO, FLAGS_log_connected) - << "Connected to " << remote_side() - << " via fd=" << (int)sockfd << " SocketId=" << id() - << " local_side=" << local_point; + if (FLAGS_log_connected) { + butil::EndPoint local_point; + CHECK_EQ(0, butil::get_local_side(sockfd, &local_point)); + LOG(INFO) << "Connected to " << remote_side() + << " via fd=" << (int)sockfd << " SocketId=" << id() + << " local_side=" << local_point; + } + // Doing SSL handshake after TCP connected return SSLHandshake(sockfd, false); } +int Socket::DoConnect(const timespec* abstime, + int (*on_connect)(int, int, void*), void* data) { + if (_conn) { + return _conn->Connect(this, abstime, on_connect, data); + } else { + return Connect(abstime, on_connect, data); + } +} + int Socket::ConnectIfNot(const timespec* abstime, WriteRequest* req) { if (_fd.load(butil::memory_order_consume) >= 0) { return 0; @@ -1370,14 +1413,8 @@ int Socket::ConnectIfNot(const timespec* abstime, WriteRequest* req) { SocketUniquePtr s; ReAddress(&s); req->set_socket(s.get()); - if (_conn) { - if (_conn->Connect(this, abstime, KeepWriteIfConnected, req) < 0) { - return -1; - } - } else { - if (Connect(abstime, KeepWriteIfConnected, req) < 0) { - return -1; - } + if (DoConnect(abstime, KeepWriteIfConnected, req) < 0) { + return -1; } s.release(); return 1; diff --git a/src/brpc/socket.h b/src/brpc/socket.h index 6d9c8f11..2a6ab748 100644 --- a/src/brpc/socket.h +++ b/src/brpc/socket.h @@ -256,6 +256,14 @@ struct SocketOptions { // user->BeforeRecycle() before recycling. int fd; butil::EndPoint remote_side; + // If `connect_on_create' is true and `fd' is less than 0, + // a client connection will be established to remote_side() + // regarding deadline `connect_abstime' when Socket is being created. + // Default: false, means that a connection will be established + // on first write. + bool connect_on_create; + // Default: NULL, means no timeout. + const timespec* connect_abstime; SocketUser* user; // When *edge-triggered* events happen on the file descriptor, callback // `on_edge_triggered_events' will be called. Inside the callback, user @@ -409,16 +417,15 @@ public: // True if health checking is enabled. bool HCEnabled() const { + // This fence makes sure that we see change of + // `_is_hc_related_ref_held' before changing `_versioned_ref. + butil::atomic_thread_fence(butil::memory_order_acquire); return _health_check_interval_s > 0 && _is_hc_related_ref_held; } - // When someone holds a health-checking-related reference, - // this function need to be called to make health checking run normally. - void SetHCRelatedRefHeld() { _is_hc_related_ref_held = true; } - // When someone releases the health-checking-related reference, - // this function need to be called to cancel health checking. - void SetHCRelatedRefReleased() { _is_hc_related_ref_held = false; } - bool IsHCRelatedRefHeld() const { return _is_hc_related_ref_held; } + // Release the health-checking-related + // reference which is held on created. + void ReleaseHCRelatedReference(); // After health checking is complete, set _hc_started to false. void AfterHCCompleted() { _hc_started.store(false, butil::memory_order_relaxed); } @@ -665,6 +672,9 @@ private: std::string OnDescription() const; + // Hold the health-checking-related + // reference on created. + void HoldHCRelatedRef(); static int Status(SocketId, int32_t* nref = NULL); // for unit-test. @@ -699,8 +709,11 @@ private: // starting a connection request and `on_connect' will be called // when connecting completes (whether it succeeds or not) // Returns the socket fd on success, -1 otherwise + int DoConnect(const timespec* abstime, + int (*on_connect)(int fd, int err, void* data), void* data); int Connect(const timespec* abstime, int (*on_connect)(int fd, int err, void* data), void* data); + int CheckConnected(int sockfd); // [Not thread-safe] Only used by `Write'. diff --git a/src/brpc/socket_inl.h b/src/brpc/socket_inl.h index a8ff3ce8..d704b900 100644 --- a/src/brpc/socket_inl.h +++ b/src/brpc/socket_inl.h @@ -25,6 +25,8 @@ namespace brpc { inline SocketOptions::SocketOptions() : fd(-1) + , connect_on_create(false) + , connect_abstime(NULL) , user(NULL) , on_edge_triggered_events(NULL) , health_check_interval_s(-1) diff --git a/src/brpc/socket_map.cpp b/src/brpc/socket_map.cpp index 774bf5a7..609d2776 100644 --- a/src/brpc/socket_map.cpp +++ b/src/brpc/socket_map.cpp @@ -58,7 +58,7 @@ static butil::static_atomic<SocketMap*> g_socket_map = BUTIL_STATIC_ATOMIC_INIT( class GlobalSocketCreator : public SocketCreator { public: - int CreateSocket(const SocketOptions& opt, SocketId* id) { + int CreateSocket(const SocketOptions& opt, SocketId* id) override { SocketOptions sock_opt = opt; sock_opt.health_check_interval_s = FLAGS_health_check_interval; return get_client_side_messenger()->Create(sock_opt, id); @@ -237,8 +237,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id, return 0; } // A socket w/o HC is failed (permanently), replace it. - sc->socket->SetHCRelatedRefReleased(); // set released status to cancel health checking - SocketUniquePtr ptr(sc->socket); // Remove the ref added at insertion. + sc->socket->ReleaseHCRelatedReference(); _map.erase(key); // in principle, we can override the entry in map w/o // removing and inserting it again. But this would make error branches // below have to remove the entry before returning, which is @@ -258,12 +257,15 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id, // use SocketUniquePtr which cannot put into containers before c++11. // The ref will be removed at entry's removal. SocketUniquePtr ptr; - if (Socket::Address(tmp_id, &ptr) != 0) { + int rc = Socket::AddressFailedAsWell(tmp_id, &ptr); + if (rc < 0) { LOG(FATAL) << "Fail to address SocketId=" << tmp_id; return -1; + } else if (rc > 0 && !ptr->HCEnabled()) { + LOG(FATAL) << "Failed socket is not HC-enabled"; + return -1; } - ptr->SetHCRelatedRefHeld(); // set held status - SingleConnection new_sc = { 1, ptr.release(), 0 }; + SingleConnection new_sc = { 1, ptr.get(), 0 }; _map[key] = new_sc; *id = tmp_id; mu.unlock(); @@ -301,8 +303,7 @@ void SocketMap::RemoveInternal(const SocketMapKey& key, _map.erase(key); mu.unlock(); s->ReleaseAdditionalReference(); // release extra ref - s->SetHCRelatedRefReleased(); // set released status to cancel health checking - SocketUniquePtr ptr(s); // Dereference + s->ReleaseHCRelatedReference(); } } } diff --git a/src/brpc/versioned_ref_with_id.h b/src/brpc/versioned_ref_with_id.h index 38141f3d..c78f019b 100644 --- a/src/brpc/versioned_ref_with_id.h +++ b/src/brpc/versioned_ref_with_id.h @@ -296,6 +296,11 @@ friend void DereferenceVersionedRefWithId<>(T* r); // it will be recycled automatically and T::BeforeRecycled() will be called. int Dereference(); + // Increase the reference count by 1. + void AddReference() { + _versioned_ref.fetch_add(1, butil::memory_order_release); + } + // Make this socket addressable again. // If nref is less than `at_least_nref', VersionedRefWithId was // abandoned during revival and cannot be revived. diff --git a/test/brpc_socket_unittest.cpp b/test/brpc_socket_unittest.cpp index f278c46b..e83eef7e 100644 --- a/test/brpc_socket_unittest.cpp +++ b/test/brpc_socket_unittest.cpp @@ -504,7 +504,6 @@ TEST_F(SocketTest, not_health_check_when_nref_hits_0) { { brpc::SocketUniquePtr s; ASSERT_EQ(0, brpc::Socket::Address(id, &s)); - s->SetHCRelatedRefHeld(); // set held status global_sock = s.get(); ASSERT_TRUE(s.get()); ASSERT_EQ(-1, s->fd()); @@ -542,6 +541,7 @@ TEST_F(SocketTest, not_health_check_when_nref_hits_0) { #endif ASSERT_TRUE(src.empty()); ASSERT_EQ(-1, s->fd()); + s->ReleaseHCRelatedReference(); } // StartHealthCheck is possibly still running. Spin until global_sock // is NULL(set in CheckRecycle::BeforeRecycle). Notice that you should @@ -650,12 +650,14 @@ TEST_F(SocketTest, health_check) { options.user = new CheckRecycle; options.health_check_interval_s = kCheckInteval/*s*/; ASSERT_EQ(0, brpc::Socket::Create(options, &id)); - brpc::SocketUniquePtr s; - ASSERT_EQ(0, brpc::Socket::Address(id, &s)); - - s->SetHCRelatedRefHeld(); // set held status - global_sock = s.get(); - ASSERT_TRUE(s.get()); + brpc::Socket* s = NULL; + { + brpc::SocketUniquePtr ptr; + ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)); + s = ptr.get(); + } + global_sock = s; + ASSERT_NE(nullptr, s); ASSERT_EQ(-1, s->fd()); ASSERT_EQ(point, s->remote_side()); ASSERT_EQ(id, s->id()); @@ -763,7 +765,7 @@ TEST_F(SocketTest, health_check) { ASSERT_NE(0, ptr->fd()); } - s.release()->Dereference(); + s->ReleaseHCRelatedReference(); // Must stop messenger before SetFailed the id otherwise StartHealthCheck // still has chance to get reconnected and revive the id. @@ -779,7 +781,8 @@ TEST_F(SocketTest, health_check) { bthread_usleep(1000); ASSERT_LT(butil::gettimeofday_us(), start_time + 1000000L); } - ASSERT_EQ(-1, brpc::Socket::Status(id)); + nref = 0; + ASSERT_EQ(-1, brpc::Socket::Status(id, &nref)) << "nref=" << nref; // The id is invalid. brpc::SocketUniquePtr ptr; ASSERT_EQ(-1, brpc::Socket::Address(id, &ptr)); diff --git a/test/brpc_ssl_unittest.cpp b/test/brpc_ssl_unittest.cpp index e101f534..3c64ba76 100644 --- a/test/brpc_ssl_unittest.cpp +++ b/test/brpc_ssl_unittest.cpp @@ -26,6 +26,9 @@ #include <butil/macros.h> #include <butil/fd_guard.h> #include <butil/files/scoped_file.h> +#include <brpc/policy/baidu_rpc_meta.pb.h> +#include <brpc/policy/baidu_rpc_protocol.h> +#include <brpc/policy/most_common_message.h> #include "brpc/global.h" #include "brpc/socket.h" #include "brpc/server.h" @@ -54,11 +57,11 @@ const std::string EXP_RESPONSE = "world"; class EchoServiceImpl : public test::EchoService { public: EchoServiceImpl() : count(0) {} - virtual ~EchoServiceImpl() { g_delete = true; } - virtual void Echo(google::protobuf::RpcController* cntl_base, - const test::EchoRequest* request, - test::EchoResponse* response, - google::protobuf::Closure* done) { + ~EchoServiceImpl() override { g_delete = true; } + void Echo(google::protobuf::RpcController* cntl_base, + const test::EchoRequest* request, + test::EchoResponse* response, + google::protobuf::Closure* done) override { brpc::ClosureGuard done_guard(done); brpc::Controller* cntl = (brpc::Controller*)cntl_base; count.fetch_add(1, butil::memory_order_relaxed); @@ -207,7 +210,7 @@ TEST_F(SSLTest, force_ssl) { test::EchoService_Stub stub(&channel); test::EchoResponse res; stub.Echo(&cntl, &req, &res, NULL); - EXPECT_EQ(EXP_RESPONSE, res.message()) << cntl.ErrorText(); + ASSERT_EQ(EXP_RESPONSE, res.message()) << cntl.ErrorText(); } { @@ -218,13 +221,98 @@ TEST_F(SSLTest, force_ssl) { test::EchoService_Stub stub(&channel); test::EchoResponse res; stub.Echo(&cntl, &req, &res, NULL); - EXPECT_TRUE(cntl.Failed()); + ASSERT_TRUE(cntl.Failed()); } ASSERT_EQ(0, server.Stop(0)); ASSERT_EQ(0, server.Join()); } +void ProcessResponse(brpc::InputMessageBase* msg_base) { + brpc::DestroyingPtr<brpc::policy::MostCommonMessage> msg( + static_cast<brpc::policy::MostCommonMessage*>(msg_base)); + brpc::policy::RpcMeta meta; + ASSERT_TRUE(brpc::ParsePbFromIOBuf(&meta, msg->meta)); + const brpc::policy::RpcResponseMeta &response_meta = meta.response(); + ASSERT_EQ(0, response_meta.error_code()) << response_meta.error_text(); + + const brpc::CallId cid = { static_cast<uint64_t>(meta.correlation_id()) }; + brpc::Controller* cntl = NULL; + ASSERT_EQ(0, bthread_id_lock(cid, (void**)&cntl)); + ASSERT_NE(nullptr, cntl); + ASSERT_TRUE(brpc::ParsePbFromIOBuf(cntl->response(), msg->payload)); + ASSERT_EQ(0, bthread_id_unlock_and_destroy(cid)); +} + +TEST_F(SSLTest, connect_on_create) { + brpc::Protocol dummy_protocol = { + brpc::policy::ParseRpcMessage, brpc::SerializeRequestDefault, + brpc::policy::PackRpcRequest,NULL, ProcessResponse, + NULL, NULL, NULL, brpc::CONNECTION_TYPE_ALL, "ssl_ut_baidu" + }; + ASSERT_EQ(0, RegisterProtocol((brpc::ProtocolType)30, dummy_protocol)); + + brpc::InputMessageHandler dummy_handler ={ + dummy_protocol.parse, dummy_protocol.process_response, + NULL, NULL, dummy_protocol.name + }; + brpc::InputMessenger messenger; + ASSERT_EQ(0, messenger.AddHandler(dummy_handler)); + + const int port = 8613; + brpc::Server server; + brpc::ServerOptions server_options; + server_options.force_ssl = true; + + brpc::CertInfo cert; + cert.certificate = "cert1.crt"; + cert.private_key = "cert1.key"; + server_options.mutable_ssl_options()->default_cert = cert; + + EchoServiceImpl echo_svc; + ASSERT_EQ(0, server.AddService( + &echo_svc, brpc::SERVER_DOESNT_OWN_SERVICE)); + ASSERT_EQ(0, server.Start(port, &server_options)); + + // Create client socket. + brpc::SocketOptions socket_options; + butil::EndPoint ep(butil::IP_ANY, port); + socket_options.remote_side = ep; + socket_options.connect_on_create = true; + socket_options.on_edge_triggered_events = brpc::InputMessenger::OnNewMessages; + socket_options.user = &messenger; + brpc::ChannelSSLOptions ssl_options; + SSL_CTX* raw_ctx = brpc::CreateClientSSLContext(ssl_options); + ASSERT_NE(nullptr, raw_ctx); + std::shared_ptr<brpc::SocketSSLContext> ssl_ctx + = std::make_shared<brpc::SocketSSLContext>(); + ssl_ctx->raw_ctx = raw_ctx; + socket_options.initial_ssl_ctx = ssl_ctx; + + brpc::SocketId socket_id; + ASSERT_EQ(0, brpc::Socket::Create(socket_options, &socket_id)); + brpc::SocketUniquePtr ptr; + ASSERT_EQ(0, brpc::Socket::Address(socket_id, &ptr)); + + test::EchoRequest req; + req.set_message(EXP_REQUEST); + for (int i = 0; i < 100; ++i) { + test::EchoResponse res; + butil::IOBuf request_buf; + butil::IOBuf request_body; + brpc::Controller cntl; + cntl._response = &res; + const brpc::CallId correlation_id = cntl.call_id(); + brpc::SerializeRequestDefault(&request_body, &cntl, &req); + brpc::policy::PackRpcRequest(&request_buf, NULL, correlation_id.value, + test::EchoService_Stub::descriptor()->method(0), + &cntl, request_body, NULL); + ASSERT_EQ(0, ptr->Write(&request_buf)); + brpc::Join(correlation_id); + ASSERT_EQ(EXP_RESPONSE, res.message()); + } +} + void CheckCert(const char* cname, const char* cert) { const int port = 8613; brpc::Channel channel; --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org For additional commands, e-mail: dev-h...@brpc.apache.org