This is an automated email from the ASF dual-hosted git repository. guangmingchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push: new 0c4f2a89 Fix some unstable UTs (#2928) 0c4f2a89 is described below commit 0c4f2a897cf159cb900ac4a86636fde0b72395a2 Author: Bright Chen <chenguangmin...@foxmail.com> AuthorDate: Fri Mar 28 23:13:42 2025 +0800 Fix some unstable UTs (#2928) --- src/brpc/socket.cpp | 2 +- test/brpc_builtin_service_unittest.cpp | 17 +++++- test/brpc_redis_unittest.cpp | 76 ++++++++--------------- test/brpc_socket_unittest.cpp | 107 ++++++++++++++++++++++----------- test/brpc_streaming_rpc_unittest.cpp | 28 ++++----- test/bthread_fd_unittest.cpp | 2 +- test/bthread_unittest.cpp | 107 +++++++++++++++++++++------------ test/endpoint_unittest.cpp | 4 +- 8 files changed, 194 insertions(+), 149 deletions(-) diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index 5e969fc8..e7cc336a 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -812,7 +812,7 @@ int Socket::OnCreated(const SocketOptions& options) { } // Must be the last one! Internal fields of this Socket may be accessed // just after calling ResetFileDescriptor. - if (ResetFileDescriptor(options.fd) != 0) { + if (ResetFileDescriptor(fd) != 0) { const int saved_errno = errno; PLOG(ERROR) << "Fail to ResetFileDescriptor"; SetFailed(saved_errno, "Fail to ResetFileDescriptor: %s", diff --git a/test/brpc_builtin_service_unittest.cpp b/test/brpc_builtin_service_unittest.cpp index 638e5d4c..76b99811 100644 --- a/test/brpc_builtin_service_unittest.cpp +++ b/test/brpc_builtin_service_unittest.cpp @@ -841,8 +841,10 @@ void* dummy_bthread(void*) { #ifdef BRPC_BTHREAD_TRACER +bool g_bthread_trace_start = false; bool g_bthread_trace_stop = false; void* bthread_trace(void*) { + g_bthread_trace_start = true; while (!g_bthread_trace_stop) { bthread_usleep(1000 * 100); } @@ -883,9 +885,13 @@ TEST_F(BuiltinServiceTest, bthreads) { } #ifdef BRPC_BTHREAD_TRACER - { + bool ok = false; + for (int i = 0; i < 10; ++i) { bthread_t th; EXPECT_EQ(0, bthread_start_background(&th, NULL, bthread_trace, NULL)); + while (!g_bthread_trace_start) { + bthread_usleep(1000 * 10); + } ClosureChecker done; brpc::Controller cntl; std::string id_string; @@ -895,9 +901,14 @@ TEST_F(BuiltinServiceTest, bthreads) { service.default_method(&cntl, &req, &res, &done); g_bthread_trace_stop = true; EXPECT_FALSE(cntl.Failed()); - CheckContent(cntl, "stop=0"); - CheckContent(cntl, "bthread_trace"); + const std::string& content = cntl.response_attachment().to_string(); + ok = content.find("stop=0") != std::string::npos && + content.find("bthread_trace") != std::string::npos; + if (ok) { + break; + } } + ASSERT_TRUE(ok); #endif // BRPC_BTHREAD_TRACER } diff --git a/test/brpc_redis_unittest.cpp b/test/brpc_redis_unittest.cpp index 017d5c7e..7a48d19a 100644 --- a/test/brpc_redis_unittest.cpp +++ b/test/brpc_redis_unittest.cpp @@ -814,10 +814,9 @@ std::unordered_map<std::string, int64_t> int_map; class RedisServiceImpl : public brpc::RedisService { public: - RedisServiceImpl() + RedisServiceImpl(std::string password) : _batch_count(0) - , _user("user1") - , _password("password1") {} + , _password(std::move(password)) {} brpc::RedisCommandHandlerResult OnBatched(const std::vector<butil::StringPiece>& args, brpc::RedisReply* output, bool flush_batched) { @@ -867,21 +866,19 @@ public: std::vector<std::vector<std::string> > _batched_command; int _batch_count; - std::string _user; std::string _password; }; class AuthSession : public brpc::Destroyable { public: - explicit AuthSession(const std::string& user_name, const std::string& password) - : _user_name(user_name), _password(password) {} + explicit AuthSession(std::string password) + : _password(std::move(password)) {} void Destroy() override { delete this; } - const std::string _user_name; const std::string _password; }; @@ -894,17 +891,16 @@ public: const std::vector<butil::StringPiece>& args, brpc::RedisReply* output, bool flush_batched) { - if (args.size() < 2) { + if (args.size() < 1) { output->SetError("ERR wrong number of arguments for 'AUTH' command"); return brpc::REDIS_CMD_HANDLED; } - const std::string user(args[1].data(), args[1].size()); - const std::string password(args[2].data(), args[2].size()); - if (_rs->_user != user || _rs->_password != password) { + const std::string password(args[1].data(), args[1].size()); + if (_rs->_password != password) { output->SetError("ERR invalid username/password"); return brpc::REDIS_CMD_HANDLED; } - auto auth_session = new AuthSession(user, password); + auto auth_session = new AuthSession(password); ctx->reset_session(auth_session); output->SetStatus("OK"); return brpc::REDIS_CMD_HANDLED; @@ -929,7 +925,7 @@ public: return brpc::REDIS_CMD_HANDLED; } AuthSession* session = static_cast<AuthSession*>(ctx->session); - if (!session || (session->_password != _rs->_password) || (session->_user_name != _rs->_user)) { + if (!session || (session->_password != _rs->_password)) { output->SetError("ERR no auth"); return brpc::REDIS_CMD_HANDLED; } @@ -971,7 +967,7 @@ public: return brpc::REDIS_CMD_HANDLED; } AuthSession* session = static_cast<AuthSession*>(ctx->session); - if (!session || (session->_password != _rs->_password) || (session->_user_name != _rs->_user)) { + if (session->_password != _rs->_password) { output->SetError("ERR no auth"); return brpc::REDIS_CMD_HANDLED; } @@ -1015,7 +1011,7 @@ public: return brpc::REDIS_CMD_HANDLED; } AuthSession* session = static_cast<AuthSession*>(ctx->session); - if (!session || (session->_password != _rs->_password) || (session->_user_name != _rs->_user)) { + if (session->_password != _rs->_password) { output->SetError("ERR no auth"); return brpc::REDIS_CMD_HANDLED; } @@ -1036,9 +1032,10 @@ private: }; TEST_F(RedisTest, server_sanity) { + std::string password = GeneratePassword(); brpc::Server server; brpc::ServerOptions server_options; - RedisServiceImpl* rsimpl = new RedisServiceImpl; + RedisServiceImpl* rsimpl = new RedisServiceImpl(password); GetCommandHandler *gh = new GetCommandHandler(rsimpl); SetCommandHandler *sh = new SetCommandHandler(rsimpl); AuthCommandHandler *ah = new AuthCommandHandler(rsimpl); @@ -1053,21 +1050,13 @@ TEST_F(RedisTest, server_sanity) { brpc::ChannelOptions options; options.protocol = brpc::PROTOCOL_REDIS; + options.auth = new brpc::policy::RedisAuthenticator(password); brpc::Channel channel; ASSERT_EQ(0, channel.Init("127.0.0.1", server.listen_address().port, &options)); brpc::RedisRequest request; brpc::RedisResponse response; brpc::Controller cntl; - ASSERT_TRUE(request.AddCommand("auth user1 password1")); - channel.CallMethod(NULL, &cntl, &request, &response, NULL); - ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText(); - ASSERT_EQ(1, response.reply_size()); - ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(0).type()); - ASSERT_STREQ("OK", response.reply(0).c_str()); - request.Clear(); - response.Clear(); - cntl.Reset(); ASSERT_TRUE(request.AddCommand("get hello")); ASSERT_TRUE(request.AddCommand("get hello2")); ASSERT_TRUE(request.AddCommand("set key1 value1")); @@ -1122,13 +1111,6 @@ TEST_F(RedisTest, server_sanity) { void* incr_thread(void* arg) { brpc::Channel* c = static_cast<brpc::Channel*>(arg); - // do auth - brpc::RedisRequest auth_req; - brpc::RedisResponse auth_resp; - brpc::Controller auth_cntl; - EXPECT_TRUE(auth_req.AddCommand("auth user1 password1")); - c->CallMethod(NULL, &auth_cntl, &auth_req, &auth_resp, NULL); - EXPECT_FALSE(auth_cntl.Failed()) << auth_cntl.ErrorText(); for (int i = 0; i < 5000; ++i) { brpc::RedisRequest request; brpc::RedisResponse response; @@ -1137,16 +1119,17 @@ void* incr_thread(void* arg) { c->CallMethod(NULL, &cntl, &request, &response, NULL); EXPECT_FALSE(cntl.Failed()) << cntl.ErrorText(); EXPECT_EQ(1, response.reply_size()); - EXPECT_TRUE(response.reply(0).is_integer()); + EXPECT_TRUE(response.reply(0).is_integer()) << response.reply(0); } return NULL; } TEST_F(RedisTest, server_concurrency) { + std::string password = GeneratePassword(); int N = 10; brpc::Server server; brpc::ServerOptions server_options; - RedisServiceImpl* rsimpl = new RedisServiceImpl; + RedisServiceImpl* rsimpl = new RedisServiceImpl(password); AuthCommandHandler *ah = new AuthCommandHandler(rsimpl); IncrCommandHandler *ih = new IncrCommandHandler(rsimpl); rsimpl->AddCommandHandler("incr", ih); @@ -1158,6 +1141,7 @@ TEST_F(RedisTest, server_concurrency) { brpc::ChannelOptions options; options.protocol = brpc::PROTOCOL_REDIS; options.connection_type = "pooled"; + options.auth = new brpc::policy::RedisAuthenticator(password); std::vector<bthread_t> bths; std::vector<brpc::Channel*> channels; for (int i = 0; i < N; ++i) { @@ -1228,9 +1212,10 @@ public: }; TEST_F(RedisTest, server_command_continue) { + std::string password = GeneratePassword(); brpc::Server server; brpc::ServerOptions server_options; - RedisServiceImpl* rsimpl = new RedisServiceImpl; + RedisServiceImpl* rsimpl = new RedisServiceImpl(password); rsimpl->AddCommandHandler("auth", new AuthCommandHandler(rsimpl)); rsimpl->AddCommandHandler("get", new GetCommandHandler(rsimpl)); rsimpl->AddCommandHandler("set", new SetCommandHandler(rsimpl)); @@ -1242,16 +1227,9 @@ TEST_F(RedisTest, server_command_continue) { brpc::ChannelOptions options; options.protocol = brpc::PROTOCOL_REDIS; + options.auth = new brpc::policy::RedisAuthenticator(password); brpc::Channel channel; ASSERT_EQ(0, channel.Init("127.0.0.1", server.listen_address().port, &options)); - // do auth - brpc::RedisRequest auth_req; - brpc::RedisResponse auth_resp; - brpc::Controller auth_cntl; - ASSERT_TRUE(auth_req.AddCommand("auth user1 password1")); - channel.CallMethod(NULL, &auth_cntl, &auth_req, &auth_resp, NULL); - ASSERT_FALSE(auth_cntl.Failed()) << auth_cntl.ErrorText(); - { brpc::RedisRequest request; brpc::RedisResponse response; @@ -1311,9 +1289,10 @@ TEST_F(RedisTest, server_command_continue) { } TEST_F(RedisTest, server_handle_pipeline) { + std::string password = GeneratePassword(); brpc::Server server; brpc::ServerOptions server_options; - RedisServiceImpl* rsimpl = new RedisServiceImpl; + RedisServiceImpl* rsimpl = new RedisServiceImpl(password); GetCommandHandler* getch = new GetCommandHandler(rsimpl, true); SetCommandHandler* setch = new SetCommandHandler(rsimpl, true); AuthCommandHandler* authch = new AuthCommandHandler(rsimpl); @@ -1327,20 +1306,13 @@ TEST_F(RedisTest, server_handle_pipeline) { brpc::ChannelOptions options; options.protocol = brpc::PROTOCOL_REDIS; + options.auth = new brpc::policy::RedisAuthenticator(password); brpc::Channel channel; ASSERT_EQ(0, channel.Init("127.0.0.1", server.listen_address().port, &options)); brpc::RedisRequest request; brpc::RedisResponse response; brpc::Controller cntl; - ASSERT_TRUE(request.AddCommand("auth user1 password1")); - channel.CallMethod(NULL, &cntl, &request, &response, NULL); - ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText(); - ASSERT_EQ(1, response.reply_size()); - ASSERT_STREQ("OK", response.reply(0).c_str()); - request.Clear(); - response.Clear(); - cntl.Reset(); ASSERT_TRUE(request.AddCommand("set key1 v1")); ASSERT_TRUE(request.AddCommand("set key2 v2")); ASSERT_TRUE(request.AddCommand("set key3 v3")); diff --git a/test/brpc_socket_unittest.cpp b/test/brpc_socket_unittest.cpp index 78b83503..0f35863f 100644 --- a/test/brpc_socket_unittest.cpp +++ b/test/brpc_socket_unittest.cpp @@ -335,10 +335,17 @@ TEST_F(SocketTest, single_threaded_connect_and_write) { EchoProcessHuluRequest, NULL, NULL, "dummy_hulu" } }; + int listening_fd = -1; butil::EndPoint point(butil::IP_ANY, 7878); - int listening_fd = tcp_listen(point); - ASSERT_TRUE(listening_fd > 0); - butil::make_non_blocking(listening_fd); + for (int i = 0; i < 100; ++i) { + point.port += i; + listening_fd = tcp_listen(point); + if (listening_fd >= 0) { + break; + } + } + ASSERT_GT(listening_fd, 0) << berror(); + ASSERT_EQ(0, butil::make_non_blocking(listening_fd)); ASSERT_EQ(0, messenger->AddHandler(pairs[0])); ASSERT_EQ(0, messenger->StartAccept(listening_fd, -1, NULL, false)); @@ -1130,6 +1137,7 @@ TEST_F(SocketTest, keepalive) { brpc::SocketUniquePtr ptr; ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)); CheckNoKeepalive(ptr->fd()); + ASSERT_EQ(0, ptr->SetFailed()); } int keepalive_idle = 1; @@ -1148,6 +1156,7 @@ TEST_F(SocketTest, keepalive) { ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)); CheckKeepalive(ptr->fd(), true, default_keepalive_idle, default_keepalive_interval, default_keepalive_count); + ASSERT_EQ(0, ptr->SetFailed()); } // Enable keepalive and set keepalive idle. @@ -1157,8 +1166,7 @@ TEST_F(SocketTest, keepalive) { brpc::SocketOptions options; options.fd = sockfd; options.keepalive_options = std::make_shared<brpc::SocketKeepaliveOptions>(); - options.keepalive_options->keepalive_idle_s - = keepalive_idle; + options.keepalive_options->keepalive_idle_s = keepalive_idle; brpc::SocketId id; ASSERT_EQ(0, brpc::Socket::Create(options, &id)); brpc::SocketUniquePtr ptr; @@ -1166,6 +1174,7 @@ TEST_F(SocketTest, keepalive) { CheckKeepalive(ptr->fd(), true, keepalive_idle, default_keepalive_interval, default_keepalive_count); + ASSERT_EQ(0, ptr->SetFailed()); } // Enable keepalive and set keepalive interval. @@ -1175,14 +1184,14 @@ TEST_F(SocketTest, keepalive) { brpc::SocketOptions options; options.fd = sockfd; options.keepalive_options = std::make_shared<brpc::SocketKeepaliveOptions>(); - options.keepalive_options->keepalive_interval_s - = keepalive_interval; + options.keepalive_options->keepalive_interval_s = keepalive_interval; brpc::SocketId id; ASSERT_EQ(0, brpc::Socket::Create(options, &id)); brpc::SocketUniquePtr ptr; ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)); CheckKeepalive(ptr->fd(), true, default_keepalive_idle, keepalive_interval, default_keepalive_count); + ASSERT_EQ(0, ptr->SetFailed()); } // Enable keepalive and set keepalive count. @@ -1199,6 +1208,7 @@ TEST_F(SocketTest, keepalive) { ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)); CheckKeepalive(ptr->fd(), true, default_keepalive_idle, default_keepalive_interval, keepalive_count); + ASSERT_EQ(0, ptr->SetFailed()); } // Enable keepalive and set keepalive idle, interval, count. @@ -1217,10 +1227,25 @@ TEST_F(SocketTest, keepalive) { ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)); CheckKeepalive(ptr->fd(), true, keepalive_idle, keepalive_interval, keepalive_count); + ASSERT_EQ(0, ptr->SetFailed()); } } TEST_F(SocketTest, keepalive_input_message) { + brpc::Acceptor* messenger = new brpc::Acceptor; + int listening_fd = -1; + butil::EndPoint point(butil::IP_ANY, 7878); + for (int i = 0; i < 100; ++i) { + point.port += i; + listening_fd = tcp_listen(point); + if (listening_fd >= 0) { + break; + } + } + ASSERT_GT(listening_fd, 0) << berror(); + ASSERT_EQ(0, butil::make_non_blocking(listening_fd)); + ASSERT_EQ(0, messenger->StartAccept(listening_fd, -1, NULL, false)); + int default_keepalive = 0; int default_keepalive_idle = 0; int default_keepalive_interval = 0; @@ -1234,76 +1259,81 @@ TEST_F(SocketTest, keepalive_input_message) { // Disable keepalive. { - int sockfd = socket(AF_INET, SOCK_STREAM, 0); - ASSERT_GT(sockfd, 0); brpc::SocketOptions options; - options.fd = sockfd; + options.remote_side = point; + options.connect_on_create = true; brpc::SocketId id = brpc::INVALID_SOCKET_ID; ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, &id)); brpc::SocketUniquePtr ptr; ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)) << "id=" << id; + ASSERT_GT(ptr->fd(), 0); CheckNoKeepalive(ptr->fd()); + ASSERT_EQ(0, ptr->SetFailed()); } // Enable keepalive. brpc::FLAGS_socket_keepalive = true; { - int sockfd = socket(AF_INET, SOCK_STREAM, 0); - ASSERT_GT(sockfd, 0); brpc::SocketOptions options; - options.fd = sockfd; + options.remote_side = point; + options.connect_on_create = true; brpc::SocketId id = brpc::INVALID_SOCKET_ID; ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, &id)); brpc::SocketUniquePtr ptr; ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)) << "id=" << id; + ASSERT_GT(ptr->fd(), 0); CheckKeepalive(ptr->fd(), true, default_keepalive_idle, default_keepalive_interval, default_keepalive_count); + ASSERT_EQ(0, ptr->SetFailed()); } // Enable keepalive and set keepalive idle. brpc::FLAGS_socket_keepalive_idle_s = 10; { - int sockfd = socket(AF_INET, SOCK_STREAM, 0); - ASSERT_GT(sockfd, 0); brpc::SocketOptions options; - options.fd = sockfd; + options.remote_side = point; + options.connect_on_create = true; brpc::SocketId id = brpc::INVALID_SOCKET_ID; ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, &id)); brpc::SocketUniquePtr ptr; ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)) << "id=" << id; + ASSERT_GT(ptr->fd(), 0); CheckKeepalive(ptr->fd(), true, brpc::FLAGS_socket_keepalive_idle_s, default_keepalive_interval, default_keepalive_count); + ASSERT_EQ(0, ptr->SetFailed()); } // Enable keepalive and set keepalive idle, interval. brpc::FLAGS_socket_keepalive_interval_s = 10; { - int sockfd = socket(AF_INET, SOCK_STREAM, 0); - ASSERT_GT(sockfd, 0); brpc::SocketOptions options; - options.fd = sockfd; + options.remote_side = point; + options.connect_on_create = true; brpc::SocketId id = brpc::INVALID_SOCKET_ID; ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, &id)); brpc::SocketUniquePtr ptr; ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)) << "id=" << id; + ASSERT_GT(ptr->fd(), 0); CheckKeepalive(ptr->fd(), true, brpc::FLAGS_socket_keepalive_idle_s, brpc::FLAGS_socket_keepalive_interval_s, default_keepalive_count); + ASSERT_EQ(0, ptr->SetFailed()); } // Enable keepalive and set keepalive idle, interval, count. brpc::FLAGS_socket_keepalive_count = 10; { - int sockfd = socket(AF_INET, SOCK_STREAM, 0); - ASSERT_GT(sockfd, 0); brpc::SocketOptions options; - options.fd = sockfd; + options.remote_side = point; + options.connect_on_create = true; brpc::SocketId id = brpc::INVALID_SOCKET_ID; ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, &id)); brpc::SocketUniquePtr ptr; ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)) << "id=" << id; + ASSERT_GT(ptr->fd(), 0); CheckKeepalive(ptr->fd(), true, brpc::FLAGS_socket_keepalive_idle_s, brpc::FLAGS_socket_keepalive_interval_s, brpc::FLAGS_socket_keepalive_count); + ASSERT_EQ(0, ptr->SetFailed()); } // Options of keepalive set by user have priority over Gflags. @@ -1311,56 +1341,58 @@ TEST_F(SocketTest, keepalive_input_message) { int keepalive_interval = 2; int keepalive_count = 2; { - int sockfd = socket(AF_INET, SOCK_STREAM, 0); - ASSERT_GT(sockfd, 0); brpc::SocketOptions options; - options.fd = sockfd; + options.remote_side = point; + options.connect_on_create = true; options.keepalive_options = std::make_shared<brpc::SocketKeepaliveOptions>(); options.keepalive_options->keepalive_idle_s = keepalive_idle; brpc::SocketId id = brpc::INVALID_SOCKET_ID; ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, &id)); brpc::SocketUniquePtr ptr; ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)) << "id=" << id; + ASSERT_GT(ptr->fd(), 0); CheckKeepalive(ptr->fd(), true, keepalive_idle, brpc::FLAGS_socket_keepalive_interval_s, brpc::FLAGS_socket_keepalive_count); + ASSERT_EQ(0, ptr->SetFailed()); } { - int sockfd = socket(AF_INET, SOCK_STREAM, 0); - ASSERT_GT(sockfd, 0); brpc::SocketOptions options; - options.fd = sockfd; + options.remote_side = point; + options.connect_on_create = true; options.keepalive_options = std::make_shared<brpc::SocketKeepaliveOptions>(); options.keepalive_options->keepalive_interval_s = keepalive_interval; brpc::SocketId id = brpc::INVALID_SOCKET_ID; ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, &id)); brpc::SocketUniquePtr ptr; ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)) << "id=" << id; + ASSERT_GT(ptr->fd(), 0); CheckKeepalive(ptr->fd(), true, brpc::FLAGS_socket_keepalive_idle_s, keepalive_interval, brpc::FLAGS_socket_keepalive_count); + ASSERT_EQ(0, ptr->SetFailed()); } { - int sockfd = socket(AF_INET, SOCK_STREAM, 0); - ASSERT_GT(sockfd, 0); brpc::SocketOptions options; - options.fd = sockfd; + options.remote_side = point; + options.connect_on_create = true; options.keepalive_options = std::make_shared<brpc::SocketKeepaliveOptions>(); options.keepalive_options->keepalive_count = keepalive_count; brpc::SocketId id = brpc::INVALID_SOCKET_ID; ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, &id)); brpc::SocketUniquePtr ptr; ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)) << "id=" << id; + ASSERT_GT(ptr->fd(), 0); CheckKeepalive(ptr->fd(), true, brpc::FLAGS_socket_keepalive_idle_s, brpc::FLAGS_socket_keepalive_interval_s, keepalive_count); + ASSERT_EQ(0, ptr->SetFailed()); } { - int sockfd = socket(AF_INET, SOCK_STREAM, 0); - ASSERT_GT(sockfd, 0); brpc::SocketOptions options; - options.fd = sockfd; + options.remote_side = point; + options.connect_on_create = true; options.keepalive_options = std::make_shared<brpc::SocketKeepaliveOptions>(); options.keepalive_options->keepalive_idle_s = keepalive_idle; options.keepalive_options->keepalive_interval_s = keepalive_interval; @@ -1369,9 +1401,16 @@ TEST_F(SocketTest, keepalive_input_message) { ASSERT_EQ(0, brpc::get_or_new_client_side_messenger()->Create(options, &id)); brpc::SocketUniquePtr ptr; ASSERT_EQ(0, brpc::Socket::Address(id, &ptr)) << "id=" << id; + ASSERT_GT(ptr->fd(), 0); CheckKeepalive(ptr->fd(), true, keepalive_idle, keepalive_interval, keepalive_count); + ASSERT_EQ(0, ptr->SetFailed()); } + + messenger->StopAccept(0); + ASSERT_EQ(-1, messenger->listened_fd()); + ASSERT_EQ(-1, fcntl(listening_fd, F_GETFD)); + ASSERT_EQ(EBADF, errno); } #if defined(OS_LINUX) diff --git a/test/brpc_streaming_rpc_unittest.cpp b/test/brpc_streaming_rpc_unittest.cpp index b0dd4a39..056ea9a9 100644 --- a/test/brpc_streaming_rpc_unittest.cpp +++ b/test/brpc_streaming_rpc_unittest.cpp @@ -240,7 +240,6 @@ TEST_F(StreamingRpcTest, block) { out.append(&dummy, sizeof(dummy)); ASSERT_EQ(EAGAIN, brpc::StreamWrite(request_stream, out)); hc.block = false; - ASSERT_EQ(0, brpc::StreamWait(request_stream, NULL)); // wait flushing all the pending messages while (handler._expected_next_value != N) { usleep(100); @@ -249,6 +248,7 @@ TEST_F(StreamingRpcTest, block) { hc.block = true; // async wait for (int i = N; i < N + N; ++i) { + ASSERT_EQ(0, brpc::StreamWait(request_stream, NULL)); int network = htonl(i); butil::IOBuf out; out.append(&network, sizeof(network)); @@ -432,18 +432,12 @@ TEST_F(StreamingRpcTest, idle_timeout) { class PingPongHandler : public brpc::StreamInputHandler { public: - explicit PingPongHandler() - : _expected_next_value(0) - , _failed(false) - , _stopped(false) - , _idle_times(0) - { - } int on_received_messages(brpc::StreamId id, butil::IOBuf *const messages[], size_t size) override { if (size != 1) { - _failed = true; + LOG(INFO) << "size=" << size; + _error = true; return 0; } for (size_t i = 0; i < size; ++i) { @@ -451,7 +445,7 @@ public: int network = 0; messages[i]->cutn(&network, sizeof(int)); if ((int)ntohl(network) != _expected_next_value) { - _failed = true; + _error = true; } int send_back = ntohl(network) + 1; _expected_next_value = send_back + 1; @@ -481,14 +475,16 @@ public: _failed = true; } + bool error() const { return _error; } bool failed() const { return _failed; } bool stopped() const { return _stopped; } int idle_times() const { return _idle_times; } private: - int _expected_next_value; - bool _failed; - bool _stopped; - int _idle_times; + int _expected_next_value{0}; + bool _error{false}; + bool _failed{false}; + bool _stopped{false}; + int _idle_times{0}; }; TEST_F(StreamingRpcTest, ping_pong) { @@ -524,8 +520,8 @@ TEST_F(StreamingRpcTest, ping_pong) { while (!resh.stopped() || !reqh.stopped()) { usleep(100); } - ASSERT_FALSE(resh.failed()); - ASSERT_FALSE(reqh.failed()); + ASSERT_FALSE(resh.error()); + ASSERT_FALSE(reqh.error()); ASSERT_EQ(0, resh.idle_times()); ASSERT_EQ(0, reqh.idle_times()); } diff --git a/test/bthread_fd_unittest.cpp b/test/bthread_fd_unittest.cpp index 91b89c21..fac6a4f2 100644 --- a/test/bthread_fd_unittest.cpp +++ b/test/bthread_fd_unittest.cpp @@ -616,7 +616,7 @@ void TestConnectInterruptImpl(bool timed) { int64_t connect_ms = butil::cpuwide_time_ms() - start_ms; LOG(INFO) << "Connect to " << ep << ", cost " << connect_ms << "ms"; - timespec abstime = butil::milliseconds_from_now(connect_ms + 100); + timespec abstime = butil::milliseconds_from_now(connect_ms * 10); rc = bthread_timed_connect( sockfd, (struct sockaddr*) &serv_addr, serv_addr_size, &abstime); diff --git a/test/bthread_unittest.cpp b/test/bthread_unittest.cpp index 1fbefb27..eaa16fa5 100644 --- a/test/bthread_unittest.cpp +++ b/test/bthread_unittest.cpp @@ -600,8 +600,9 @@ TEST_F(BthreadTest, test_span) { test_son_parent_span, &multi_p2)); ASSERT_EQ(0, bthread_join(multi_th1, NULL)); ASSERT_EQ(0, bthread_join(multi_th2, NULL)); - ASSERT_EQ(multi_p1, targets[0]); - ASSERT_EQ(multi_p2, targets[1]); + ASSERT_NE(multi_p1, multi_p2); + ASSERT_NE(std::find(targets, targets + 4, multi_p1), targets + 4); + ASSERT_NE(std::find(targets, targets + 4, multi_p2), targets + 4); } void* dummy_thread(void*) { @@ -628,48 +629,74 @@ TEST_F(BthreadTest, yield_single_thread) { } #ifdef BRPC_BTHREAD_TRACER -TEST_F(BthreadTest, trace) { - start = false; - stop = false; - bthread_t th; - ASSERT_EQ(0, bthread_start_urgent(&th, NULL, spin_and_log, (void*)1)); - while (!start) { - usleep(10 * 1000); - } - bthread::FLAGS_enable_fast_unwind = false; - std::string st = bthread::stack_trace(th); - LOG(INFO) << "fast_unwind spin_and_log stack trace:\n" << st; - ASSERT_NE(std::string::npos, st.find("spin_and_log")); - - bthread::FLAGS_enable_fast_unwind = true; - st = bthread::stack_trace(th); - LOG(INFO) << "spin_and_log stack trace:\n" << st; - ASSERT_NE(std::string::npos, st.find("spin_and_log")); - stop = true; - ASSERT_EQ(0, bthread_join(th, NULL)); +void spin_and_log_trace() { + bool ok = false; + for (int i = 0; i < 10; ++i) { + start = false; + stop = false; + bthread_t th; + ASSERT_EQ(0, bthread_start_urgent(&th, NULL, spin_and_log, (void*)1)); + while (!start) { + usleep(10 * 1000); + } + bthread::FLAGS_enable_fast_unwind = false; + std::string st1 = bthread::stack_trace(th); + LOG(INFO) << "spin_and_log stack trace:\n" << st1; + + bthread::FLAGS_enable_fast_unwind = true; + std::string st2 = bthread::stack_trace(th); + LOG(INFO) << "fast_unwind spin_and_log stack trace:\n" << st2; + stop = true; + ASSERT_EQ(0, bthread_join(th, NULL)); + + std::string st3 = bthread::stack_trace(th); + LOG(INFO) << "ended bthread stack trace:\n" << st3; + ASSERT_NE(std::string::npos, st3.find("not exist now")); - start = false; - stop = false; - ASSERT_EQ(0, bthread_start_urgent(&th, NULL, repeated_sleep, (void*)1)); - while (!start) { - usleep(10 * 1000); + ok = st1.find("spin_and_log") != std::string::npos && + st2.find("spin_and_log") != std::string::npos; + if (ok) { + break; + } } - bthread::FLAGS_enable_fast_unwind = false; - st = bthread::stack_trace(th); - LOG(INFO) << "fast_unwind repeated_sleep stack trace:\n" << st; - ASSERT_NE(std::string::npos, st.find("repeated_sleep")); - - bthread::FLAGS_enable_fast_unwind = true; - st = bthread::stack_trace(th); - LOG(INFO) << "repeated_sleep stack trace:\n" << st; - ASSERT_NE(std::string::npos, st.find("repeated_sleep")); - stop = true; - ASSERT_EQ(0, bthread_join(th, NULL)); + ASSERT_TRUE(ok); +} + +void repeated_sleep_trace() { + bool ok = false; + for (int i = 0; i < 10; ++i) { + start = false; + stop = false; + bthread_t th; + ASSERT_EQ(0, bthread_start_urgent(&th, NULL, repeated_sleep, (void*)1)); + while (!start) { + usleep(10 * 1000); + } + bthread::FLAGS_enable_fast_unwind = false; + std::string st1 = bthread::stack_trace(th); + LOG(INFO) << "repeated_sleep stack trace:\n" << st1; - st = bthread::stack_trace(th); - LOG(INFO) << "ended bthread stack trace:\n" << st; - ASSERT_NE(std::string::npos, st.find("not exist now")); + bthread::FLAGS_enable_fast_unwind = true; + std::string st2 = bthread::stack_trace(th); + LOG(INFO) << "fast_unwind repeated_sleep stack trace:\n" << st2; + stop = true; + ASSERT_EQ(0, bthread_join(th, NULL)); + + std::string st3 = bthread::stack_trace(th); + LOG(INFO) << "ended bthread stack trace:\n" << st3; + ASSERT_NE(std::string::npos, st3.find("not exist now")); + ok = st1.find("repeated_sleep") != std::string::npos && + st2.find("repeated_sleep") != std::string::npos; + if (ok) { + break; + } + } + ASSERT_TRUE(ok); +} +TEST_F(BthreadTest, trace) { + spin_and_log_trace(); + repeated_sleep_trace(); } #endif // BRPC_BTHREAD_TRACER diff --git a/test/endpoint_unittest.cpp b/test/endpoint_unittest.cpp index 6ea45c04..cf471315 100644 --- a/test/endpoint_unittest.cpp +++ b/test/endpoint_unittest.cpp @@ -499,7 +499,7 @@ TEST(EndPointTest, tcp_connect) { ASSERT_LE(0, sockfd) << "errno=" << errno; } { - butil::fd_guard sockfd(butil::tcp_connect(ep1, NULL, 1)); + butil::fd_guard sockfd(butil::tcp_connect(ep2, NULL, 1)); ASSERT_EQ(-1, sockfd) << "errno=" << errno; ASSERT_EQ(ETIMEDOUT, errno); } @@ -553,7 +553,7 @@ void TestConnectInterruptImpl(bool timed) { int64_t connect_ms = butil::cpuwide_time_ms() - start_ms; LOG(INFO) << "Connect to " << ep << ", cost " << connect_ms << "ms"; - timespec abstime = butil::milliseconds_from_now(connect_ms * 2); + timespec abstime = butil::milliseconds_from_now(connect_ms * 10); rc = butil::pthread_timed_connect( sockfd, (struct sockaddr*) &serv_addr, serv_addr_size, &abstime); --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org For additional commands, e-mail: dev-h...@brpc.apache.org