Copilot commented on code in PR #3215:
URL: https://github.com/apache/brpc/pull/3215#discussion_r2888724175


##########
test/brpc_streaming_rpc_unittest.cpp:
##########
@@ -20,13 +20,16 @@
 // Date: 2015/10/22 16:28:44
 
 #include <gtest/gtest.h>
+#include <atomic>
 #include "brpc/server.h"
 
 #include "brpc/controller.h"
 #include "brpc/channel.h"
+#include "brpc/callback.h"
 #include "brpc/socket.h"
 #include "brpc/stream_impl.h"
 #include "brpc/policy/streaming_rpc_protocol.h"
+#include "bthread/countdown_event.h"

Review Comment:
   `bthread/countdown_event.h` is included but not used in this test file, 
which adds an unnecessary dependency and increases compile time. Please remove 
it (or add the intended usage if it was meant for synchronization).
   ```suggestion
   
   ```



##########
test/brpc_streaming_rpc_unittest.cpp:
##########
@@ -78,6 +81,158 @@ class StreamingRpcTest : public testing::Test {
     test::EchoResponse response;
 };
 
+struct BatchStreamFeedbackRaceState {
+    brpc::StreamId server_first_stream_id{brpc::INVALID_STREAM_ID};
+    brpc::StreamId server_extra_stream_id{brpc::INVALID_STREAM_ID};
+    brpc::StreamId client_extra_stream_id{brpc::INVALID_STREAM_ID};
+
+    std::atomic<int> server_first_write_rc{-1};
+    std::atomic<int> server_second_write_rc{-1};
+    std::atomic<bool> client_got_first_msg{false};
+    std::atomic<bool> client_got_second_msg{false};
+    std::atomic<bool> server_write_done{false};
+    std::atomic<bool> rpc_done{false};
+
+    bthread_t server_send_tid{0};
+    std::atomic<bool> server_send_started{false};
+};
+
+class BatchStreamClientHandler : public brpc::StreamInputHandler {
+public:
+    explicit BatchStreamClientHandler(BatchStreamFeedbackRaceState* state)
+        : _state(state) {}
+
+    int on_received_messages(brpc::StreamId id,
+                             butil::IOBuf* const messages[],
+                             size_t size) override {
+        if (id != _state->client_extra_stream_id) {
+            // This test only cares about extra stream in batch creation.
+            return 0;
+        }
+        for (size_t i = 0; i < size; ++i) {
+            const size_t len = messages[i]->length();
+            messages[i]->clear();
+            // First payload: 64 bytes. Second payload: 1 byte.
+            if (len == 64) {
+                _state->client_got_first_msg.store(true, 
std::memory_order_release);
+            } else if (len == 1) {
+                _state->client_got_second_msg.store(true, 
std::memory_order_release);
+            }
+        }
+        return 0;
+    }
+
+    void on_idle_timeout(brpc::StreamId /*id*/) override {}
+
+    void on_closed(brpc::StreamId /*id*/) override {}
+
+    void on_failed(brpc::StreamId /*id*/, int /*error_code*/, const 
std::string& /*error_text*/) override {}
+
+private:
+    BatchStreamFeedbackRaceState* _state;
+};
+
+static void* SendTwoMessagesOnServerExtraStream(void* arg) {
+    auto* state = static_cast<BatchStreamFeedbackRaceState*>(arg);
+    const brpc::StreamId sid = state->server_extra_stream_id;
+
+    // Wait until server-side stream is connected.
+    const int64_t connect_deadline_us = butil::gettimeofday_us() + 2 * 1000 * 
1000L;
+    bool connected = false;
+    while (butil::gettimeofday_us() < connect_deadline_us) {
+        brpc::SocketUniquePtr ptr;
+        if (brpc::Socket::Address(sid, &ptr) == 0) {
+            brpc::Stream* s = static_cast<brpc::Stream*>(ptr->conn());
+            if (s->_host_socket != NULL && s->_connected) {
+                connected = true;
+                break;
+            }
+        }
+        usleep(1000);
+    }
+
+    if (!connected) {
+        state->server_first_write_rc.store(ETIMEDOUT, 
std::memory_order_relaxed);
+        state->server_second_write_rc.store(ETIMEDOUT, 
std::memory_order_relaxed);
+        state->server_write_done.store(true, std::memory_order_release);
+        return NULL;
+    }
+
+    // 1) Send a payload exactly equal to max_buf_size(64).
+    {
+        std::string payload(64, 'a');
+        butil::IOBuf out;
+        out.append(payload);
+        state->server_first_write_rc.store(brpc::StreamWrite(sid, out), 
std::memory_order_relaxed);
+    }
+
+    // 2) Then send another byte. This write should become writable only after
+    // client sends FEEDBACK with consumed_size >= 64.
+    const int64_t write_deadline_us = butil::gettimeofday_us() + 2 * 1000 * 
1000L;
+    int rc = -1;
+    while (butil::gettimeofday_us() < write_deadline_us) {
+        butil::IOBuf out;
+        out.append("b", 1);
+        rc = brpc::StreamWrite(sid, out);
+        if (rc == 0) {
+            break;
+        }
+        if (rc != EAGAIN) {
+            break;
+        }
+        const timespec duetime = butil::milliseconds_from_now(100);
+        (void)brpc::StreamWait(sid, &duetime);
+    }
+    state->server_second_write_rc.store(rc, std::memory_order_relaxed);
+    state->server_write_done.store(true, std::memory_order_release);
+    return NULL;
+}
+
+class MyServiceWithBatchStream : public test::EchoService {
+public:
+    MyServiceWithBatchStream(const brpc::StreamOptions& options,
+                             BatchStreamFeedbackRaceState* state)
+        : _options(options), _state(state) {}
+
+    void Echo(::google::protobuf::RpcController* controller,
+              const ::test::EchoRequest* request,
+              ::test::EchoResponse* response,
+              ::google::protobuf::Closure* done) override {
+        brpc::ClosureGuard done_gurad(done);
+        response->set_message(request->message());

Review Comment:
   Typo in variable name: `done_gurad` should be `done_guard` for clarity and 
consistency with `brpc::ClosureGuard`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to