This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new d1d61758 Make batch create stream SendFeedback thread safe (#3215)
d1d61758 is described below
commit d1d61758c81899cd39e3221c5c914c973b68c8d8
Author: Jenrry You <[email protected]>
AuthorDate: Thu Mar 26 15:13:22 2026 +0800
Make batch create stream SendFeedback thread safe (#3215)
---
src/brpc/stream.cpp | 34 ++++-
src/brpc/stream_impl.h | 7 +-
test/brpc_streaming_rpc_unittest.cpp | 236 ++++++++++++++++++++++++++++++++++-
3 files changed, 267 insertions(+), 10 deletions(-)
diff --git a/src/brpc/stream.cpp b/src/brpc/stream.cpp
index 2a443054..a2a106a8 100644
--- a/src/brpc/stream.cpp
+++ b/src/brpc/stream.cpp
@@ -52,6 +52,7 @@ Stream::Stream()
, _remote_consumed(0)
, _cur_buf_size(0)
, _local_consumed(0)
+ , _atomic_local_consumed(0)
, _parse_rpc_response(false)
, _pending_buf(NULL)
, _start_idle_timer_us(0)
@@ -287,7 +288,7 @@ void Stream::SetConnected(const StreamSettings*
remote_settings) {
CHECK(_host_socket != NULL);
RPC_VLOG << "stream=" << id() << " is connected to stream_id="
<< _remote_settings.stream_id() << " at host_socket=" <<
*_host_socket;
- _connected = true;
+ _connected.store(true, butil::memory_order_release);
_connect_meta.ec = 0;
TriggerOnConnectIfNeed();
if (remote_settings == NULL) {
@@ -295,6 +296,13 @@ void Stream::SetConnected(const StreamSettings*
remote_settings) {
// Client-side timer would triggered in Consume after received the
first
// message which is the very RPC response
StartIdleTimer();
+ } else {
+ // send first feedback for client-side stream if it already consumed
data
+ if (_remote_settings.need_feedback()) {
+ auto consumed_bytes =
_atomic_local_consumed.load(butil::memory_order_acquire);
+ if (consumed_bytes > 0)
+ SendFeedback(consumed_bytes);
+ }
}
}
@@ -620,20 +628,34 @@ int Stream::Consume(void *meta,
bthread::TaskIterator<butil::IOBuf*>& iter) {
}
mb.flush();
- if (s->_remote_settings.need_feedback() && mb.total_length() > 0) {
- s->_local_consumed += mb.total_length();
- s->SendFeedback();
+ auto total_length = mb.total_length();
+ if (total_length > 0) {
+ // fast path for connected stream
+ if (s->_connected.load(butil::memory_order_acquire)){
+ if (s->_remote_settings.need_feedback()) {
+ s->_local_consumed += total_length;
+ s->SendFeedback(s->_local_consumed);
+ }
+ } else {
+ // Under the scenario of batch creation of Streams, there is
concurrency between SetConnected and Consume for the same stream,
+ // and it is necessary to ensure the memory order.
+ s->_local_consumed =
s->_atomic_local_consumed.fetch_add(total_length, butil::memory_order_release)
+ total_length;
+ if (s->_connected.load(butil::memory_order_acquire) &&
s->_remote_settings.need_feedback()) {
+ s->SendFeedback(s->_local_consumed);
+ }
+ }
}
+
s->StartIdleTimer();
return 0;
}
-void Stream::SendFeedback() {
+void Stream::SendFeedback(int64_t _consumed_bytes) {
StreamFrameMeta fm;
fm.set_frame_type(FRAME_TYPE_FEEDBACK);
fm.set_stream_id(_remote_settings.stream_id());
fm.set_source_stream_id(id());
- fm.mutable_feedback()->set_consumed_size(_local_consumed);
+ fm.mutable_feedback()->set_consumed_size(_consumed_bytes);
butil::IOBuf out;
policy::PackStreamMessage(&out, fm, NULL);
WriteToHostSocket(&out);
diff --git a/src/brpc/stream_impl.h b/src/brpc/stream_impl.h
index 5ff7cb04..284b33ca 100644
--- a/src/brpc/stream_impl.h
+++ b/src/brpc/stream_impl.h
@@ -81,7 +81,7 @@ friend struct butil::DefaultDeleter<Stream>;
void TriggerOnConnectIfNeed();
void Wait(void (*on_writable)(StreamId, void*, int), void* arg,
const timespec* due_time, bool new_thread, bthread_id_t
*join_id);
- void SendFeedback();
+ void SendFeedback(int64_t _consumed_bytes);
void StartIdleTimer();
void StopIdleTimer();
void HandleRpcResponse(butil::IOBuf* response_buffer);
@@ -115,7 +115,7 @@ friend struct butil::DefaultDeleter<Stream>;
bthread_mutex_t _connect_mutex;
ConnectMeta _connect_meta;
- bool _connected;
+ butil::atomic<bool> _connected;
bool _closed;
int _error_code;
std::string _error_text;
@@ -127,7 +127,8 @@ friend struct butil::DefaultDeleter<Stream>;
bthread_id_list_t _writable_wait_list;
int64_t _local_consumed;
- StreamSettings _remote_settings;
+ butil::atomic<int64_t> _atomic_local_consumed;
+ StreamSettings _remote_settings;
bool _parse_rpc_response;
bthread::ExecutionQueueId<butil::IOBuf*> _consumer_queue;
diff --git a/test/brpc_streaming_rpc_unittest.cpp
b/test/brpc_streaming_rpc_unittest.cpp
index 056ea9a9..ecb88c61 100644
--- a/test/brpc_streaming_rpc_unittest.cpp
+++ b/test/brpc_streaming_rpc_unittest.cpp
@@ -20,10 +20,12 @@
// Date: 2015/10/22 16:28:44
#include <gtest/gtest.h>
+#include <atomic>
#include "brpc/server.h"
#include "brpc/controller.h"
#include "brpc/channel.h"
+#include "brpc/callback.h"
#include "brpc/socket.h"
#include "brpc/stream_impl.h"
#include "brpc/policy/streaming_rpc_protocol.h"
@@ -54,7 +56,7 @@ public:
const ::test::EchoRequest* request,
::test::EchoResponse* response,
::google::protobuf::Closure* done) {
- brpc::ClosureGuard done_gurad(done);
+ brpc::ClosureGuard done_guard(done);
response->set_message(request->message());
brpc::Controller* cntl = (brpc::Controller*)controller;
brpc::StreamId response_stream;
@@ -78,6 +80,158 @@ protected:
test::EchoResponse response;
};
+struct BatchStreamFeedbackRaceState {
+ brpc::StreamId server_first_stream_id{brpc::INVALID_STREAM_ID};
+ brpc::StreamId server_extra_stream_id{brpc::INVALID_STREAM_ID};
+ brpc::StreamId client_extra_stream_id{brpc::INVALID_STREAM_ID};
+
+ std::atomic<int> server_first_write_rc{-1};
+ std::atomic<int> server_second_write_rc{-1};
+ std::atomic<bool> client_got_first_msg{false};
+ std::atomic<bool> client_got_second_msg{false};
+ std::atomic<bool> server_write_done{false};
+ std::atomic<bool> rpc_done{false};
+
+ bthread_t server_send_tid{0};
+ std::atomic<bool> server_send_started{false};
+};
+
+class BatchStreamClientHandler : public brpc::StreamInputHandler {
+public:
+ explicit BatchStreamClientHandler(BatchStreamFeedbackRaceState* state)
+ : _state(state) {}
+
+ int on_received_messages(brpc::StreamId id,
+ butil::IOBuf* const messages[],
+ size_t size) override {
+ if (id != _state->client_extra_stream_id) {
+ // This test only cares about extra stream in batch creation.
+ return 0;
+ }
+ for (size_t i = 0; i < size; ++i) {
+ const size_t len = messages[i]->length();
+ messages[i]->clear();
+ // First payload: 64 bytes. Second payload: 1 byte.
+ if (len == 64) {
+ _state->client_got_first_msg.store(true,
std::memory_order_release);
+ } else if (len == 1) {
+ _state->client_got_second_msg.store(true,
std::memory_order_release);
+ }
+ }
+ return 0;
+ }
+
+ void on_idle_timeout(brpc::StreamId /*id*/) override {}
+
+ void on_closed(brpc::StreamId /*id*/) override {}
+
+ void on_failed(brpc::StreamId /*id*/, int /*error_code*/, const
std::string& /*error_text*/) override {}
+
+private:
+ BatchStreamFeedbackRaceState* _state;
+};
+
+static void* SendTwoMessagesOnServerExtraStream(void* arg) {
+ auto* state = static_cast<BatchStreamFeedbackRaceState*>(arg);
+ const brpc::StreamId sid = state->server_extra_stream_id;
+
+ // Wait until server-side stream is connected.
+ const int64_t connect_deadline_us = butil::gettimeofday_us() + 2 * 1000 *
1000L;
+ bool connected = false;
+ while (butil::gettimeofday_us() < connect_deadline_us) {
+ brpc::SocketUniquePtr ptr;
+ if (brpc::Socket::Address(sid, &ptr) == 0) {
+ brpc::Stream* s = static_cast<brpc::Stream*>(ptr->conn());
+ if (s->_host_socket != NULL && s->_connected) {
+ connected = true;
+ break;
+ }
+ }
+ usleep(1000);
+ }
+
+ if (!connected) {
+ state->server_first_write_rc.store(ETIMEDOUT,
std::memory_order_relaxed);
+ state->server_second_write_rc.store(ETIMEDOUT,
std::memory_order_relaxed);
+ state->server_write_done.store(true, std::memory_order_release);
+ return NULL;
+ }
+
+ // 1) Send a payload exactly equal to max_buf_size(64).
+ {
+ std::string payload(64, 'a');
+ butil::IOBuf out;
+ out.append(payload);
+ state->server_first_write_rc.store(brpc::StreamWrite(sid, out),
std::memory_order_relaxed);
+ }
+
+ // 2) Then send another byte. This write should become writable only after
+ // client sends FEEDBACK with consumed_size >= 64.
+ const int64_t write_deadline_us = butil::gettimeofday_us() + 2 * 1000 *
1000L;
+ int rc = -1;
+ while (butil::gettimeofday_us() < write_deadline_us) {
+ butil::IOBuf out;
+ out.append("b", 1);
+ rc = brpc::StreamWrite(sid, out);
+ if (rc == 0) {
+ break;
+ }
+ if (rc != EAGAIN) {
+ break;
+ }
+ const timespec duetime = butil::milliseconds_from_now(100);
+ (void)brpc::StreamWait(sid, &duetime);
+ }
+ state->server_second_write_rc.store(rc, std::memory_order_relaxed);
+ state->server_write_done.store(true, std::memory_order_release);
+ return NULL;
+}
+
+class MyServiceWithBatchStream : public test::EchoService {
+public:
+ MyServiceWithBatchStream(const brpc::StreamOptions& options,
+ BatchStreamFeedbackRaceState* state)
+ : _options(options), _state(state) {}
+
+ void Echo(::google::protobuf::RpcController* controller,
+ const ::test::EchoRequest* request,
+ ::test::EchoResponse* response,
+ ::google::protobuf::Closure* done) override {
+ brpc::ClosureGuard done_guard(done);
+ response->set_message(request->message());
+ brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
+
+ brpc::StreamIds response_streams;
+ ASSERT_EQ(0, brpc::StreamAccept(response_streams, *cntl, &_options));
+ ASSERT_EQ(2u, response_streams.size());
+ _state->server_first_stream_id = response_streams[0];
+ _state->server_extra_stream_id = response_streams[1];
+
+ bthread_t tid;
+ ASSERT_EQ(0, bthread_start_background(
+ &tid, &BTHREAD_ATTR_NORMAL,
+ SendTwoMessagesOnServerExtraStream, _state));
+ _state->server_send_tid = tid;
+ _state->server_send_started.store(true, std::memory_order_release);
+ }
+
+private:
+ brpc::StreamOptions _options;
+ BatchStreamFeedbackRaceState* _state;
+};
+
+static void SetAtomicTrue(std::atomic<bool>* f) {
+ f->store(true, std::memory_order_release);
+}
+
+static bool WaitForTrue(const std::atomic<bool>& f, int timeout_ms) {
+ const int64_t deadline_us = butil::gettimeofday_us() + (int64_t)timeout_ms
* 1000L;
+ while (!f.load(std::memory_order_acquire) && butil::gettimeofday_us() <
deadline_us) {
+ usleep(1000);
+ }
+ return f.load(std::memory_order_acquire);
+}
+
TEST_F(StreamingRpcTest, sanity) {
brpc::Server server;
MyServiceWithStream service;
@@ -98,6 +252,86 @@ TEST_F(StreamingRpcTest, sanity) {
server.Join();
}
+TEST_F(StreamingRpcTest, batch_create_stream_feedback_race) {
+ BatchStreamFeedbackRaceState state;
+ BatchStreamClientHandler client_handler(&state);
+
+ brpc::StreamOptions server_stream_opt;
+ // Make server-side sender sensitive to FEEDBACK quickly.
+ server_stream_opt.max_buf_size = 16;
+
+ brpc::Server server;
+ MyServiceWithBatchStream service(server_stream_opt, &state);
+ ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
+ ASSERT_EQ(0, server.Start(9007, NULL));
+
+ brpc::Channel channel;
+ ASSERT_EQ(0, channel.Init("127.0.0.1:9007", NULL));
+
+ brpc::Controller cntl;
+ brpc::StreamIds request_streams;
+ brpc::StreamOptions client_stream_opt;
+ client_stream_opt.handler = &client_handler;
+ client_stream_opt.max_buf_size = 0;
+ ASSERT_EQ(0, brpc::StreamCreate(request_streams, 2, cntl,
&client_stream_opt));
+ ASSERT_EQ(2u, request_streams.size());
+ state.client_extra_stream_id = request_streams[1];
+
+ // Block SetConnected() on the extra stream to enlarge the race window.
+ brpc::SocketUniquePtr client_extra_ptr;
+ ASSERT_EQ(0, brpc::Socket::Address(state.client_extra_stream_id,
&client_extra_ptr));
+ brpc::Stream* client_extra_stream =
static_cast<brpc::Stream*>(client_extra_ptr->conn());
+ bthread_mutex_lock(&client_extra_stream->_connect_mutex);
+ struct UnlockGuard {
+ bthread_mutex_t* m;
+ ~UnlockGuard() {
+ if (m) {
+ bthread_mutex_unlock(m);
+ }
+ }
+ } unlock_guard{&client_extra_stream->_connect_mutex};
+
+ BRPC_SCOPE_EXIT {
+ if (state.server_extra_stream_id != brpc::INVALID_STREAM_ID) {
+ brpc::StreamClose(state.server_extra_stream_id);
+ }
+ if (state.server_first_stream_id != brpc::INVALID_STREAM_ID) {
+ brpc::StreamClose(state.server_first_stream_id);
+ }
+ for (auto sid : request_streams) {
+ brpc::StreamClose(sid);
+ }
+
+ if (state.server_send_tid) {
+ bthread_join(state.server_send_tid, NULL);
+ }
+ server.Stop(0);
+ server.Join();
+ };
+
+ test::EchoService_Stub stub(&channel);
+ stub.Echo(&cntl, &request, &response, brpc::NewCallback(SetAtomicTrue,
&state.rpc_done));
+
+ // Wait until client consumes the first 64B payload on extra stream.
+ ASSERT_TRUE(WaitForTrue(state.client_got_first_msg, 2000));
+
+ // Unblock SetConnected(); the fix in PR 3215 should send the first
FEEDBACK
+ // with consumed_size=64 here, making server-side stream writable again.
+ bthread_mutex_unlock(&client_extra_stream->_connect_mutex);
+ unlock_guard.m = NULL;
+
+ ASSERT_TRUE(WaitForTrue(state.rpc_done, 2000));
+ ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
+
+ // Wait for server-side send thread to be started.
+ ASSERT_TRUE(WaitForTrue(state.server_send_started, 2000));
+
+ ASSERT_TRUE(WaitForTrue(state.server_write_done, 2000));
+ ASSERT_EQ(0, state.server_first_write_rc.load(std::memory_order_relaxed));
+ ASSERT_EQ(0, state.server_second_write_rc.load(std::memory_order_relaxed));
+ ASSERT_TRUE(WaitForTrue(state.client_got_second_msg, 2000));
+}
+
struct HandlerControl {
HandlerControl()
: block(false)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]