This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 3aa5dab6 Fix UAF in batch_create_stream_feedback_race unittest (#3305)
3aa5dab6 is described below
commit 3aa5dab6a2fd77825783884c98f5292912502635
Author: Bright Chen <[email protected]>
AuthorDate: Wed May 20 12:05:41 2026 +0800
Fix UAF in batch_create_stream_feedback_race unittest (#3305)
---
test/brpc_streaming_rpc_unittest.cpp | 32 ++++++++++++++++++++++++++++----
1 file changed, 28 insertions(+), 4 deletions(-)
diff --git a/test/brpc_streaming_rpc_unittest.cpp
b/test/brpc_streaming_rpc_unittest.cpp
index ecb88c61..0f8a3e56 100644
--- a/test/brpc_streaming_rpc_unittest.cpp
+++ b/test/brpc_streaming_rpc_unittest.cpp
@@ -91,6 +91,7 @@ struct BatchStreamFeedbackRaceState {
std::atomic<bool> client_got_second_msg{false};
std::atomic<bool> server_write_done{false};
std::atomic<bool> rpc_done{false};
+ std::atomic<int> client_closed_count{0};
bthread_t server_send_tid{0};
std::atomic<bool> server_send_started{false};
@@ -123,7 +124,9 @@ public:
void on_idle_timeout(brpc::StreamId /*id*/) override {}
- void on_closed(brpc::StreamId /*id*/) override {}
+ void on_closed(brpc::StreamId /*id*/) override {
+ _state->client_closed_count.fetch_add(1, std::memory_order_release);
+ }
void on_failed(brpc::StreamId /*id*/, int /*error_code*/, const
std::string& /*error_text*/) override {}
@@ -224,12 +227,17 @@ static void SetAtomicTrue(std::atomic<bool>* f) {
f->store(true, std::memory_order_release);
}
-static bool WaitForTrue(const std::atomic<bool>& f, int timeout_ms) {
+template <typename Pred>
+static bool WaitForTrue(Pred pred, int timeout_ms) {
const int64_t deadline_us = butil::gettimeofday_us() + (int64_t)timeout_ms
* 1000L;
- while (!f.load(std::memory_order_acquire) && butil::gettimeofday_us() <
deadline_us) {
+ while (!pred() && butil::gettimeofday_us() < deadline_us) {
usleep(1000);
}
- return f.load(std::memory_order_acquire);
+ return pred();
+}
+
+static bool WaitForTrue(const std::atomic<bool>& f, int timeout_ms) {
+ return WaitForTrue([&f]() { return f.load(std::memory_order_acquire); },
timeout_ms);
}
TEST_F(StreamingRpcTest, sanity) {
@@ -307,6 +315,22 @@ TEST_F(StreamingRpcTest,
batch_create_stream_feedback_race) {
}
server.Stop(0);
server.Join();
+
+ // Release the SocketUniquePtr held above so the fake socket can be
+ // recycled. Otherwise BeforeRecycle / on_closed for the extra stream
+ // is deferred until `client_extra_ptr` destructs at scope exit, which
+ // happens *after* `client_handler` and `state` are destroyed -> UAF
+ // inside Stream::Consume on Linux.
+ client_extra_ptr.reset();
+
+ // on_closed() runs asynchronously on each client stream's consumer
+ // bthread. Wait for both before letting handler/state go out of
+ // scope, otherwise Stream::Consume will dereference freed memory.
+ int expected_closed = request_streams.size();
+ WaitForTrue([&state, expected_closed]() {
+ return state.client_closed_count.load(std::memory_order_acquire)
+ >= expected_closed;
+ }, 2000);
};
test::EchoService_Stub stub(&channel);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]