This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 3aa5dab6 Fix UAF in batch_create_stream_feedback_race unittest (#3305)
3aa5dab6 is described below

commit 3aa5dab6a2fd77825783884c98f5292912502635
Author: Bright Chen <[email protected]>
AuthorDate: Wed May 20 12:05:41 2026 +0800

    Fix UAF in batch_create_stream_feedback_race unittest (#3305)
---
 test/brpc_streaming_rpc_unittest.cpp | 32 ++++++++++++++++++++++++++++----
 1 file changed, 28 insertions(+), 4 deletions(-)

diff --git a/test/brpc_streaming_rpc_unittest.cpp b/test/brpc_streaming_rpc_unittest.cpp
index ecb88c61..0f8a3e56 100644
--- a/test/brpc_streaming_rpc_unittest.cpp
+++ b/test/brpc_streaming_rpc_unittest.cpp
@@ -91,6 +91,7 @@ struct BatchStreamFeedbackRaceState {
     std::atomic<bool> client_got_second_msg{false};
     std::atomic<bool> server_write_done{false};
     std::atomic<bool> rpc_done{false};
+    std::atomic<int> client_closed_count{0};
 
     bthread_t server_send_tid{0};
     std::atomic<bool> server_send_started{false};
@@ -123,7 +124,9 @@ public:
 
     void on_idle_timeout(brpc::StreamId /*id*/) override {}
 
-    void on_closed(brpc::StreamId /*id*/) override {}
+    void on_closed(brpc::StreamId /*id*/) override {
+        _state->client_closed_count.fetch_add(1, std::memory_order_release);
+    }
 
     void on_failed(brpc::StreamId /*id*/, int /*error_code*/, const std::string& /*error_text*/) override {}
 
@@ -224,12 +227,17 @@ static void SetAtomicTrue(std::atomic<bool>* f) {
     f->store(true, std::memory_order_release);
 }
 
-static bool WaitForTrue(const std::atomic<bool>& f, int timeout_ms) {
+template <typename Pred>
+static bool WaitForTrue(Pred pred, int timeout_ms) {
     const int64_t deadline_us = butil::gettimeofday_us() + (int64_t)timeout_ms * 1000L;
-    while (!f.load(std::memory_order_acquire) && butil::gettimeofday_us() < deadline_us) {
+    while (!pred() && butil::gettimeofday_us() < deadline_us) {
         usleep(1000);
     }
-    return f.load(std::memory_order_acquire);
+    return pred();
+}
+
+static bool WaitForTrue(const std::atomic<bool>& f, int timeout_ms) {
+    return WaitForTrue([&f]() { return f.load(std::memory_order_acquire); }, timeout_ms);
 }
 
 TEST_F(StreamingRpcTest, sanity) {
@@ -307,6 +315,22 @@ TEST_F(StreamingRpcTest, batch_create_stream_feedback_race) {
         }
         server.Stop(0);
         server.Join();
+
+        // Release the SocketUniquePtr held above so the fake socket can be
+        // recycled. Otherwise BeforeRecycle / on_closed for the extra stream
+        // is deferred until `client_extra_ptr` destructs at scope exit, which
+        // happens *after* `client_handler` and `state` are destroyed -> UAF
+        // inside Stream::Consume on Linux.
+        client_extra_ptr.reset();
+
+        // on_closed() runs asynchronously on each client stream's consumer
+        // bthread. Wait for both before letting handler/state go out of
+        // scope, otherwise Stream::Consume will dereference freed memory.
+        int expected_closed = request_streams.size();
+        WaitForTrue([&state, expected_closed]() {
+            return state.client_closed_count.load(std::memory_order_acquire)
+                   >= expected_closed;
+        }, 2000);
     };
 
     test::EchoService_Stub stub(&channel);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to