This is an automated email from the ASF dual-hosted git repository.

guangmingchen pushed a commit to branch release-1.12.1
in repository https://gitbox.apache.org/repos/asf/brpc.git

commit 56d349b167be4daf76bcb17216c8853f7eb3f6ec
Author: Bright Chen <chenguangmin...@foxmail.com>
AuthorDate: Sun Jan 12 19:08:46 2025 +0800

    Support tcp user timeout of client
---
 src/brpc/input_messenger.cpp     |  17 ++-
 src/brpc/socket.cpp              |  49 +++++--
 src/brpc/socket.h                |  52 ++++----
 src/brpc/socket_inl.h            |  15 ---
 src/brpc/versioned_ref_with_id.h |   4 +-
 test/brpc_socket_unittest.cpp    | 279 ++++++++++++++++++++-------------------
 6 files changed, 222 insertions(+), 194 deletions(-)

diff --git a/src/brpc/input_messenger.cpp b/src/brpc/input_messenger.cpp
index b28c7044..afb8fb3f 100644
--- a/src/brpc/input_messenger.cpp
+++ b/src/brpc/input_messenger.cpp
@@ -57,13 +57,20 @@ DEFINE_bool(socket_keepalive, false,
             "Enable keepalive of sockets if this value is true");
 
 DEFINE_int32(socket_keepalive_idle_s, -1,
-             "Set idle time of sockets before keepalive if this value is positive");
+             "Set idle time for socket keepalive in seconds if this value is positive");
 
 DEFINE_int32(socket_keepalive_interval_s, -1,
-             "Set interval of sockets between keepalives if this value is positive");
+             "Set interval between keepalives in seconds if this value is positive");
 
 DEFINE_int32(socket_keepalive_count, -1,
-             "Set number of keepalives of sockets before close if this value is positive");
+             "Set number of keepalives before death if this value is positive");
+
+DEFINE_int32(socket_tcp_user_timeout_ms, -1,
+             "If this value is positive, set number of milliseconds that transmitted "
+             "data may remain unacknowledged, or bufferred data may remain untransmitted "
+             "(due to zero window size) before TCP will forcibly close the corresponding "
+             "connection and return ETIMEDOUT to the application. Only linux supports "
+             "TCP_USER_TIMEOUT.");
 
 DECLARE_bool(usercode_in_pthread);
 DECLARE_bool(usercode_in_coroutine);
@@ -501,6 +508,7 @@ int InputMessenger::Create(const butil::EndPoint& remote_side,
         options.keepalive_options->keepalive_count
             = FLAGS_socket_keepalive_count;
     }
+    options.tcp_user_timeout_ms = FLAGS_socket_tcp_user_timeout_ms;
     return Socket::Create(options, id);
 }
 
@@ -535,6 +543,9 @@ int InputMessenger::Create(SocketOptions options, SocketId* id) {
                 = FLAGS_socket_keepalive_count;
         }
     }
+    if (options.tcp_user_timeout_ms <= 0) {
+        options.tcp_user_timeout_ms = FLAGS_socket_tcp_user_timeout_ms;
+    }
     return Socket::Create(options, id);
 }
 
diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp
index 0201a978..ac4c6892 100644
--- a/src/brpc/socket.cpp
+++ b/src/brpc/socket.cpp
@@ -492,6 +492,7 @@ Socket::Socket(Forbidden f)
     , _stream_set(NULL)
     , _total_streams_unconsumed_size(0)
     , _ninflight_app_health_check(0)
+    , _tcp_user_timeout_ms(-1)
     , _http_request_method(HTTP_METHOD_GET) {
     CreateVarsOnce();
     pthread_mutex_init(&_id_wait_list_mutex, NULL);
@@ -597,6 +598,21 @@ int Socket::ResetFileDescriptor(int fd) {
     // turn off nagling.
     // OK to fail, namely unix domain socket does not support this.
     butil::make_no_delay(fd);
+
+    SetSocketOptions(fd);
+
+    if (_on_edge_triggered_events) {
+        if (_io_event.AddConsumer(fd) != 0) {
+            PLOG(ERROR) << "Fail to add SocketId=" << id() 
+                        << " into EventDispatcher";
+            _fd.store(-1, butil::memory_order_release);
+            return -1;
+        }
+    }
+    return 0;
+}
+
+void Socket::SetSocketOptions(int fd) {
     if (_tos > 0 &&
         setsockopt(fd, IPPROTO_IP, IP_TOS, &_tos, sizeof(_tos)) != 0) {
         PLOG(ERROR) << "Fail to set tos of fd=" << fd << " to " << _tos;
@@ -618,27 +634,21 @@ int Socket::ResetFileDescriptor(int fd) {
         }
     }
 
-    EnableKeepaliveIfNeeded(fd);
-
-    if (_on_edge_triggered_events) {
-        if (_io_event.AddConsumer(fd) != 0) {
-            PLOG(ERROR) << "Fail to add SocketId=" << id() 
-                        << " into EventDispatcher";
-            _fd.store(-1, butil::memory_order_release);
-            return -1;
+#if defined(OS_LINUX)
+    if (_tcp_user_timeout_ms > 0) {
+        if (setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT,
+                       &_tcp_user_timeout_ms, sizeof(_tcp_user_timeout_ms)) != 0) {
+            PLOG(ERROR) << "Fail to set TCP_USER_TIMEOUT of fd=" << fd;
         }
     }
-    return 0;
-}
+#endif
 
-void Socket::EnableKeepaliveIfNeeded(int fd) {
     if (!_keepalive_options) {
         return;
     }
 
     int keepalive = 1;
-    if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive,
-                   sizeof(keepalive)) != 0) {
+    if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive, sizeof(keepalive)) != 0) {
         PLOG(ERROR) << "Fail to set keepalive of fd=" << fd;
         return;
     }
@@ -782,6 +792,7 @@ int Socket::OnCreated(const SocketOptions& options) {
     _last_writetime_us.store(cpuwide_now, butil::memory_order_relaxed);
     _unwritten_bytes.store(0, butil::memory_order_relaxed);
     _keepalive_options = options.keepalive_options;
+    _tcp_user_timeout_ms = options.tcp_user_timeout_ms;
     CHECK(NULL == _write_head.load(butil::memory_order_relaxed));
     _is_write_shutdown = false;
     int fd = options.fd;
@@ -1388,7 +1399,7 @@ int Socket::CheckConnected(int sockfd) {
         butil::EndPoint local_point;
         CHECK_EQ(0, butil::get_local_side(sockfd, &local_point));
         LOG(INFO) << "Connected to " << remote_side()
-                  << " via fd=" << (int)sockfd << " SocketId=" << id()
+                  << " via fd=" << sockfd << " SocketId=" << id()
                   << " local_side=" << local_point;
     }
 
@@ -2501,6 +2512,16 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) {
 #endif
     }
 
+#if defined(OS_LINUX)
+    {
+        int tcp_user_timeout = 0;
+        socklen_t len = sizeof(tcp_user_timeout);
+        if (getsockopt(fd, SOL_TCP, TCP_USER_TIMEOUT, &tcp_user_timeout, &len) == 0) {
+            os << "\ntcp_user_timeout=" << tcp_user_timeout;
+        }
+    }
+#endif
+
 #if defined(OS_MACOSX)
     struct tcp_connection_info ti;
     socklen_t len = sizeof(ti);
diff --git a/src/brpc/socket.h b/src/brpc/socket.h
index 2a6ab748..a84c0abd 100644
--- a/src/brpc/socket.h
+++ b/src/brpc/socket.h
@@ -234,60 +234,57 @@ struct SocketSSLContext {
 };
 
 struct SocketKeepaliveOptions {
-    SocketKeepaliveOptions()
-        : keepalive_idle_s(-1)
-        , keepalive_interval_s(-1)
-        , keepalive_count(-1)
-        {}
     // Start keeplives after this period.
-    int keepalive_idle_s;
+    int keepalive_idle_s{-1};
     // Interval between keepalives.
-    int keepalive_interval_s;
+    int keepalive_interval_s{-1};
     // Number of keepalives before death.
-    int keepalive_count;
+    int keepalive_count{-1};
 };
 
 // TODO: Comment fields
 struct SocketOptions {
-    SocketOptions();
-
     // If `fd' is non-negative, set `fd' to be non-blocking and take the
     // ownership. Socket will close the fd(if needed) and call
     // user->BeforeRecycle() before recycling.
-    int fd;
+    int fd{-1};
     butil::EndPoint remote_side;
     // If `connect_on_create' is true and `fd' is less than 0,
     // a client connection will be established to remote_side()
     // regarding deadline `connect_abstime' when Socket is being created.
     // Default: false, means that a connection will be established
     // on first write.
-    bool connect_on_create;
+    bool connect_on_create{false};
     // Default: NULL, means no timeout.
-    const timespec* connect_abstime;
-    SocketUser* user;
+    const timespec* connect_abstime{NULL};
+    SocketUser* user{NULL};
     // When *edge-triggered* events happen on the file descriptor, callback
     // `on_edge_triggered_events' will be called. Inside the callback, user
     // shall read fd() in non-blocking mode until all data has been read
     // or EAGAIN is met, otherwise the callback will not be called again
     // until new data arrives. The callback will not be called from more than
     // one thread at any time.
-    void (*on_edge_triggered_events)(Socket*);
-    int health_check_interval_s;
+    void (*on_edge_triggered_events)(Socket*){NULL};
+    int health_check_interval_s{-1};
     // Only accept ssl connection.
-    bool force_ssl;
+    bool force_ssl{false};
     std::shared_ptr<SocketSSLContext> initial_ssl_ctx;
-    bool use_rdma;
-    bthread_keytable_pool_t* keytable_pool;
-    SocketConnection* conn;
+    bool use_rdma{false};
+    bthread_keytable_pool_t* keytable_pool{NULL};
+    SocketConnection* conn{NULL};
     std::shared_ptr<AppConnect> app_connect;
     // The created socket will set parsing_context with this value.
-    Destroyable* initial_parsing_context;
+    Destroyable* initial_parsing_context{NULL};
 
     // Socket keepalive related options.
     // Refer to `SocketKeepaliveOptions' for details.
     std::shared_ptr<SocketKeepaliveOptions> keepalive_options;
+    // https://github.com/apache/brpc/issues/1154
+    // https://github.com/grpc/grpc/pull/16419/files
+    // Only linux supports TCP_USER_TIMEOUT.
+    int tcp_user_timeout_ms{ -1};
     // Tag of this socket
-    bthread_tag_t bthread_tag;
+    bthread_tag_t bthread_tag{BTHREAD_TAG_DEFAULT};
 };
 
 // Abstractions on reading from and writing into file descriptors.
@@ -725,7 +722,7 @@ private:
 
     int ResetFileDescriptor(int fd);
 
-    void EnableKeepaliveIfNeeded(int fd);
+    void SetSocketOptions(int fd);
 
     // Wait until nref hits `expected_nref' and reset some internal resources.
     int WaitAndReset(int32_t expected_nref);
@@ -973,6 +970,15 @@ private:
     // non-NULL means that keepalive is on.
     std::shared_ptr<SocketKeepaliveOptions> _keepalive_options;
 
+    // Only linux supports TCP_USER_TIMEOUT.
+    // When the value is greater than 0, it specifies the maximum
+    // amount of time in milliseconds that transmitted data may
+    // remain unacknowledged, or buffered data may remain
+    // untransmitted (due to zero window size) before TCP will
+    // forcibly close the corresponding connection and return
+    // ETIMEDOUT to the application.
+    int _tcp_user_timeout_ms;
+
     HttpMethod _http_request_method;
 };
 
diff --git a/src/brpc/socket_inl.h b/src/brpc/socket_inl.h
index d704b900..ea8a392e 100644
--- a/src/brpc/socket_inl.h
+++ b/src/brpc/socket_inl.h
@@ -23,21 +23,6 @@
 
 namespace brpc {
 
-inline SocketOptions::SocketOptions()
-    : fd(-1)
-    , connect_on_create(false)
-    , connect_abstime(NULL)
-    , user(NULL)
-    , on_edge_triggered_events(NULL)
-    , health_check_interval_s(-1)
-    , force_ssl(false)
-    , use_rdma(false)
-    , keytable_pool(NULL)
-    , conn(NULL)
-    , app_connect(NULL)
-    , initial_parsing_context(NULL)
-    , bthread_tag(BTHREAD_TAG_DEFAULT) {}
-
 inline bool Socket::MoreReadEvents(int* progress) {
     // Fail to CAS means that new events arrived.
     return !_nevent.compare_exchange_strong(
diff --git a/src/brpc/versioned_ref_with_id.h b/src/brpc/versioned_ref_with_id.h
index 23ea0e0a..e7c3ef8d 100644
--- a/src/brpc/versioned_ref_with_id.h
+++ b/src/brpc/versioned_ref_with_id.h
@@ -211,7 +211,7 @@ public:
     // Create a VersionedRefWithId, put the identifier into `id'.
     // `args' will be passed to OnCreated() directly.
     // Returns 0 on success, -1 otherwise.
-    template<typename ... Args>
+    template<typename... Args>
     static int Create(VRefId* id, Args&&... args);
 
     // Place the VersionedRefWithId associated with identifier `id' into
@@ -350,7 +350,7 @@ void DereferenceVersionedRefWithId(T* r) {
 }
 
 template <typename T>
-template<typename ... Args>
+template<typename... Args>
 int VersionedRefWithId<T>::Create(VRefId* id, Args&&... args) {
     resource_id_t slot;
     T* const t = butil::get_resource(&slot, Forbidden());
diff --git a/test/brpc_socket_unittest.cpp b/test/brpc_socket_unittest.cpp
index 3f9b88ad..59075b2b 100644
--- a/test/brpc_socket_unittest.cpp
+++ b/test/brpc_socket_unittest.cpp
@@ -59,6 +59,7 @@ DECLARE_bool(socket_keepalive);
 DECLARE_int32(socket_keepalive_idle_s);
 DECLARE_int32(socket_keepalive_interval_s);
 DECLARE_int32(socket_keepalive_count);
+DECLARE_int32(socket_tcp_user_timeout_ms);
 }
 
 void EchoProcessHuluRequest(brpc::InputMessageBase* msg_base);
@@ -1113,16 +1114,15 @@ TEST_F(SocketTest, keepalive) {
     int default_keepalive_count = 0;
     {
         butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0));
-        GetKeepaliveValue(sockfd,
-                          default_keepalive,
-                          default_keepalive_idle,
-                          default_keepalive_interval,
-                          default_keepalive_count);
+        ASSERT_GT(sockfd, 0);
+        GetKeepaliveValue(sockfd, default_keepalive, default_keepalive_idle,
+                          default_keepalive_interval, default_keepalive_count);
     }
 
     // Disable keepalive.
     {
-        butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0));
+        int sockfd = socket(AF_INET, SOCK_STREAM, 0);
+        ASSERT_GT(sockfd, 0);
         brpc::SocketOptions options;
         options.fd = sockfd;
         brpc::SocketId id;
@@ -1130,7 +1130,6 @@ TEST_F(SocketTest, keepalive) {
         brpc::SocketUniquePtr ptr;
         ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
         CheckNoKeepalive(ptr->fd());
-        sockfd.release();
     }
 
     int keepalive_idle = 1;
@@ -1138,7 +1137,8 @@ TEST_F(SocketTest, keepalive) {
     int keepalive_count = 2;
     // Enable keepalive.
     {
-        butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0));
+        int sockfd = socket(AF_INET, SOCK_STREAM, 0);
+        ASSERT_GT(sockfd, 0);
         brpc::SocketOptions options;
         options.fd = sockfd;
         options.keepalive_options = std::make_shared<brpc::SocketKeepaliveOptions>();
@@ -1146,17 +1146,14 @@ TEST_F(SocketTest, keepalive) {
         ASSERT_EQ(0, brpc::Socket::Create(options, &id));
         brpc::SocketUniquePtr ptr;
         ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
-        CheckKeepalive(ptr->fd(),
-                       true,
-                       default_keepalive_idle,
-                       default_keepalive_interval,
-                       default_keepalive_count);
-        sockfd.release();
+        CheckKeepalive(ptr->fd(), true, default_keepalive_idle,
+                       default_keepalive_interval, default_keepalive_count);
     }
 
     // Enable keepalive and set keepalive idle.
     {
-        butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0));
+        int sockfd = socket(AF_INET, SOCK_STREAM, 0);
+        ASSERT_GT(sockfd, 0);
         brpc::SocketOptions options;
         options.fd = sockfd;
         options.keepalive_options = std::make_shared<brpc::SocketKeepaliveOptions>();
@@ -1166,17 +1163,15 @@ TEST_F(SocketTest, keepalive) {
         ASSERT_EQ(0, brpc::Socket::Create(options, &id));
         brpc::SocketUniquePtr ptr;
         ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
-        CheckKeepalive(ptr->fd(),
-                       true,
-                       keepalive_idle,
+        CheckKeepalive(ptr->fd(), true, keepalive_idle,
                        default_keepalive_interval,
                        default_keepalive_count);
-        sockfd.release();
     }
 
     // Enable keepalive and set keepalive interval.
     {
-        butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0));
+        int sockfd = socket(AF_INET, SOCK_STREAM, 0);
+        ASSERT_GT(sockfd, 0);
         brpc::SocketOptions options;
         options.fd = sockfd;
         options.keepalive_options = std::make_shared<brpc::SocketKeepaliveOptions>();
@@ -1186,56 +1181,42 @@ TEST_F(SocketTest, keepalive) {
         ASSERT_EQ(0, brpc::Socket::Create(options, &id));
         brpc::SocketUniquePtr ptr;
         ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
-        CheckKeepalive(ptr->fd(),
-                       true,
-                       default_keepalive_idle,
-                       keepalive_interval,
-                       default_keepalive_count);
-        sockfd.release();
+        CheckKeepalive(ptr->fd(), true, default_keepalive_idle,
+                       keepalive_interval, default_keepalive_count);
     }
 
     // Enable keepalive and set keepalive count.
     {
-        butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0));
+        int sockfd = socket(AF_INET, SOCK_STREAM, 0);
+        ASSERT_GT(sockfd, 0);
         brpc::SocketOptions options;
         options.fd = sockfd;
         options.keepalive_options = std::make_shared<brpc::SocketKeepaliveOptions>();
-        options.keepalive_options->keepalive_count
-            = keepalive_count;
+        options.keepalive_options->keepalive_count = keepalive_count;
         brpc::SocketId id;
         ASSERT_EQ(0, brpc::Socket::Create(options, &id));
         brpc::SocketUniquePtr ptr;
         ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
-        CheckKeepalive(ptr->fd(),
-                       true,
-                       default_keepalive_idle,
-                       default_keepalive_interval,
-                       keepalive_count);
-        sockfd.release();
+        CheckKeepalive(ptr->fd(), true, default_keepalive_idle,
+                       default_keepalive_interval, keepalive_count);
     }
 
     // Enable keepalive and set keepalive idle, interval, count.
     {
-        butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0));
+        int sockfd = socket(AF_INET, SOCK_STREAM, 0);
+        ASSERT_GT(sockfd, 0);
         brpc::SocketOptions options;
         options.fd = sockfd;
         options.keepalive_options = std::make_shared<brpc::SocketKeepaliveOptions>();
-        options.keepalive_options->keepalive_idle_s
-            = keepalive_idle;
-        options.keepalive_options->keepalive_interval_s
-            = keepalive_interval;
-        options.keepalive_options->keepalive_count
-            = keepalive_count;
+        options.keepalive_options->keepalive_idle_s = keepalive_idle;
+        options.keepalive_options->keepalive_interval_s = keepalive_interval;
+        options.keepalive_options->keepalive_count = keepalive_count;
         brpc::SocketId id;
         ASSERT_EQ(0, brpc::Socket::Create(options, &id));
         brpc::SocketUniquePtr ptr;
         ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
-        CheckKeepalive(ptr->fd(),
-                       true,
-                       keepalive_idle,
-                       keepalive_interval,
-                       keepalive_count);
-        sockfd.release();
+        CheckKeepalive(ptr->fd(), true, keepalive_idle,
+                       keepalive_interval, keepalive_count);
     }
 }
 
@@ -1246,101 +1227,83 @@ TEST_F(SocketTest, keepalive_input_message) {
     int default_keepalive_count = 0;
     {
         butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0));
-        GetKeepaliveValue(sockfd,
-                          default_keepalive,
-                          default_keepalive_idle,
-                          default_keepalive_interval,
-                          default_keepalive_count);
+        ASSERT_GT(sockfd, 0);
+        GetKeepaliveValue(sockfd, default_keepalive, default_keepalive_idle,
+                          default_keepalive_interval, default_keepalive_count);
     }
 
     // Disable keepalive.
     {
-        butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0));
+        int sockfd = socket(AF_INET, SOCK_STREAM, 0);
+        ASSERT_GT(sockfd, 0);
         brpc::SocketOptions options;
         options.fd = sockfd;
         brpc::SocketId id;
-        ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()
-            ->Create(options, &id));
+        ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, &id));
         brpc::SocketUniquePtr ptr;
         ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
         CheckNoKeepalive(ptr->fd());
-        sockfd.release();
     }
 
     // Enable keepalive.
     brpc::FLAGS_socket_keepalive = true;
     {
-        butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0));
+        int sockfd = socket(AF_INET, SOCK_STREAM, 0);
+        ASSERT_GT(sockfd, 0);
         brpc::SocketOptions options;
         options.fd = sockfd;
         brpc::SocketId id;
-        ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()
-            ->Create(options, &id));
+        ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, &id));
         brpc::SocketUniquePtr ptr;
         ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
-        CheckKeepalive(ptr->fd(),
-                       true,
-                       default_keepalive_idle,
-                       default_keepalive_interval,
-                       default_keepalive_count);
-        sockfd.release();
+        CheckKeepalive(ptr->fd(), true, default_keepalive_idle,
+                       default_keepalive_interval, default_keepalive_count);
     }
 
     // Enable keepalive and set keepalive idle.
     brpc::FLAGS_socket_keepalive_idle_s = 10;
     {
-        butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0));
+        int sockfd = socket(AF_INET, SOCK_STREAM, 0);
+        ASSERT_GT(sockfd, 0);
         brpc::SocketOptions options;
         options.fd = sockfd;
         brpc::SocketId id;
-        ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()
-            ->Create(options, &id));
+        ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, &id));
         brpc::SocketUniquePtr ptr;
         ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
-        CheckKeepalive(ptr->fd(),
-                       true,
-                       brpc::FLAGS_socket_keepalive_idle_s,
-                       default_keepalive_interval,
-                       default_keepalive_count);
-        sockfd.release();
+        CheckKeepalive(ptr->fd(), true, brpc::FLAGS_socket_keepalive_idle_s,
+                       default_keepalive_interval, default_keepalive_count);
     }
 
     // Enable keepalive and set keepalive idle, interval.
     brpc::FLAGS_socket_keepalive_interval_s = 10;
     {
-        butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0));
+        int sockfd = socket(AF_INET, SOCK_STREAM, 0);
+        ASSERT_GT(sockfd, 0);
         brpc::SocketOptions options;
         options.fd = sockfd;
         brpc::SocketId id;
-        ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()
-            ->Create(options, &id));
+        ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, &id));
         brpc::SocketUniquePtr ptr;
         ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
-        CheckKeepalive(ptr->fd(),
-                       true,
-                       brpc::FLAGS_socket_keepalive_idle_s,
-                       brpc::FLAGS_socket_keepalive_interval_s,
-                       default_keepalive_count);
-        sockfd.release();
+        CheckKeepalive(ptr->fd(), true, brpc::FLAGS_socket_keepalive_idle_s,
+                       brpc::FLAGS_socket_keepalive_interval_s, default_keepalive_count);
     }
 
     // Enable keepalive and set keepalive idle, interval, count.
     brpc::FLAGS_socket_keepalive_count = 10;
     {
-        butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0));
+        int sockfd = socket(AF_INET, SOCK_STREAM, 0);
+        ASSERT_GT(sockfd, 0);
         brpc::SocketOptions options;
         options.fd = sockfd;
         brpc::SocketId id;
-        ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()
-            ->Create(options, &id));
+        ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, &id));
         brpc::SocketUniquePtr ptr;
         ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
-        CheckKeepalive(ptr->fd(),
-                       true,
-                       brpc::FLAGS_socket_keepalive_idle_s,
+        CheckKeepalive(ptr->fd(), true, brpc::FLAGS_socket_keepalive_idle_s,
                        brpc::FLAGS_socket_keepalive_interval_s,
                        brpc::FLAGS_socket_keepalive_count);
-        sockfd.release();
     }
 
     // Options of keepalive set by user have priority over Gflags.
@@ -1348,90 +1311,132 @@ TEST_F(SocketTest, keepalive_input_message) {
     int keepalive_interval = 2;
     int keepalive_count = 2;
     {
-        butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0));
+        int sockfd = socket(AF_INET, SOCK_STREAM, 0);
+        ASSERT_GT(sockfd, 0);
         brpc::SocketOptions options;
         options.fd = sockfd;
         options.keepalive_options = std::make_shared<brpc::SocketKeepaliveOptions>();
-        options.keepalive_options->keepalive_idle_s
-            = keepalive_idle;
+        options.keepalive_options->keepalive_idle_s = keepalive_idle;
         brpc::SocketId id;
-        ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()
-            ->Create(options, &id));
+        ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, &id));
         brpc::SocketUniquePtr ptr;
         ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
-        CheckKeepalive(ptr->fd(),
-                       true,
-                       keepalive_idle,
+        CheckKeepalive(ptr->fd(), true, keepalive_idle,
                        brpc::FLAGS_socket_keepalive_interval_s,
                        brpc::FLAGS_socket_keepalive_count);
-        sockfd.release();
     }
 
     {
-        butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0));
+        int sockfd = socket(AF_INET, SOCK_STREAM, 0);
+        ASSERT_GT(sockfd, 0);
         brpc::SocketOptions options;
         options.fd = sockfd;
         options.keepalive_options = std::make_shared<brpc::SocketKeepaliveOptions>();
-        options.keepalive_options->keepalive_interval_s
-            = keepalive_interval;
+        options.keepalive_options->keepalive_interval_s = keepalive_interval;
         brpc::SocketId id;
-        ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()
-            ->Create(options, &id));
+        ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, &id));
         brpc::SocketUniquePtr ptr;
         ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
-        CheckKeepalive(ptr->fd(),
-                       true,
-                       brpc::FLAGS_socket_keepalive_idle_s,
-                       keepalive_interval,
-                       brpc::FLAGS_socket_keepalive_count);
-        sockfd.release();
+        CheckKeepalive(ptr->fd(), true, brpc::FLAGS_socket_keepalive_idle_s,
+                       keepalive_interval, brpc::FLAGS_socket_keepalive_count);
     }
 
     {
-        butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0));
+        int sockfd = socket(AF_INET, SOCK_STREAM, 0);
+        ASSERT_GT(sockfd, 0);
         brpc::SocketOptions options;
         options.fd = sockfd;
         options.keepalive_options = std::make_shared<brpc::SocketKeepaliveOptions>();
-        options.keepalive_options->keepalive_count
-            = keepalive_count;
+        options.keepalive_options->keepalive_count = keepalive_count;
         brpc::SocketId id;
-        ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()
-            ->Create(options, &id));
+        ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, &id));
         brpc::SocketUniquePtr ptr;
         ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
-        CheckKeepalive(ptr->fd(),
-                       true,
-                       brpc::FLAGS_socket_keepalive_idle_s,
-                       brpc::FLAGS_socket_keepalive_interval_s,
-                       keepalive_count);
-        sockfd.release();
+        CheckKeepalive(ptr->fd(), true, brpc::FLAGS_socket_keepalive_idle_s,
+                       brpc::FLAGS_socket_keepalive_interval_s, keepalive_count);
     }
 
     {
-        butil::fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0));
+        int sockfd = socket(AF_INET, SOCK_STREAM, 0);
+        ASSERT_GT(sockfd, 0);
         brpc::SocketOptions options;
         options.fd = sockfd;
         options.keepalive_options = std::make_shared<brpc::SocketKeepaliveOptions>();
-        options.keepalive_options->keepalive_idle_s
-            = keepalive_idle;
-        options.keepalive_options->keepalive_interval_s
-            = keepalive_interval;
-        options.keepalive_options->keepalive_count
-            = keepalive_count;
+        options.keepalive_options->keepalive_idle_s = keepalive_idle;
+        options.keepalive_options->keepalive_interval_s = keepalive_interval;
+        options.keepalive_options->keepalive_count = keepalive_count;
         brpc::SocketId id;
-        ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()
-            ->Create(options, &id));
+        ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, &id));
         brpc::SocketUniquePtr ptr;
         ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
-        CheckKeepalive(ptr->fd(),
-                       true,
-                       keepalive_idle,
-                       keepalive_interval,
-                       keepalive_count);
-        sockfd.release();
+        CheckKeepalive(ptr->fd(), true, keepalive_idle,
+                       keepalive_interval, keepalive_count);
     }
 }
 
+#if defined(OS_LINUX)
+void CheckTCPUserTimeout(int fd, int expect_tcp_user_timeout) {
+    int tcp_user_timeout = 0;
+    socklen_t len = sizeof(tcp_user_timeout);
+    ASSERT_EQ(0, getsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &tcp_user_timeout, &len) );
+    ASSERT_EQ(tcp_user_timeout, expect_tcp_user_timeout);
+}
+
+TEST_F(SocketTest, tcp_user_timeout) {
+    {
+        int sockfd = socket(AF_INET, SOCK_STREAM, 0);
+        ASSERT_GT(sockfd, 0);
+        brpc::SocketOptions options;
+        options.fd = sockfd;
+        brpc::SocketId id;
+        ASSERT_EQ(0, brpc::Socket::Create(options, &id));
+        brpc::SocketUniquePtr ptr;
+        ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
+        CheckTCPUserTimeout(ptr->fd(), 0);
+    }
+
+    {
+        int tcp_user_timeout_ms = 1000;
+        int sockfd = socket(AF_INET, SOCK_STREAM, 0);
+        ASSERT_GT(sockfd, 0);
+        brpc::SocketOptions options;
+        options.fd = sockfd;
+        options.tcp_user_timeout_ms = tcp_user_timeout_ms;
+        brpc::SocketId id;
+        ASSERT_EQ(0, brpc::Socket::Create(options, &id));
+        brpc::SocketUniquePtr ptr;
+        ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
+        CheckTCPUserTimeout(ptr->fd(), tcp_user_timeout_ms);
+    }
+
+    brpc::FLAGS_socket_tcp_user_timeout_ms = 2000;
+    {
+        int sockfd = socket(AF_INET, SOCK_STREAM, 0);
+        ASSERT_GT(sockfd, 0);
+        brpc::SocketOptions options;
+        options.fd = sockfd;
+        brpc::SocketId id;
+        ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, &id));
+        brpc::SocketUniquePtr ptr;
+        ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
+        CheckTCPUserTimeout(ptr->fd(), brpc::FLAGS_socket_tcp_user_timeout_ms);
+    }
+    {
+        int tcp_user_timeout_ms = 3000;
+        int sockfd = socket(AF_INET, SOCK_STREAM, 0);
+        ASSERT_GT(sockfd, 0);
+        brpc::SocketOptions options;
+        options.fd = sockfd;
+        options.tcp_user_timeout_ms = tcp_user_timeout_ms;
+        brpc::SocketId id;
+        ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, &id));
+        brpc::SocketUniquePtr ptr;
+        ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
+        CheckTCPUserTimeout(ptr->fd(), tcp_user_timeout_ms);
+    }
+}
+#endif
+
 int HandleSocketSuccessWrite(bthread_id_t id, void* data, int error_code,
     const std::string& error_text) {
     auto success_count = static_cast<size_t*>(data);


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org

Reply via email to