This is an automated email from the ASF dual-hosted git repository.

guangmingchen pushed a commit to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/brpc.git
commit e47e3d2c3811c4151d714678be0f269771a19408 Author: Bright Chen <chenguangmin...@foxmail.com> AuthorDate: Sun Jan 12 19:08:46 2025 +0800 Support tcp user timeout of client --- src/brpc/input_messenger.cpp | 17 ++- src/brpc/socket.cpp | 49 +++++-- src/brpc/socket.h | 52 ++++---- src/brpc/socket_inl.h | 15 --- src/brpc/versioned_ref_with_id.h | 4 +- test/brpc_socket_unittest.cpp | 279 ++++++++++++++++++++------------------- 6 files changed, 222 insertions(+), 194 deletions(-) diff --git a/src/brpc/input_messenger.cpp b/src/brpc/input_messenger.cpp index b28c7044..afb8fb3f 100644 --- a/src/brpc/input_messenger.cpp +++ b/src/brpc/input_messenger.cpp @@ -57,13 +57,20 @@ DEFINE_bool(socket_keepalive, false, "Enable keepalive of sockets if this value is true"); DEFINE_int32(socket_keepalive_idle_s, -1, - "Set idle time of sockets before keepalive if this value is positive"); + "Set idle time for socket keepalive in seconds if this value is positive"); DEFINE_int32(socket_keepalive_interval_s, -1, - "Set interval of sockets between keepalives if this value is positive"); + "Set interval between keepalives in seconds if this value is positive"); DEFINE_int32(socket_keepalive_count, -1, - "Set number of keepalives of sockets before close if this value is positive"); + "Set number of keepalives before death if this value is positive"); + +DEFINE_int32(socket_tcp_user_timeout_ms, -1, + "If this value is positive, set number of milliseconds that transmitted " + "data may remain unacknowledged, or bufferred data may remain untransmitted " + "(due to zero window size) before TCP will forcibly close the corresponding " + "connection and return ETIMEDOUT to the application. 
Only linux supports " + "TCP_USER_TIMEOUT."); DECLARE_bool(usercode_in_pthread); DECLARE_bool(usercode_in_coroutine); @@ -501,6 +508,7 @@ int InputMessenger::Create(const butil::EndPoint& remote_side, options.keepalive_options->keepalive_count = FLAGS_socket_keepalive_count; } + options.tcp_user_timeout_ms = FLAGS_socket_tcp_user_timeout_ms; return Socket::Create(options, id); } @@ -535,6 +543,9 @@ int InputMessenger::Create(SocketOptions options, SocketId* id) { = FLAGS_socket_keepalive_count; } } + if (options.tcp_user_timeout_ms <= 0) { + options.tcp_user_timeout_ms = FLAGS_socket_tcp_user_timeout_ms; + } return Socket::Create(options, id); } diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index 0201a978..ac4c6892 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -492,6 +492,7 @@ Socket::Socket(Forbidden f) , _stream_set(NULL) , _total_streams_unconsumed_size(0) , _ninflight_app_health_check(0) + , _tcp_user_timeout_ms(-1) , _http_request_method(HTTP_METHOD_GET) { CreateVarsOnce(); pthread_mutex_init(&_id_wait_list_mutex, NULL); @@ -597,6 +598,21 @@ int Socket::ResetFileDescriptor(int fd) { // turn off nagling. // OK to fail, namely unix domain socket does not support this. 
butil::make_no_delay(fd); + + SetSocketOptions(fd); + + if (_on_edge_triggered_events) { + if (_io_event.AddConsumer(fd) != 0) { + PLOG(ERROR) << "Fail to add SocketId=" << id() + << " into EventDispatcher"; + _fd.store(-1, butil::memory_order_release); + return -1; + } + } + return 0; +} + +void Socket::SetSocketOptions(int fd) { if (_tos > 0 && setsockopt(fd, IPPROTO_IP, IP_TOS, &_tos, sizeof(_tos)) != 0) { PLOG(ERROR) << "Fail to set tos of fd=" << fd << " to " << _tos; @@ -618,27 +634,21 @@ int Socket::ResetFileDescriptor(int fd) { } } - EnableKeepaliveIfNeeded(fd); - - if (_on_edge_triggered_events) { - if (_io_event.AddConsumer(fd) != 0) { - PLOG(ERROR) << "Fail to add SocketId=" << id() - << " into EventDispatcher"; - _fd.store(-1, butil::memory_order_release); - return -1; +#if defined(OS_LINUX) + if (_tcp_user_timeout_ms > 0) { + if (setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, + &_tcp_user_timeout_ms, sizeof(_tcp_user_timeout_ms)) != 0) { + PLOG(ERROR) << "Fail to set TCP_USER_TIMEOUT of fd=" << fd; } } - return 0; -} +#endif -void Socket::EnableKeepaliveIfNeeded(int fd) { if (!_keepalive_options) { return; } int keepalive = 1; - if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive, - sizeof(keepalive)) != 0) { + if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive, sizeof(keepalive)) != 0) { PLOG(ERROR) << "Fail to set keepalive of fd=" << fd; return; } @@ -782,6 +792,7 @@ int Socket::OnCreated(const SocketOptions& options) { _last_writetime_us.store(cpuwide_now, butil::memory_order_relaxed); _unwritten_bytes.store(0, butil::memory_order_relaxed); _keepalive_options = options.keepalive_options; + _tcp_user_timeout_ms = options.tcp_user_timeout_ms; CHECK(NULL == _write_head.load(butil::memory_order_relaxed)); _is_write_shutdown = false; int fd = options.fd; @@ -1388,7 +1399,7 @@ int Socket::CheckConnected(int sockfd) { butil::EndPoint local_point; CHECK_EQ(0, butil::get_local_side(sockfd, &local_point)); LOG(INFO) << "Connected to " << 
remote_side() - << " via fd=" << (int)sockfd << " SocketId=" << id() + << " via fd=" << sockfd << " SocketId=" << id() << " local_side=" << local_point; } @@ -2501,6 +2512,16 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) { #endif } +#if defined(OS_LINUX) + { + int tcp_user_timeout = 0; + socklen_t len = sizeof(tcp_user_timeout); + if (getsockopt(fd, SOL_TCP, TCP_USER_TIMEOUT, &tcp_user_timeout, &len) == 0) { + os << "\ntcp_user_timeout=" << tcp_user_timeout; + } + } +#endif + #if defined(OS_MACOSX) struct tcp_connection_info ti; socklen_t len = sizeof(ti); diff --git a/src/brpc/socket.h b/src/brpc/socket.h index 2a6ab748..a84c0abd 100644 --- a/src/brpc/socket.h +++ b/src/brpc/socket.h @@ -234,60 +234,57 @@ struct SocketSSLContext { }; struct SocketKeepaliveOptions { - SocketKeepaliveOptions() - : keepalive_idle_s(-1) - , keepalive_interval_s(-1) - , keepalive_count(-1) - {} // Start keeplives after this period. - int keepalive_idle_s; + int keepalive_idle_s{-1}; // Interval between keepalives. - int keepalive_interval_s; + int keepalive_interval_s{-1}; // Number of keepalives before death. - int keepalive_count; + int keepalive_count{-1}; }; // TODO: Comment fields struct SocketOptions { - SocketOptions(); - // If `fd' is non-negative, set `fd' to be non-blocking and take the // ownership. Socket will close the fd(if needed) and call // user->BeforeRecycle() before recycling. - int fd; + int fd{-1}; butil::EndPoint remote_side; // If `connect_on_create' is true and `fd' is less than 0, // a client connection will be established to remote_side() // regarding deadline `connect_abstime' when Socket is being created. // Default: false, means that a connection will be established // on first write. - bool connect_on_create; + bool connect_on_create{false}; // Default: NULL, means no timeout. 
- const timespec* connect_abstime; - SocketUser* user; + const timespec* connect_abstime{NULL}; + SocketUser* user{NULL}; // When *edge-triggered* events happen on the file descriptor, callback // `on_edge_triggered_events' will be called. Inside the callback, user // shall read fd() in non-blocking mode until all data has been read // or EAGAIN is met, otherwise the callback will not be called again // until new data arrives. The callback will not be called from more than // one thread at any time. - void (*on_edge_triggered_events)(Socket*); - int health_check_interval_s; + void (*on_edge_triggered_events)(Socket*){NULL}; + int health_check_interval_s{-1}; // Only accept ssl connection. - bool force_ssl; + bool force_ssl{false}; std::shared_ptr<SocketSSLContext> initial_ssl_ctx; - bool use_rdma; - bthread_keytable_pool_t* keytable_pool; - SocketConnection* conn; + bool use_rdma{false}; + bthread_keytable_pool_t* keytable_pool{NULL}; + SocketConnection* conn{NULL}; std::shared_ptr<AppConnect> app_connect; // The created socket will set parsing_context with this value. - Destroyable* initial_parsing_context; + Destroyable* initial_parsing_context{NULL}; // Socket keepalive related options. // Refer to `SocketKeepaliveOptions' for details. std::shared_ptr<SocketKeepaliveOptions> keepalive_options; + // https://github.com/apache/brpc/issues/1154 + // https://github.com/grpc/grpc/pull/16419/files + // Only linux supports TCP_USER_TIMEOUT. + int tcp_user_timeout_ms{ -1}; // Tag of this socket - bthread_tag_t bthread_tag; + bthread_tag_t bthread_tag{BTHREAD_TAG_DEFAULT}; }; // Abstractions on reading from and writing into file descriptors. @@ -725,7 +722,7 @@ private: int ResetFileDescriptor(int fd); - void EnableKeepaliveIfNeeded(int fd); + void SetSocketOptions(int fd); // Wait until nref hits `expected_nref' and reset some internal resources. int WaitAndReset(int32_t expected_nref); @@ -973,6 +970,15 @@ private: // non-NULL means that keepalive is on. 
std::shared_ptr<SocketKeepaliveOptions> _keepalive_options; + // Only linux supports TCP_USER_TIMEOUT. + // When the value is greater than 0, it specifies the maximum + // amount of time in milliseconds that transmitted data may + // remain unacknowledged, or bufferred data may remain + // untransmitted (due to zero window size) before TCP will + // forcibly close the corresponding connection and return + // ETIMEDOUT to the application. + int _tcp_user_timeout_ms; + HttpMethod _http_request_method; }; diff --git a/src/brpc/socket_inl.h b/src/brpc/socket_inl.h index d704b900..ea8a392e 100644 --- a/src/brpc/socket_inl.h +++ b/src/brpc/socket_inl.h @@ -23,21 +23,6 @@ namespace brpc { -inline SocketOptions::SocketOptions() - : fd(-1) - , connect_on_create(false) - , connect_abstime(NULL) - , user(NULL) - , on_edge_triggered_events(NULL) - , health_check_interval_s(-1) - , force_ssl(false) - , use_rdma(false) - , keytable_pool(NULL) - , conn(NULL) - , app_connect(NULL) - , initial_parsing_context(NULL) - , bthread_tag(BTHREAD_TAG_DEFAULT) {} - inline bool Socket::MoreReadEvents(int* progress) { // Fail to CAS means that new events arrived. return !_nevent.compare_exchange_strong( diff --git a/src/brpc/versioned_ref_with_id.h b/src/brpc/versioned_ref_with_id.h index 23ea0e0a..e7c3ef8d 100644 --- a/src/brpc/versioned_ref_with_id.h +++ b/src/brpc/versioned_ref_with_id.h @@ -211,7 +211,7 @@ public: // Create a VersionedRefWithId, put the identifier into `id'. // `args' will be passed to OnCreated() directly. // Returns 0 on success, -1 otherwise. - template<typename ... Args> + template<typename... Args> static int Create(VRefId* id, Args&&... args); // Place the VersionedRefWithId associated with identifier `id' into @@ -350,7 +350,7 @@ void DereferenceVersionedRefWithId(T* r) { } template <typename T> -template<typename ... Args> +template<typename... Args> int VersionedRefWithId<T>::Create(VRefId* id, Args&&... 
args) { resource_id_t slot; T* const t = butil::get_resource(&slot, Forbidden()); diff --git a/test/brpc_socket_unittest.cpp b/test/brpc_socket_unittest.cpp index 3f9b88ad..59075b2b 100644 --- a/test/brpc_socket_unittest.cpp +++ b/test/brpc_socket_unittest.cpp @@ -59,6 +59,7 @@ DECLARE_bool(socket_keepalive); DECLARE_int32(socket_keepalive_idle_s); DECLARE_int32(socket_keepalive_interval_s); DECLARE_int32(socket_keepalive_count); +DECLARE_int32(socket_tcp_user_timeout_ms); } void EchoProcessHuluRequest(brpc::InputMessageBase* msg_base); @@ -1113,16 +1114,15 @@ TEST_F(SocketTest, keepalive) { int default_keepalive_count = 0; { butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0)); - GetKeepaliveValue(sockfd, - default_keepalive, - default_keepalive_idle, - default_keepalive_interval, - default_keepalive_count); + ASSERT_GT(sockfd, 0); + GetKeepaliveValue(sockfd, default_keepalive, default_keepalive_idle, + default_keepalive_interval, default_keepalive_count); } // Disable keepalive. { - butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0)); + int sockfd = socket(AF_INET, SOCK_STREAM, 0); + ASSERT_GT(sockfd, 0); brpc::SocketOptions options; options.fd = sockfd; brpc::SocketId id; @@ -1130,7 +1130,6 @@ TEST_F(SocketTest, keepalive) { brpc::SocketUniquePtr ptr; ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)); CheckNoKeepalive(ptr->fd()); - sockfd.release(); } int keepalive_idle = 1; @@ -1138,7 +1137,8 @@ TEST_F(SocketTest, keepalive) { int keepalive_count = 2; // Enable keepalive. 
{ - butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0)); + int sockfd = socket(AF_INET, SOCK_STREAM, 0); + ASSERT_GT(sockfd, 0); brpc::SocketOptions options; options.fd = sockfd; options.keepalive_options = std::make_shared<brpc::SocketKeepaliveOptions>(); @@ -1146,17 +1146,14 @@ TEST_F(SocketTest, keepalive) { ASSERT_EQ(0, brpc::Socket::Create(options, &id)); brpc::SocketUniquePtr ptr; ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)); - CheckKeepalive(ptr->fd(), - true, - default_keepalive_idle, - default_keepalive_interval, - default_keepalive_count); - sockfd.release(); + CheckKeepalive(ptr->fd(), true, default_keepalive_idle, + default_keepalive_interval, default_keepalive_count); } // Enable keepalive and set keepalive idle. { - butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0)); + int sockfd = socket(AF_INET, SOCK_STREAM, 0); + ASSERT_GT(sockfd, 0); brpc::SocketOptions options; options.fd = sockfd; options.keepalive_options = std::make_shared<brpc::SocketKeepaliveOptions>(); @@ -1166,17 +1163,15 @@ TEST_F(SocketTest, keepalive) { ASSERT_EQ(0, brpc::Socket::Create(options, &id)); brpc::SocketUniquePtr ptr; ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)); - CheckKeepalive(ptr->fd(), - true, - keepalive_idle, + CheckKeepalive(ptr->fd(), true, keepalive_idle, default_keepalive_interval, default_keepalive_count); - sockfd.release(); } // Enable keepalive and set keepalive interval. 
{ - butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0)); + int sockfd = socket(AF_INET, SOCK_STREAM, 0); + ASSERT_GT(sockfd, 0); brpc::SocketOptions options; options.fd = sockfd; options.keepalive_options = std::make_shared<brpc::SocketKeepaliveOptions>(); @@ -1186,56 +1181,42 @@ TEST_F(SocketTest, keepalive) { ASSERT_EQ(0, brpc::Socket::Create(options, &id)); brpc::SocketUniquePtr ptr; ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)); - CheckKeepalive(ptr->fd(), - true, - default_keepalive_idle, - keepalive_interval, - default_keepalive_count); - sockfd.release(); + CheckKeepalive(ptr->fd(), true, default_keepalive_idle, + keepalive_interval, default_keepalive_count); } // Enable keepalive and set keepalive count. { - butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0)); + int sockfd = socket(AF_INET, SOCK_STREAM, 0); + ASSERT_GT(sockfd, 0); brpc::SocketOptions options; options.fd = sockfd; options.keepalive_options = std::make_shared<brpc::SocketKeepaliveOptions>(); - options.keepalive_options->keepalive_count - = keepalive_count; + options.keepalive_options->keepalive_count = keepalive_count; brpc::SocketId id; ASSERT_EQ(0, brpc::Socket::Create(options, &id)); brpc::SocketUniquePtr ptr; ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)); - CheckKeepalive(ptr->fd(), - true, - default_keepalive_idle, - default_keepalive_interval, - keepalive_count); - sockfd.release(); + CheckKeepalive(ptr->fd(), true, default_keepalive_idle, + default_keepalive_interval, keepalive_count); } // Enable keepalive and set keepalive idle, interval, count. 
{ - butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0)); + int sockfd = socket(AF_INET, SOCK_STREAM, 0); + ASSERT_GT(sockfd, 0); brpc::SocketOptions options; options.fd = sockfd; options.keepalive_options = std::make_shared<brpc::SocketKeepaliveOptions>(); - options.keepalive_options->keepalive_idle_s - = keepalive_idle; - options.keepalive_options->keepalive_interval_s - = keepalive_interval; - options.keepalive_options->keepalive_count - = keepalive_count; + options.keepalive_options->keepalive_idle_s = keepalive_idle; + options.keepalive_options->keepalive_interval_s = keepalive_interval; + options.keepalive_options->keepalive_count = keepalive_count; brpc::SocketId id; ASSERT_EQ(0, brpc::Socket::Create(options, &id)); brpc::SocketUniquePtr ptr; ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)); - CheckKeepalive(ptr->fd(), - true, - keepalive_idle, - keepalive_interval, - keepalive_count); - sockfd.release(); + CheckKeepalive(ptr->fd(), true, keepalive_idle, + keepalive_interval, keepalive_count); } } @@ -1246,101 +1227,83 @@ TEST_F(SocketTest, keepalive_input_message) { int default_keepalive_count = 0; { butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0)); - GetKeepaliveValue(sockfd, - default_keepalive, - default_keepalive_idle, - default_keepalive_interval, - default_keepalive_count); + ASSERT_GT(sockfd, 0); + GetKeepaliveValue(sockfd, default_keepalive, default_keepalive_idle, + default_keepalive_interval, default_keepalive_count); } // Disable keepalive. 
{ - butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0)); + int sockfd = socket(AF_INET, SOCK_STREAM, 0); + ASSERT_GT(sockfd, 0); brpc::SocketOptions options; options.fd = sockfd; brpc::SocketId id; - ASSERT_EQ(0, brpc::get_or_new_client_side_messenger() - ->Create(options, &id)); + ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, &id)); brpc::SocketUniquePtr ptr; ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)); CheckNoKeepalive(ptr->fd()); - sockfd.release(); } // Enable keepalive. brpc::FLAGS_socket_keepalive = true; { - butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0)); + int sockfd = socket(AF_INET, SOCK_STREAM, 0); + ASSERT_GT(sockfd, 0); brpc::SocketOptions options; options.fd = sockfd; brpc::SocketId id; - ASSERT_EQ(0, brpc::get_or_new_client_side_messenger() - ->Create(options, &id)); + ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, &id)); brpc::SocketUniquePtr ptr; ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)); - CheckKeepalive(ptr->fd(), - true, - default_keepalive_idle, - default_keepalive_interval, - default_keepalive_count); - sockfd.release(); + CheckKeepalive(ptr->fd(), true, default_keepalive_idle, + default_keepalive_interval, default_keepalive_count); } // Enable keepalive and set keepalive idle. 
brpc::FLAGS_socket_keepalive_idle_s = 10; { - butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0)); + int sockfd = socket(AF_INET, SOCK_STREAM, 0); + ASSERT_GT(sockfd, 0); brpc::SocketOptions options; options.fd = sockfd; brpc::SocketId id; - ASSERT_EQ(0, brpc::get_or_new_client_side_messenger() - ->Create(options, &id)); + ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, &id)); brpc::SocketUniquePtr ptr; ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)); - CheckKeepalive(ptr->fd(), - true, - brpc::FLAGS_socket_keepalive_idle_s, - default_keepalive_interval, - default_keepalive_count); - sockfd.release(); + CheckKeepalive(ptr->fd(), true, brpc::FLAGS_socket_keepalive_idle_s, + default_keepalive_interval, default_keepalive_count); } // Enable keepalive and set keepalive idle, interval. brpc::FLAGS_socket_keepalive_interval_s = 10; { - butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0)); + int sockfd = socket(AF_INET, SOCK_STREAM, 0); + ASSERT_GT(sockfd, 0); brpc::SocketOptions options; options.fd = sockfd; brpc::SocketId id; - ASSERT_EQ(0, brpc::get_or_new_client_side_messenger() - ->Create(options, &id)); + ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, &id)); brpc::SocketUniquePtr ptr; ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)); - CheckKeepalive(ptr->fd(), - true, - brpc::FLAGS_socket_keepalive_idle_s, - brpc::FLAGS_socket_keepalive_interval_s, - default_keepalive_count); - sockfd.release(); + CheckKeepalive(ptr->fd(), true, brpc::FLAGS_socket_keepalive_idle_s, + brpc::FLAGS_socket_keepalive_interval_s, default_keepalive_count); } // Enable keepalive and set keepalive idle, interval, count. 
brpc::FLAGS_socket_keepalive_count = 10; { - butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0)); + int sockfd = socket(AF_INET, SOCK_STREAM, 0); + ASSERT_GT(sockfd, 0); brpc::SocketOptions options; options.fd = sockfd; brpc::SocketId id; - ASSERT_EQ(0, brpc::get_or_new_client_side_messenger() - ->Create(options, &id)); + ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, &id)); brpc::SocketUniquePtr ptr; ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)); - CheckKeepalive(ptr->fd(), - true, - brpc::FLAGS_socket_keepalive_idle_s, + CheckKeepalive(ptr->fd(), true, brpc::FLAGS_socket_keepalive_idle_s, brpc::FLAGS_socket_keepalive_interval_s, brpc::FLAGS_socket_keepalive_count); - sockfd.release(); } // Options of keepalive set by user have priority over Gflags. @@ -1348,90 +1311,132 @@ TEST_F(SocketTest, keepalive_input_message) { int keepalive_interval = 2; int keepalive_count = 2; { - butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0)); + int sockfd = socket(AF_INET, SOCK_STREAM, 0); + ASSERT_GT(sockfd, 0); brpc::SocketOptions options; options.fd = sockfd; options.keepalive_options = std::make_shared<brpc::SocketKeepaliveOptions>(); - options.keepalive_options->keepalive_idle_s - = keepalive_idle; + options.keepalive_options->keepalive_idle_s = keepalive_idle; brpc::SocketId id; - ASSERT_EQ(0, brpc::get_or_new_client_side_messenger() - ->Create(options, &id)); + ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, &id)); brpc::SocketUniquePtr ptr; ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)); - CheckKeepalive(ptr->fd(), - true, - keepalive_idle, + CheckKeepalive(ptr->fd(), true, keepalive_idle, brpc::FLAGS_socket_keepalive_interval_s, brpc::FLAGS_socket_keepalive_count); - sockfd.release(); } { - butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0)); + int sockfd = socket(AF_INET, SOCK_STREAM, 0); + ASSERT_GT(sockfd, 0); brpc::SocketOptions options; options.fd = sockfd; options.keepalive_options = 
std::make_shared<brpc::SocketKeepaliveOptions>(); - options.keepalive_options->keepalive_interval_s - = keepalive_interval; + options.keepalive_options->keepalive_interval_s = keepalive_interval; brpc::SocketId id; - ASSERT_EQ(0, brpc::get_or_new_client_side_messenger() - ->Create(options, &id)); + ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, &id)); brpc::SocketUniquePtr ptr; ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)); - CheckKeepalive(ptr->fd(), - true, - brpc::FLAGS_socket_keepalive_idle_s, - keepalive_interval, - brpc::FLAGS_socket_keepalive_count); - sockfd.release(); + CheckKeepalive(ptr->fd(), true, brpc::FLAGS_socket_keepalive_idle_s, + keepalive_interval, brpc::FLAGS_socket_keepalive_count); } { - butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0)); + int sockfd = socket(AF_INET, SOCK_STREAM, 0); + ASSERT_GT(sockfd, 0); brpc::SocketOptions options; options.fd = sockfd; options.keepalive_options = std::make_shared<brpc::SocketKeepaliveOptions>(); - options.keepalive_options->keepalive_count - = keepalive_count; + options.keepalive_options->keepalive_count = keepalive_count; brpc::SocketId id; - ASSERT_EQ(0, brpc::get_or_new_client_side_messenger() - ->Create(options, &id)); + ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, &id)); brpc::SocketUniquePtr ptr; ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)); - CheckKeepalive(ptr->fd(), - true, - brpc::FLAGS_socket_keepalive_idle_s, - brpc::FLAGS_socket_keepalive_interval_s, - keepalive_count); - sockfd.release(); + CheckKeepalive(ptr->fd(), true, brpc::FLAGS_socket_keepalive_idle_s, + brpc::FLAGS_socket_keepalive_interval_s, keepalive_count); } { - butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0)); + int sockfd = socket(AF_INET, SOCK_STREAM, 0); + ASSERT_GT(sockfd, 0); brpc::SocketOptions options; options.fd = sockfd; options.keepalive_options = std::make_shared<brpc::SocketKeepaliveOptions>(); - options.keepalive_options->keepalive_idle_s - = 
keepalive_idle; - options.keepalive_options->keepalive_interval_s - = keepalive_interval; - options.keepalive_options->keepalive_count - = keepalive_count; + options.keepalive_options->keepalive_idle_s = keepalive_idle; + options.keepalive_options->keepalive_interval_s = keepalive_interval; + options.keepalive_options->keepalive_count = keepalive_count; brpc::SocketId id; - ASSERT_EQ(0, brpc::get_or_new_client_side_messenger() - ->Create(options, &id)); + ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, &id)); brpc::SocketUniquePtr ptr; ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)); - CheckKeepalive(ptr->fd(), - true, - keepalive_idle, - keepalive_interval, - keepalive_count); - sockfd.release(); + CheckKeepalive(ptr->fd(), true, keepalive_idle, + keepalive_interval, keepalive_count); } } +#if defined(OS_LINUX) +void CheckTCPUserTimeout(int fd, int expect_tcp_user_timeout) { + int tcp_user_timeout = 0; + socklen_t len = sizeof(tcp_user_timeout); + ASSERT_EQ(0, getsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &tcp_user_timeout, &len) ); + ASSERT_EQ(tcp_user_timeout, expect_tcp_user_timeout); +} + +TEST_F(SocketTest, tcp_user_timeout) { + { + int sockfd = socket(AF_INET, SOCK_STREAM, 0); + ASSERT_GT(sockfd, 0); + brpc::SocketOptions options; + options.fd = sockfd; + brpc::SocketId id; + ASSERT_EQ(0, brpc::Socket::Create(options, &id)); + brpc::SocketUniquePtr ptr; + ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)); + CheckTCPUserTimeout(ptr->fd(), 0); + } + + { + int tcp_user_timeout_ms = 1000; + int sockfd = socket(AF_INET, SOCK_STREAM, 0); + ASSERT_GT(sockfd, 0); + brpc::SocketOptions options; + options.fd = sockfd; + options.tcp_user_timeout_ms = tcp_user_timeout_ms; + brpc::SocketId id; + ASSERT_EQ(0, brpc::Socket::Create(options, &id)); + brpc::SocketUniquePtr ptr; + ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)); + CheckTCPUserTimeout(ptr->fd(), tcp_user_timeout_ms); + } + + brpc::FLAGS_socket_tcp_user_timeout_ms = 2000; + { + int sockfd = 
socket(AF_INET, SOCK_STREAM, 0); + ASSERT_GT(sockfd, 0); + brpc::SocketOptions options; + options.fd = sockfd; + brpc::SocketId id; + ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, &id)); + brpc::SocketUniquePtr ptr; + ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)); + CheckTCPUserTimeout(ptr->fd(), brpc::FLAGS_socket_tcp_user_timeout_ms); + } + { + int tcp_user_timeout_ms = 3000; + int sockfd = socket(AF_INET, SOCK_STREAM, 0); + ASSERT_GT(sockfd, 0); + brpc::SocketOptions options; + options.fd = sockfd; + options.tcp_user_timeout_ms = tcp_user_timeout_ms; + brpc::SocketId id; + ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, &id)); + brpc::SocketUniquePtr ptr; + ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)); + CheckTCPUserTimeout(ptr->fd(), tcp_user_timeout_ms); + } +} +#endif + int HandleSocketSuccessWrite(bthread_id_t id, void* data, int error_code, const std::string& error_text) { auto success_count = static_cast<size_t*>(data); --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org For additional commands, e-mail: dev-h...@brpc.apache.org