This is an automated email from the ASF dual-hosted git repository.

guangmingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c4f2a89 Fix some unstable UTs (#2928)
0c4f2a89 is described below

commit 0c4f2a897cf159cb900ac4a86636fde0b72395a2
Author: Bright Chen <chenguangmin...@foxmail.com>
AuthorDate: Fri Mar 28 23:13:42 2025 +0800

    Fix some unstable UTs (#2928)
---
 src/brpc/socket.cpp                    |   2 +-
 test/brpc_builtin_service_unittest.cpp |  17 +++++-
 test/brpc_redis_unittest.cpp           |  76 ++++++++---------------
 test/brpc_socket_unittest.cpp          | 107 ++++++++++++++++++++++-----------
 test/brpc_streaming_rpc_unittest.cpp   |  28 ++++-----
 test/bthread_fd_unittest.cpp           |   2 +-
 test/bthread_unittest.cpp              | 107 +++++++++++++++++++++------------
 test/endpoint_unittest.cpp             |   4 +-
 8 files changed, 194 insertions(+), 149 deletions(-)

diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp
index 5e969fc8..e7cc336a 100644
--- a/src/brpc/socket.cpp
+++ b/src/brpc/socket.cpp
@@ -812,7 +812,7 @@ int Socket::OnCreated(const SocketOptions& options) {
     }
     // Must be the last one! Internal fields of this Socket may be accessed
     // just after calling ResetFileDescriptor.
-    if (ResetFileDescriptor(options.fd) != 0) {
+    if (ResetFileDescriptor(fd) != 0) {
         const int saved_errno = errno;
         PLOG(ERROR) << "Fail to ResetFileDescriptor";
         SetFailed(saved_errno, "Fail to ResetFileDescriptor: %s",
diff --git a/test/brpc_builtin_service_unittest.cpp 
b/test/brpc_builtin_service_unittest.cpp
index 638e5d4c..76b99811 100644
--- a/test/brpc_builtin_service_unittest.cpp
+++ b/test/brpc_builtin_service_unittest.cpp
@@ -841,8 +841,10 @@ void* dummy_bthread(void*) {
 
 
 #ifdef BRPC_BTHREAD_TRACER
+bool g_bthread_trace_start = false;
 bool g_bthread_trace_stop = false;
 void* bthread_trace(void*) {
+    g_bthread_trace_start = true;
     while (!g_bthread_trace_stop) {
         bthread_usleep(1000 * 100);
     }
@@ -883,9 +885,13 @@ TEST_F(BuiltinServiceTest, bthreads) {
     }
 
 #ifdef BRPC_BTHREAD_TRACER
-    {
+    bool ok = false;
+    for (int i = 0; i < 10; ++i) {
         bthread_t th;
         EXPECT_EQ(0, bthread_start_background(&th, NULL, bthread_trace, NULL));
+        while (!g_bthread_trace_start) {
+            bthread_usleep(1000 * 10);
+        }
         ClosureChecker done;
         brpc::Controller cntl;
         std::string id_string;
@@ -895,9 +901,14 @@ TEST_F(BuiltinServiceTest, bthreads) {
         service.default_method(&cntl, &req, &res, &done);
         g_bthread_trace_stop = true;
         EXPECT_FALSE(cntl.Failed());
-        CheckContent(cntl, "stop=0");
-        CheckContent(cntl, "bthread_trace");
+        const std::string& content = cntl.response_attachment().to_string();
+        ok = content.find("stop=0") != std::string::npos &&
+             content.find("bthread_trace") != std::string::npos;
+        if (ok) {
+            break;
+        }
     }
+    ASSERT_TRUE(ok);
 #endif // BRPC_BTHREAD_TRACER
 }
 
diff --git a/test/brpc_redis_unittest.cpp b/test/brpc_redis_unittest.cpp
index 017d5c7e..7a48d19a 100644
--- a/test/brpc_redis_unittest.cpp
+++ b/test/brpc_redis_unittest.cpp
@@ -814,10 +814,9 @@ std::unordered_map<std::string, int64_t> int_map;
 
 class RedisServiceImpl : public brpc::RedisService {
 public:
-    RedisServiceImpl()
+    RedisServiceImpl(std::string password)
         : _batch_count(0)
-        , _user("user1")
-        , _password("password1") {}
+        , _password(std::move(password)) {}
 
     brpc::RedisCommandHandlerResult OnBatched(const 
std::vector<butil::StringPiece>& args,
                    brpc::RedisReply* output, bool flush_batched) {
@@ -867,21 +866,19 @@ public:
 
     std::vector<std::vector<std::string> > _batched_command;
     int _batch_count;
-    std::string _user;
     std::string _password;
 };
 
 
 class AuthSession : public brpc::Destroyable {
 public:
-    explicit AuthSession(const std::string& user_name, const std::string& 
password)
-        : _user_name(user_name), _password(password) {}  
+    explicit AuthSession(std::string password)
+        :  _password(std::move(password)) {}
 
     void Destroy() override {
         delete this;
     }   
 
-    const std::string _user_name;
     const std::string _password;
 };
 
@@ -894,17 +891,16 @@ public:
                                         const std::vector<butil::StringPiece>& 
args,
                                         brpc::RedisReply* output,
                                         bool flush_batched) {
-        if (args.size() < 2) {
+        if (args.size() < 1) {
             output->SetError("ERR wrong number of arguments for 'AUTH' 
command");
             return brpc::REDIS_CMD_HANDLED;
         }
-        const std::string user(args[1].data(), args[1].size());
-        const std::string password(args[2].data(), args[2].size());
-        if (_rs->_user != user || _rs->_password != password) {
+        const std::string password(args[1].data(), args[1].size());
+        if (_rs->_password != password) {
             output->SetError("ERR invalid username/password");
             return brpc::REDIS_CMD_HANDLED;
         }
-        auto auth_session = new AuthSession(user, password);
+        auto auth_session = new AuthSession(password);
         ctx->reset_session(auth_session);
         output->SetStatus("OK");
         return brpc::REDIS_CMD_HANDLED;
@@ -929,7 +925,7 @@ public:
             return brpc::REDIS_CMD_HANDLED;
         }
         AuthSession* session = static_cast<AuthSession*>(ctx->session);
-        if (!session || (session->_password != _rs->_password) || 
(session->_user_name != _rs->_user)) {
+        if (!session || (session->_password != _rs->_password)) {
             output->SetError("ERR no auth");
             return brpc::REDIS_CMD_HANDLED;
         }
@@ -971,7 +967,7 @@ public:
             return brpc::REDIS_CMD_HANDLED;
         }
         AuthSession* session = static_cast<AuthSession*>(ctx->session);
-        if (!session || (session->_password != _rs->_password) || 
(session->_user_name != _rs->_user)) {
+        if (session->_password != _rs->_password) {
                output->SetError("ERR no auth");
             return brpc::REDIS_CMD_HANDLED;
         }
@@ -1015,7 +1011,7 @@ public:
             return brpc::REDIS_CMD_HANDLED;
         }
         AuthSession* session = static_cast<AuthSession*>(ctx->session);
-        if (!session || (session->_password != _rs->_password) || 
(session->_user_name != _rs->_user)) {
+        if (session->_password != _rs->_password) {
             output->SetError("ERR no auth");
             return brpc::REDIS_CMD_HANDLED;
         }
@@ -1036,9 +1032,10 @@ private:
 };
 
 TEST_F(RedisTest, server_sanity) {
+    std::string password = GeneratePassword();
     brpc::Server server;
     brpc::ServerOptions server_options;
-    RedisServiceImpl* rsimpl = new RedisServiceImpl;
+    RedisServiceImpl* rsimpl = new RedisServiceImpl(password);
     GetCommandHandler *gh = new GetCommandHandler(rsimpl);
     SetCommandHandler *sh = new SetCommandHandler(rsimpl);
     AuthCommandHandler *ah = new AuthCommandHandler(rsimpl);
@@ -1053,21 +1050,13 @@ TEST_F(RedisTest, server_sanity) {
 
     brpc::ChannelOptions options;
     options.protocol = brpc::PROTOCOL_REDIS;
+    options.auth = new brpc::policy::RedisAuthenticator(password);
     brpc::Channel channel;
     ASSERT_EQ(0, channel.Init("127.0.0.1", server.listen_address().port, 
&options));
 
     brpc::RedisRequest request;
     brpc::RedisResponse response;
     brpc::Controller cntl;
-    ASSERT_TRUE(request.AddCommand("auth user1 password1"));
-    channel.CallMethod(NULL, &cntl, &request, &response, NULL);
-    ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
-    ASSERT_EQ(1, response.reply_size());
-    ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(0).type());
-    ASSERT_STREQ("OK", response.reply(0).c_str());
-    request.Clear();
-    response.Clear();
-    cntl.Reset();
     ASSERT_TRUE(request.AddCommand("get hello"));
     ASSERT_TRUE(request.AddCommand("get hello2"));
     ASSERT_TRUE(request.AddCommand("set key1 value1"));
@@ -1122,13 +1111,6 @@ TEST_F(RedisTest, server_sanity) {
 
 void* incr_thread(void* arg) {
     brpc::Channel* c = static_cast<brpc::Channel*>(arg);
-    // do auth
-    brpc::RedisRequest auth_req;
-    brpc::RedisResponse auth_resp;
-    brpc::Controller auth_cntl;
-    EXPECT_TRUE(auth_req.AddCommand("auth user1 password1"));
-    c->CallMethod(NULL, &auth_cntl, &auth_req, &auth_resp, NULL);
-    EXPECT_FALSE(auth_cntl.Failed()) << auth_cntl.ErrorText();
     for (int i = 0; i < 5000; ++i) {
         brpc::RedisRequest request;
         brpc::RedisResponse response;
@@ -1137,16 +1119,17 @@ void* incr_thread(void* arg) {
         c->CallMethod(NULL, &cntl, &request, &response, NULL);
         EXPECT_FALSE(cntl.Failed()) << cntl.ErrorText();
         EXPECT_EQ(1, response.reply_size());
-        EXPECT_TRUE(response.reply(0).is_integer());
+        EXPECT_TRUE(response.reply(0).is_integer()) << response.reply(0);
     }
     return NULL;
 }
 
 TEST_F(RedisTest, server_concurrency) {
+    std::string password = GeneratePassword();
     int N = 10;
     brpc::Server server;
     brpc::ServerOptions server_options;
-    RedisServiceImpl* rsimpl = new RedisServiceImpl;
+    RedisServiceImpl* rsimpl = new RedisServiceImpl(password);
     AuthCommandHandler *ah = new AuthCommandHandler(rsimpl);
     IncrCommandHandler *ih = new IncrCommandHandler(rsimpl);
     rsimpl->AddCommandHandler("incr", ih);
@@ -1158,6 +1141,7 @@ TEST_F(RedisTest, server_concurrency) {
     brpc::ChannelOptions options;
     options.protocol = brpc::PROTOCOL_REDIS;
     options.connection_type = "pooled";
+    options.auth = new brpc::policy::RedisAuthenticator(password);
     std::vector<bthread_t> bths;
     std::vector<brpc::Channel*> channels;
     for (int i = 0; i < N; ++i) {
@@ -1228,9 +1212,10 @@ public:
 };
 
 TEST_F(RedisTest, server_command_continue) {
+    std::string password = GeneratePassword();
     brpc::Server server;
     brpc::ServerOptions server_options;
-    RedisServiceImpl* rsimpl = new RedisServiceImpl;
+    RedisServiceImpl* rsimpl = new RedisServiceImpl(password);
     rsimpl->AddCommandHandler("auth", new AuthCommandHandler(rsimpl));
     rsimpl->AddCommandHandler("get", new GetCommandHandler(rsimpl));
     rsimpl->AddCommandHandler("set", new SetCommandHandler(rsimpl));
@@ -1242,16 +1227,9 @@ TEST_F(RedisTest, server_command_continue) {
 
     brpc::ChannelOptions options;
     options.protocol = brpc::PROTOCOL_REDIS;
+    options.auth = new brpc::policy::RedisAuthenticator(password);
     brpc::Channel channel;
     ASSERT_EQ(0, channel.Init("127.0.0.1", server.listen_address().port, 
&options));
-    // do auth
-    brpc::RedisRequest auth_req;
-    brpc::RedisResponse auth_resp;
-    brpc::Controller auth_cntl;
-    ASSERT_TRUE(auth_req.AddCommand("auth user1 password1"));
-    channel.CallMethod(NULL, &auth_cntl, &auth_req, &auth_resp, NULL);
-    ASSERT_FALSE(auth_cntl.Failed()) << auth_cntl.ErrorText();
-
     {
         brpc::RedisRequest request;
         brpc::RedisResponse response;
@@ -1311,9 +1289,10 @@ TEST_F(RedisTest, server_command_continue) {
 }
 
 TEST_F(RedisTest, server_handle_pipeline) {
+    std::string password = GeneratePassword();
     brpc::Server server;
     brpc::ServerOptions server_options;
-    RedisServiceImpl* rsimpl = new RedisServiceImpl;
+    RedisServiceImpl* rsimpl = new RedisServiceImpl(password);
     GetCommandHandler* getch = new GetCommandHandler(rsimpl, true);
     SetCommandHandler* setch = new SetCommandHandler(rsimpl, true);
     AuthCommandHandler* authch = new AuthCommandHandler(rsimpl);
@@ -1327,20 +1306,13 @@ TEST_F(RedisTest, server_handle_pipeline) {
 
     brpc::ChannelOptions options;
     options.protocol = brpc::PROTOCOL_REDIS;
+    options.auth = new brpc::policy::RedisAuthenticator(password);
     brpc::Channel channel;
     ASSERT_EQ(0, channel.Init("127.0.0.1", server.listen_address().port, 
&options));
 
     brpc::RedisRequest request;
     brpc::RedisResponse response;
     brpc::Controller cntl;
-    ASSERT_TRUE(request.AddCommand("auth user1 password1"));
-    channel.CallMethod(NULL, &cntl, &request, &response, NULL);
-    ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
-    ASSERT_EQ(1, response.reply_size());
-    ASSERT_STREQ("OK", response.reply(0).c_str());
-    request.Clear();
-    response.Clear();
-    cntl.Reset();
     ASSERT_TRUE(request.AddCommand("set key1 v1"));
     ASSERT_TRUE(request.AddCommand("set key2 v2"));
     ASSERT_TRUE(request.AddCommand("set key3 v3"));
diff --git a/test/brpc_socket_unittest.cpp b/test/brpc_socket_unittest.cpp
index 78b83503..0f35863f 100644
--- a/test/brpc_socket_unittest.cpp
+++ b/test/brpc_socket_unittest.cpp
@@ -335,10 +335,17 @@ TEST_F(SocketTest, single_threaded_connect_and_write) {
           EchoProcessHuluRequest, NULL, NULL, "dummy_hulu" }
     };
 
+    int listening_fd = -1;
     butil::EndPoint point(butil::IP_ANY, 7878);
-    int listening_fd = tcp_listen(point);
-    ASSERT_TRUE(listening_fd > 0);
-    butil::make_non_blocking(listening_fd);
+    for (int i = 0; i < 100; ++i) {
+        point.port += i;
+        listening_fd = tcp_listen(point);
+        if (listening_fd >= 0) {
+            break;
+        }
+    }
+    ASSERT_GT(listening_fd, 0) << berror();
+    ASSERT_EQ(0, butil::make_non_blocking(listening_fd));
     ASSERT_EQ(0, messenger->AddHandler(pairs[0]));
     ASSERT_EQ(0, messenger->StartAccept(listening_fd, -1, NULL, false));
 
@@ -1130,6 +1137,7 @@ TEST_F(SocketTest, keepalive) {
         brpc::SocketUniquePtr ptr;
         ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
         CheckNoKeepalive(ptr->fd());
+        ASSERT_EQ(0, ptr->SetFailed());
     }
 
     int keepalive_idle = 1;
@@ -1148,6 +1156,7 @@ TEST_F(SocketTest, keepalive) {
         ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
         CheckKeepalive(ptr->fd(), true, default_keepalive_idle,
                        default_keepalive_interval, default_keepalive_count);
+        ASSERT_EQ(0, ptr->SetFailed());
     }
 
     // Enable keepalive and set keepalive idle.
@@ -1157,8 +1166,7 @@ TEST_F(SocketTest, keepalive) {
         brpc::SocketOptions options;
         options.fd = sockfd;
         options.keepalive_options = 
std::make_shared<brpc::SocketKeepaliveOptions>();
-        options.keepalive_options->keepalive_idle_s
-            = keepalive_idle;
+        options.keepalive_options->keepalive_idle_s = keepalive_idle;
         brpc::SocketId id;
         ASSERT_EQ(0, brpc::Socket::Create(options, &id));
         brpc::SocketUniquePtr ptr;
@@ -1166,6 +1174,7 @@ TEST_F(SocketTest, keepalive) {
         CheckKeepalive(ptr->fd(), true, keepalive_idle,
                        default_keepalive_interval,
                        default_keepalive_count);
+        ASSERT_EQ(0, ptr->SetFailed());
     }
 
     // Enable keepalive and set keepalive interval.
@@ -1175,14 +1184,14 @@ TEST_F(SocketTest, keepalive) {
         brpc::SocketOptions options;
         options.fd = sockfd;
         options.keepalive_options = 
std::make_shared<brpc::SocketKeepaliveOptions>();
-        options.keepalive_options->keepalive_interval_s
-            = keepalive_interval;
+        options.keepalive_options->keepalive_interval_s = keepalive_interval;
         brpc::SocketId id;
         ASSERT_EQ(0, brpc::Socket::Create(options, &id));
         brpc::SocketUniquePtr ptr;
         ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
         CheckKeepalive(ptr->fd(), true, default_keepalive_idle,
                        keepalive_interval, default_keepalive_count);
+        ASSERT_EQ(0, ptr->SetFailed());
     }
 
     // Enable keepalive and set keepalive count.
@@ -1199,6 +1208,7 @@ TEST_F(SocketTest, keepalive) {
         ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
         CheckKeepalive(ptr->fd(), true, default_keepalive_idle,
                        default_keepalive_interval, keepalive_count);
+        ASSERT_EQ(0, ptr->SetFailed());
     }
 
     // Enable keepalive and set keepalive idle, interval, count.
@@ -1217,10 +1227,25 @@ TEST_F(SocketTest, keepalive) {
         ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
         CheckKeepalive(ptr->fd(), true, keepalive_idle,
                        keepalive_interval, keepalive_count);
+        ASSERT_EQ(0, ptr->SetFailed());
     }
 }
 
 TEST_F(SocketTest, keepalive_input_message) {
+    brpc::Acceptor* messenger = new brpc::Acceptor;
+    int listening_fd = -1;
+    butil::EndPoint point(butil::IP_ANY, 7878);
+    for (int i = 0; i < 100; ++i) {
+        point.port += i;
+        listening_fd = tcp_listen(point);
+        if (listening_fd >= 0) {
+            break;
+        }
+    }
+    ASSERT_GT(listening_fd, 0) << berror();
+    ASSERT_EQ(0, butil::make_non_blocking(listening_fd));
+    ASSERT_EQ(0, messenger->StartAccept(listening_fd, -1, NULL, false));
+
     int default_keepalive = 0;
     int default_keepalive_idle = 0;
     int default_keepalive_interval = 0;
@@ -1234,76 +1259,81 @@ TEST_F(SocketTest, keepalive_input_message) {
 
     // Disable keepalive.
     {
-        int sockfd = socket(AF_INET, SOCK_STREAM, 0);
-        ASSERT_GT(sockfd, 0);
         brpc::SocketOptions options;
-        options.fd = sockfd;
+        options.remote_side = point;
+        options.connect_on_create = true;
         brpc::SocketId id = brpc::INVALID_SOCKET_ID;
         ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, 
&id));
         brpc::SocketUniquePtr ptr;
         ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)) << "id=" << id;
+        ASSERT_GT(ptr->fd(), 0);
         CheckNoKeepalive(ptr->fd());
+        ASSERT_EQ(0, ptr->SetFailed());
     }
 
     // Enable keepalive.
     brpc::FLAGS_socket_keepalive = true;
     {
-        int sockfd = socket(AF_INET, SOCK_STREAM, 0);
-        ASSERT_GT(sockfd, 0);
         brpc::SocketOptions options;
-        options.fd = sockfd;
+        options.remote_side = point;
+        options.connect_on_create = true;
         brpc::SocketId id = brpc::INVALID_SOCKET_ID;
         ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, 
&id));
         brpc::SocketUniquePtr ptr;
         ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)) << "id=" << id;
+        ASSERT_GT(ptr->fd(), 0);
         CheckKeepalive(ptr->fd(), true, default_keepalive_idle,
                        default_keepalive_interval, default_keepalive_count);
+        ASSERT_EQ(0, ptr->SetFailed());
     }
 
     // Enable keepalive and set keepalive idle.
     brpc::FLAGS_socket_keepalive_idle_s = 10;
     {
-        int sockfd = socket(AF_INET, SOCK_STREAM, 0);
-        ASSERT_GT(sockfd, 0);
         brpc::SocketOptions options;
-        options.fd = sockfd;
+        options.remote_side = point;
+        options.connect_on_create = true;
         brpc::SocketId id = brpc::INVALID_SOCKET_ID;
         ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, 
&id));
         brpc::SocketUniquePtr ptr;
         ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)) << "id=" << id;
+        ASSERT_GT(ptr->fd(), 0);
         CheckKeepalive(ptr->fd(), true, brpc::FLAGS_socket_keepalive_idle_s,
                        default_keepalive_interval, default_keepalive_count);
+        ASSERT_EQ(0, ptr->SetFailed());
     }
 
     // Enable keepalive and set keepalive idle, interval.
     brpc::FLAGS_socket_keepalive_interval_s = 10;
     {
-        int sockfd = socket(AF_INET, SOCK_STREAM, 0);
-        ASSERT_GT(sockfd, 0);
         brpc::SocketOptions options;
-        options.fd = sockfd;
+        options.remote_side = point;
+        options.connect_on_create = true;
         brpc::SocketId id = brpc::INVALID_SOCKET_ID;
         ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, 
&id));
         brpc::SocketUniquePtr ptr;
         ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)) << "id=" << id;
+        ASSERT_GT(ptr->fd(), 0);
         CheckKeepalive(ptr->fd(), true, brpc::FLAGS_socket_keepalive_idle_s,
                        brpc::FLAGS_socket_keepalive_interval_s, 
default_keepalive_count);
+        ASSERT_EQ(0, ptr->SetFailed());
     }
 
     // Enable keepalive and set keepalive idle, interval, count.
     brpc::FLAGS_socket_keepalive_count = 10;
     {
-        int sockfd = socket(AF_INET, SOCK_STREAM, 0);
-        ASSERT_GT(sockfd, 0);
         brpc::SocketOptions options;
-        options.fd = sockfd;
+        options.remote_side = point;
+        options.connect_on_create = true;
         brpc::SocketId id = brpc::INVALID_SOCKET_ID;
         ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, 
&id));
         brpc::SocketUniquePtr ptr;
         ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)) << "id=" << id;
+        ASSERT_GT(ptr->fd(), 0);
         CheckKeepalive(ptr->fd(), true, brpc::FLAGS_socket_keepalive_idle_s,
                        brpc::FLAGS_socket_keepalive_interval_s,
                        brpc::FLAGS_socket_keepalive_count);
+        ASSERT_EQ(0, ptr->SetFailed());
     }
 
     // Options of keepalive set by user have priority over Gflags.
@@ -1311,56 +1341,58 @@ TEST_F(SocketTest, keepalive_input_message) {
     int keepalive_interval = 2;
     int keepalive_count = 2;
     {
-        int sockfd = socket(AF_INET, SOCK_STREAM, 0);
-        ASSERT_GT(sockfd, 0);
         brpc::SocketOptions options;
-        options.fd = sockfd;
+        options.remote_side = point;
+        options.connect_on_create = true;
         options.keepalive_options = 
std::make_shared<brpc::SocketKeepaliveOptions>();
         options.keepalive_options->keepalive_idle_s = keepalive_idle;
         brpc::SocketId id = brpc::INVALID_SOCKET_ID;
         ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, 
&id));
         brpc::SocketUniquePtr ptr;
         ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)) << "id=" << id;
+        ASSERT_GT(ptr->fd(), 0);
         CheckKeepalive(ptr->fd(), true, keepalive_idle,
                        brpc::FLAGS_socket_keepalive_interval_s,
                        brpc::FLAGS_socket_keepalive_count);
+        ASSERT_EQ(0, ptr->SetFailed());
     }
 
     {
-        int sockfd = socket(AF_INET, SOCK_STREAM, 0);
-        ASSERT_GT(sockfd, 0);
         brpc::SocketOptions options;
-        options.fd = sockfd;
+        options.remote_side = point;
+        options.connect_on_create = true;
         options.keepalive_options = 
std::make_shared<brpc::SocketKeepaliveOptions>();
         options.keepalive_options->keepalive_interval_s = keepalive_interval;
         brpc::SocketId id = brpc::INVALID_SOCKET_ID;
         ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, 
&id));
         brpc::SocketUniquePtr ptr;
         ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)) << "id=" << id;
+        ASSERT_GT(ptr->fd(), 0);
         CheckKeepalive(ptr->fd(), true, brpc::FLAGS_socket_keepalive_idle_s,
                        keepalive_interval, brpc::FLAGS_socket_keepalive_count);
+        ASSERT_EQ(0, ptr->SetFailed());
     }
 
     {
-        int sockfd = socket(AF_INET, SOCK_STREAM, 0);
-        ASSERT_GT(sockfd, 0);
         brpc::SocketOptions options;
-        options.fd = sockfd;
+        options.remote_side = point;
+        options.connect_on_create = true;
         options.keepalive_options = 
std::make_shared<brpc::SocketKeepaliveOptions>();
         options.keepalive_options->keepalive_count = keepalive_count;
         brpc::SocketId id = brpc::INVALID_SOCKET_ID;
         ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, 
&id));
         brpc::SocketUniquePtr ptr;
         ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)) << "id=" << id;
+        ASSERT_GT(ptr->fd(), 0);
         CheckKeepalive(ptr->fd(), true, brpc::FLAGS_socket_keepalive_idle_s,
                        brpc::FLAGS_socket_keepalive_interval_s, 
keepalive_count);
+        ASSERT_EQ(0, ptr->SetFailed());
     }
 
     {
-        int sockfd = socket(AF_INET, SOCK_STREAM, 0);
-        ASSERT_GT(sockfd, 0);
         brpc::SocketOptions options;
-        options.fd = sockfd;
+        options.remote_side = point;
+        options.connect_on_create = true;
         options.keepalive_options = 
std::make_shared<brpc::SocketKeepaliveOptions>();
         options.keepalive_options->keepalive_idle_s = keepalive_idle;
         options.keepalive_options->keepalive_interval_s = keepalive_interval;
@@ -1369,9 +1401,16 @@ TEST_F(SocketTest, keepalive_input_message) {
         ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, 
&id));
         brpc::SocketUniquePtr ptr;
         ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)) << "id=" << id;
+        ASSERT_GT(ptr->fd(), 0);
         CheckKeepalive(ptr->fd(), true, keepalive_idle,
                        keepalive_interval, keepalive_count);
+        ASSERT_EQ(0, ptr->SetFailed());
     }
+
+    messenger->StopAccept(0);
+    ASSERT_EQ(-1, messenger->listened_fd());
+    ASSERT_EQ(-1, fcntl(listening_fd, F_GETFD));
+    ASSERT_EQ(EBADF, errno);
 }
 
 #if defined(OS_LINUX)
diff --git a/test/brpc_streaming_rpc_unittest.cpp 
b/test/brpc_streaming_rpc_unittest.cpp
index b0dd4a39..056ea9a9 100644
--- a/test/brpc_streaming_rpc_unittest.cpp
+++ b/test/brpc_streaming_rpc_unittest.cpp
@@ -240,7 +240,6 @@ TEST_F(StreamingRpcTest, block) {
     out.append(&dummy, sizeof(dummy));
     ASSERT_EQ(EAGAIN, brpc::StreamWrite(request_stream, out));
     hc.block = false;
-    ASSERT_EQ(0, brpc::StreamWait(request_stream, NULL));
     // wait flushing all the pending messages
     while (handler._expected_next_value != N) {
         usleep(100);
@@ -249,6 +248,7 @@ TEST_F(StreamingRpcTest, block) {
     hc.block = true;
     // async wait
     for (int i = N; i < N + N; ++i) {
+        ASSERT_EQ(0, brpc::StreamWait(request_stream, NULL));
         int network = htonl(i);
         butil::IOBuf out;
         out.append(&network, sizeof(network));
@@ -432,18 +432,12 @@ TEST_F(StreamingRpcTest, idle_timeout) {
 
 class PingPongHandler : public brpc::StreamInputHandler {
 public:
-    explicit PingPongHandler()
-        : _expected_next_value(0)
-        , _failed(false)
-        , _stopped(false)
-        , _idle_times(0)
-    {
-    }
     int on_received_messages(brpc::StreamId id,
                              butil::IOBuf *const messages[],
                              size_t size) override {
         if (size != 1) {
-            _failed = true;
+            LOG(INFO) << "size=" << size;
+            _error = true;
             return 0;
         }
         for (size_t i = 0; i < size; ++i) {
@@ -451,7 +445,7 @@ public:
             int network = 0;
             messages[i]->cutn(&network, sizeof(int));
             if ((int)ntohl(network) != _expected_next_value) {
-                _failed = true;
+                _error = true;
             }
             int send_back = ntohl(network) + 1;
             _expected_next_value = send_back + 1;
@@ -481,14 +475,16 @@ public:
         _failed = true;
     }
 
+    bool error() const { return _error; }
     bool failed() const { return _failed; }
     bool stopped() const { return _stopped; }
     int idle_times() const { return _idle_times; }
 private:
-    int _expected_next_value;
-    bool _failed;
-    bool _stopped;
-    int _idle_times;
+    int _expected_next_value{0};
+    bool _error{false};
+    bool _failed{false};
+    bool _stopped{false};
+    int _idle_times{0};
 };
 
 TEST_F(StreamingRpcTest, ping_pong) {
@@ -524,8 +520,8 @@ TEST_F(StreamingRpcTest, ping_pong) {
     while (!resh.stopped() || !reqh.stopped()) {
         usleep(100);
     }
-    ASSERT_FALSE(resh.failed());
-    ASSERT_FALSE(reqh.failed());
+    ASSERT_FALSE(resh.error());
+    ASSERT_FALSE(reqh.error());
     ASSERT_EQ(0, resh.idle_times());
     ASSERT_EQ(0, reqh.idle_times());
 }
diff --git a/test/bthread_fd_unittest.cpp b/test/bthread_fd_unittest.cpp
index 91b89c21..fac6a4f2 100644
--- a/test/bthread_fd_unittest.cpp
+++ b/test/bthread_fd_unittest.cpp
@@ -616,7 +616,7 @@ void TestConnectInterruptImpl(bool timed) {
         int64_t connect_ms = butil::cpuwide_time_ms() - start_ms;
         LOG(INFO) << "Connect to " << ep << ", cost " << connect_ms << "ms";
 
-        timespec abstime = butil::milliseconds_from_now(connect_ms + 100);
+        timespec abstime = butil::milliseconds_from_now(connect_ms * 10);
         rc = bthread_timed_connect(
             sockfd, (struct sockaddr*) &serv_addr,
             serv_addr_size, &abstime);
diff --git a/test/bthread_unittest.cpp b/test/bthread_unittest.cpp
index 1fbefb27..eaa16fa5 100644
--- a/test/bthread_unittest.cpp
+++ b/test/bthread_unittest.cpp
@@ -600,8 +600,9 @@ TEST_F(BthreadTest, test_span) {
                                           test_son_parent_span, &multi_p2));
     ASSERT_EQ(0, bthread_join(multi_th1, NULL));
     ASSERT_EQ(0, bthread_join(multi_th2, NULL));
-    ASSERT_EQ(multi_p1, targets[0]);
-    ASSERT_EQ(multi_p2, targets[1]);
+    ASSERT_NE(multi_p1, multi_p2);
+    ASSERT_NE(std::find(targets, targets + 4, multi_p1), targets + 4);
+    ASSERT_NE(std::find(targets, targets + 4, multi_p2), targets + 4);
 }
 
 void* dummy_thread(void*) {
@@ -628,48 +629,74 @@ TEST_F(BthreadTest, yield_single_thread) {
 }
 
 #ifdef BRPC_BTHREAD_TRACER
-TEST_F(BthreadTest, trace) {
-    start = false;
-    stop = false;
-    bthread_t th;
-    ASSERT_EQ(0, bthread_start_urgent(&th, NULL, spin_and_log, (void*)1));
-    while (!start) {
-        usleep(10 * 1000);
-    }
-    bthread::FLAGS_enable_fast_unwind = false;
-    std::string st = bthread::stack_trace(th);
-    LOG(INFO) << "fast_unwind spin_and_log stack trace:\n" << st;
-    ASSERT_NE(std::string::npos, st.find("spin_and_log"));
-
-    bthread::FLAGS_enable_fast_unwind = true;
-    st = bthread::stack_trace(th);
-    LOG(INFO) << "spin_and_log stack trace:\n" << st;
-    ASSERT_NE(std::string::npos, st.find("spin_and_log"));
-    stop = true;
-    ASSERT_EQ(0, bthread_join(th, NULL));
+void spin_and_log_trace() {
+    bool ok = false;
+    for (int i = 0; i < 10; ++i) {
+        start = false;
+        stop = false;
+        bthread_t th;
+        ASSERT_EQ(0, bthread_start_urgent(&th, NULL, spin_and_log, (void*)1));
+        while (!start) {
+            usleep(10 * 1000);
+        }
+        bthread::FLAGS_enable_fast_unwind = false;
+        std::string st1 = bthread::stack_trace(th);
+        LOG(INFO) << "spin_and_log stack trace:\n" << st1;
+
+        bthread::FLAGS_enable_fast_unwind = true;
+        std::string st2 = bthread::stack_trace(th);
+        LOG(INFO) << "fast_unwind spin_and_log stack trace:\n" << st2;
+        stop = true;
+        ASSERT_EQ(0, bthread_join(th, NULL));
+
+        std::string st3 = bthread::stack_trace(th);
+        LOG(INFO) << "ended bthread stack trace:\n" << st3;
+        ASSERT_NE(std::string::npos, st3.find("not exist now"));
 
-    start = false;
-    stop = false;
-    ASSERT_EQ(0, bthread_start_urgent(&th, NULL, repeated_sleep, (void*)1));
-    while (!start) {
-        usleep(10 * 1000);
+        ok = st1.find("spin_and_log") != std::string::npos &&
+             st2.find("spin_and_log") != std::string::npos;
+        if (ok) {
+            break;
+        }
     }
-    bthread::FLAGS_enable_fast_unwind = false;
-    st = bthread::stack_trace(th);
-    LOG(INFO) << "fast_unwind repeated_sleep stack trace:\n" << st;
-    ASSERT_NE(std::string::npos, st.find("repeated_sleep"));
-
-    bthread::FLAGS_enable_fast_unwind = true;
-    st = bthread::stack_trace(th);
-    LOG(INFO) << "repeated_sleep stack trace:\n" << st;
-    ASSERT_NE(std::string::npos, st.find("repeated_sleep"));
-    stop = true;
-    ASSERT_EQ(0, bthread_join(th, NULL));
+    ASSERT_TRUE(ok);
+}
+
+void repeated_sleep_trace() {
+    bool ok = false;
+    for (int i = 0; i < 10; ++i) {
+        start = false;
+        stop = false;
+        bthread_t th;
+        ASSERT_EQ(0, bthread_start_urgent(&th, NULL, repeated_sleep, 
(void*)1));
+        while (!start) {
+            usleep(10 * 1000);
+        }
+        bthread::FLAGS_enable_fast_unwind = false;
+        std::string st1 = bthread::stack_trace(th);
+        LOG(INFO) << "repeated_sleep stack trace:\n" << st1;
 
-    st = bthread::stack_trace(th);
-    LOG(INFO) << "ended bthread stack trace:\n" << st;
-    ASSERT_NE(std::string::npos, st.find("not exist now"));
+        bthread::FLAGS_enable_fast_unwind = true;
+        std::string st2 = bthread::stack_trace(th);
+        LOG(INFO) << "fast_unwind repeated_sleep stack trace:\n" << st2;
+        stop = true;
+        ASSERT_EQ(0, bthread_join(th, NULL));
+
+        std::string st3 = bthread::stack_trace(th);
+        LOG(INFO) << "ended bthread stack trace:\n" << st3;
+        ASSERT_NE(std::string::npos, st3.find("not exist now"));
+        ok = st1.find("repeated_sleep") != std::string::npos &&
+             st2.find("repeated_sleep") != std::string::npos;
+        if (ok) {
+            break;
+        }
+    }
+    ASSERT_TRUE(ok);
+}
 
+TEST_F(BthreadTest, trace) {
+    spin_and_log_trace();
+    repeated_sleep_trace();
 }
 #endif // BRPC_BTHREAD_TRACER
 
diff --git a/test/endpoint_unittest.cpp b/test/endpoint_unittest.cpp
index 6ea45c04..cf471315 100644
--- a/test/endpoint_unittest.cpp
+++ b/test/endpoint_unittest.cpp
@@ -499,7 +499,7 @@ TEST(EndPointTest, tcp_connect) {
         ASSERT_LE(0, sockfd) << "errno=" << errno;
     }
     {
-        butil::fd_guard sockfd(butil::tcp_connect(ep1, NULL, 1));
+        butil::fd_guard sockfd(butil::tcp_connect(ep2, NULL, 1));
         ASSERT_EQ(-1, sockfd) << "errno=" << errno;
         ASSERT_EQ(ETIMEDOUT, errno);
     }
@@ -553,7 +553,7 @@ void TestConnectInterruptImpl(bool timed) {
         int64_t connect_ms = butil::cpuwide_time_ms() - start_ms;
         LOG(INFO) << "Connect to " << ep << ", cost " << connect_ms << "ms";
 
-        timespec abstime = butil::milliseconds_from_now(connect_ms * 2);
+        timespec abstime = butil::milliseconds_from_now(connect_ms * 10);
         rc = butil::pthread_timed_connect(
             sockfd, (struct sockaddr*) &serv_addr,
             serv_addr_size, &abstime);


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org

Reply via email to