This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-brpc.git
The following commit(s) were added to refs/heads/master by this push:
new f4cd44a6 limit total streams oncomsume bytes in one socket (#1958)
f4cd44a6 is described below
commit f4cd44a690738ad089080d51e2939fdb0323d8c4
Author: baiyang <[email protected]>
AuthorDate: Thu Nov 10 17:31:29 2022 +0800
limit total streams oncomsume bytes in one socket (#1958)
---
src/brpc/socket.cpp | 8 +++++++
src/brpc/socket.h | 1 +
src/brpc/stream.cpp | 57 +++++++++++++++++++++++++++++++++++++++++---------
src/brpc/stream.h | 8 ++++++-
src/brpc/stream_impl.h | 1 +
5 files changed, 64 insertions(+), 11 deletions(-)
diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp
index fb5ae19d..b477127a 100644
--- a/src/brpc/socket.cpp
+++ b/src/brpc/socket.cpp
@@ -85,6 +85,10 @@ DEFINE_int64(socket_max_unwritten_bytes, 64 * 1024 * 1024,
"Max unwritten bytes in each socket, if the limit is reached,"
" Socket.Write fails with EOVERCROWDED");
+DEFINE_int64(socket_max_streams_unconsumed_bytes, 0,
+ "Max stream receivers' unconsumed bytes in one socket,"
+ " it used in stream for receiver buffer control.");
+
DEFINE_int32(max_connection_pool_size, 100,
"Max number of pooled connections to a single endpoint");
BRPC_VALIDATE_GFLAG(max_connection_pool_size, PassValidate);
@@ -459,6 +463,7 @@ Socket::Socket(Forbidden)
, _epollout_butex(NULL)
, _write_head(NULL)
, _stream_set(NULL)
+ , _total_streams_unconsumed_size(0)
, _ninflight_app_health_check(0)
{
CreateVarsOnce();
@@ -660,6 +665,7 @@ int Socket::Create(const SocketOptions& options, SocketId*
id) {
m->_error_code = 0;
m->_error_text.clear();
m->_agent_socket_id.store(INVALID_SOCKET_ID, butil::memory_order_relaxed);
+ m->_total_streams_unconsumed_size.store(0, butil::memory_order_relaxed);
m->_ninflight_app_health_check.store(0, butil::memory_order_relaxed);
// NOTE: last two params are useless in bthread > r32787
const int rc = bthread_id_list_init(&m->_id_wait_list, 512, 512);
@@ -2234,6 +2240,8 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) {
<< "\nlogoff_flag=" <<
ptr->_logoff_flag.load(butil::memory_order_relaxed)
<< "\n_additional_ref_status="
<< ptr->_additional_ref_status.load(butil::memory_order_relaxed)
+ << "\ntotal_streams_buffer_size="
+ << ptr->_total_streams_unconsumed_size.load(butil::memory_order_relaxed)
<< "\nninflight_app_health_check="
<< ptr->_ninflight_app_health_check.load(butil::memory_order_relaxed)
<< "\nagent_socket_id=";
diff --git a/src/brpc/socket.h b/src/brpc/socket.h
index ac3d4deb..014e79b0 100644
--- a/src/brpc/socket.h
+++ b/src/brpc/socket.h
@@ -870,6 +870,7 @@ private:
butil::Mutex _stream_mutex;
std::set<StreamId> *_stream_set;
+ butil::atomic<int64_t> _total_streams_unconsumed_size;
butil::atomic<int64_t> _ninflight_app_health_check;
};
diff --git a/src/brpc/stream.cpp b/src/brpc/stream.cpp
index 67b3541a..d8466d2a 100644
--- a/src/brpc/stream.cpp
+++ b/src/brpc/stream.cpp
@@ -35,6 +35,7 @@
namespace brpc {
DECLARE_bool(usercode_in_pthread);
+DECLARE_int64(socket_max_streams_unconsumed_bytes);
const static butil::IOBuf *TIMEOUT_TASK = (butil::IOBuf*)-1L;
@@ -45,6 +46,7 @@ Stream::Stream()
, _closed(false)
, _produced(0)
, _remote_consumed(0)
+ , _cur_buf_size(0)
, _local_consumed(0)
, _parse_rpc_response(false)
, _pending_buf(NULL)
@@ -72,6 +74,16 @@ int Stream::Create(const StreamOptions &options,
s->_connected = false;
s->_options = options;
s->_closed = false;
+ s->_cur_buf_size = options.max_buf_size;
+ if (options.max_buf_size > 0 && options.min_buf_size >
options.max_buf_size) {
+ // set 0 if min_buf_size is invalid.
+ s->_options.min_buf_size = 0;
+ LOG(WARNING) << "options.min_buf_size is larger than
options.max_buf_size, it will be set to 0.";
+ }
+ if (FLAGS_socket_max_streams_unconsumed_bytes > 0 &&
s->_options.min_buf_size > 0) {
+ s->_cur_buf_size = s->_options.min_buf_size;
+ }
+
if (remote_settings != NULL) {
s->_remote_settings.MergeFrom(*remote_settings);
s->_parse_rpc_response = false;
@@ -260,9 +272,9 @@ void Stream::TriggerOnConnectIfNeed() {
}
int Stream::AppendIfNotFull(const butil::IOBuf &data) {
- if (_options.max_buf_size > 0) {
+ if (_cur_buf_size > 0) {
std::unique_lock<bthread_mutex_t> lck(_congestion_control_mutex);
- if (_produced >= _remote_consumed + (size_t)_options.max_buf_size) {
+ if (_produced >= _remote_consumed + _cur_buf_size) {
const size_t saved_produced = _produced;
const size_t saved_remote_consumed = _remote_consumed;
lck.unlock();
@@ -270,25 +282,30 @@ int Stream::AppendIfNotFull(const butil::IOBuf &data) {
<< "_produced=" << saved_produced
<< " _remote_consumed=" << saved_remote_consumed
<< " gap=" << saved_produced - saved_remote_consumed
- << " max_buf_size=" << _options.max_buf_size;
+ << " max_buf_size=" << _cur_buf_size;
return 1;
}
_produced += data.length();
}
+
+ size_t data_length = data.length();
butil::IOBuf copied_data(data);
const int rc = _fake_socket_weak_ref->Write(&copied_data);
if (rc != 0) {
// Stream may be closed by peer before
LOG(WARNING) << "Fail to write to _fake_socket, " << berror();
BAIDU_SCOPED_LOCK(_congestion_control_mutex);
- _produced -= data.length();
+ _produced -= data_length;
return -1;
}
+ if (FLAGS_socket_max_streams_unconsumed_bytes > 0) {
+ _host_socket->_total_streams_unconsumed_size += data_length;
+ }
return 0;
}
void Stream::SetRemoteConsumed(size_t new_remote_consumed) {
- CHECK(_options.max_buf_size > 0);
+ CHECK(_cur_buf_size > 0);
bthread_id_list_t tmplist;
bthread_id_list_init(&tmplist, 0, 0);
bthread_mutex_lock(&_congestion_control_mutex);
@@ -296,9 +313,28 @@ void Stream::SetRemoteConsumed(size_t new_remote_consumed)
{
bthread_mutex_unlock(&_congestion_control_mutex);
return;
}
- const bool was_full = _produced >= _remote_consumed +
(size_t)_options.max_buf_size;
+ const bool was_full = _produced >= _remote_consumed + _cur_buf_size;
+
+ if (FLAGS_socket_max_streams_unconsumed_bytes > 0) {
+ _host_socket->_total_streams_unconsumed_size -= new_remote_consumed -
_remote_consumed;
+ if (_host_socket->_total_streams_unconsumed_size >
FLAGS_socket_max_streams_unconsumed_bytes) {
+ if (_options.min_buf_size > 0) {
+ _cur_buf_size = _options.min_buf_size;
+ } else {
+ _cur_buf_size /= 2;
+ }
+ LOG(INFO) << "stream consumers on socket " << _host_socket->id()
<< " is crowded, " << "cut stream " << id() << " buffer to " << _cur_buf_size;
+ } else if (_produced >= new_remote_consumed + _cur_buf_size &&
(_options.max_buf_size <= 0 || _cur_buf_size < (size_t)_options.max_buf_size)) {
+ if (_options.max_buf_size > 0 && _cur_buf_size * 2 >
(size_t)_options.max_buf_size) {
+ _cur_buf_size = _options.max_buf_size;
+ } else {
+ _cur_buf_size *= 2;
+ }
+ }
+ }
+
_remote_consumed = new_remote_consumed;
- const bool is_full = _produced >= _remote_consumed +
(size_t)_options.max_buf_size;
+ const bool is_full = _produced >= _remote_consumed + _cur_buf_size;
if (was_full && !is_full) {
bthread_id_list_swap(&tmplist, &_writable_wait_list);
}
@@ -374,8 +410,8 @@ void Stream::Wait(void (*on_writable)(StreamId, void*,
int), void* arg,
}
}
bthread_mutex_lock(&_congestion_control_mutex);
- if (_options.max_buf_size <= 0
- || _produced < _remote_consumed + (size_t)_options.max_buf_size) {
+ if (_cur_buf_size <= 0
+ || _produced < _remote_consumed + _cur_buf_size) {
bthread_mutex_unlock(&_congestion_control_mutex);
CHECK_EQ(0, TriggerOnWritable(wait_id, wm, 0));
return;
@@ -524,6 +560,7 @@ int Stream::Consume(void *meta,
bthread::TaskIterator<butil::IOBuf*>& iter) {
}
}
mb.flush();
+
if (s->_remote_settings.need_feedback() && mb.total_length() > 0) {
s->_local_consumed += mb.total_length();
s->SendFeedback();
@@ -560,7 +597,7 @@ int Stream::SetHostSocket(Socket *host_socket) {
void Stream::FillSettings(StreamSettings *settings) {
settings->set_stream_id(id());
- settings->set_need_feedback(_options.max_buf_size > 0);
+ settings->set_need_feedback(_cur_buf_size > 0);
settings->set_writable(_options.handler != NULL);
}
diff --git a/src/brpc/stream.h b/src/brpc/stream.h
index 53a5fc4b..fbf2d51d 100644
--- a/src/brpc/stream.h
+++ b/src/brpc/stream.h
@@ -49,12 +49,18 @@ public:
struct StreamOptions {
StreamOptions()
- : max_buf_size(2 * 1024 * 1024)
+ : min_buf_size(1024 * 1024)
+ , max_buf_size(2 * 1024 * 1024)
, idle_timeout_ms(-1)
, messages_in_batch(128)
, handler(NULL)
{}
+ // stream max buffer size limit in [min_buf_size, max_buf_size]
+ // If |min_buf_size| <= 0, there's no min size limit of buf size
+ // default: 1048576 (1M)
+ int min_buf_size;
+
// The max size of unconsumed data allowed at remote side.
// If |max_buf_size| <= 0, there's no limit of buf size
// default: 2097152 (2M)
diff --git a/src/brpc/stream_impl.h b/src/brpc/stream_impl.h
index 4adb9ac9..259f0b77 100644
--- a/src/brpc/stream_impl.h
+++ b/src/brpc/stream_impl.h
@@ -114,6 +114,7 @@ friend class MessageBatcher;
bthread_mutex_t _congestion_control_mutex;
size_t _produced;
size_t _remote_consumed;
+ size_t _cur_buf_size;
bthread_id_list_t _writable_wait_list;
int64_t _local_consumed;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]