This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new f4cd44a6 limit total streams oncomsume bytes in one socket (#1958)
f4cd44a6 is described below

commit f4cd44a690738ad089080d51e2939fdb0323d8c4
Author: baiyang <[email protected]>
AuthorDate: Thu Nov 10 17:31:29 2022 +0800

    limit total streams oncomsume bytes in one socket (#1958)
---
 src/brpc/socket.cpp    |  8 +++++++
 src/brpc/socket.h      |  1 +
 src/brpc/stream.cpp    | 57 +++++++++++++++++++++++++++++++++++++++++---------
 src/brpc/stream.h      |  8 ++++++-
 src/brpc/stream_impl.h |  1 +
 5 files changed, 64 insertions(+), 11 deletions(-)

diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp
index fb5ae19d..b477127a 100644
--- a/src/brpc/socket.cpp
+++ b/src/brpc/socket.cpp
@@ -85,6 +85,10 @@ DEFINE_int64(socket_max_unwritten_bytes, 64 * 1024 * 1024,
              "Max unwritten bytes in each socket, if the limit is reached,"
              " Socket.Write fails with EOVERCROWDED");
 
+DEFINE_int64(socket_max_streams_unconsumed_bytes, 0,
+             "Max stream receivers' unconsumed bytes in one socket,"
+             " it used in stream for receiver buffer control.");
+
 DEFINE_int32(max_connection_pool_size, 100,
              "Max number of pooled connections to a single endpoint");
 BRPC_VALIDATE_GFLAG(max_connection_pool_size, PassValidate);
@@ -459,6 +463,7 @@ Socket::Socket(Forbidden)
     , _epollout_butex(NULL)
     , _write_head(NULL)
     , _stream_set(NULL)
+    , _total_streams_unconsumed_size(0)
     , _ninflight_app_health_check(0)
 {
     CreateVarsOnce();
@@ -660,6 +665,7 @@ int Socket::Create(const SocketOptions& options, SocketId* id) {
     m->_error_code = 0;
     m->_error_text.clear();
     m->_agent_socket_id.store(INVALID_SOCKET_ID, butil::memory_order_relaxed);
+    m->_total_streams_unconsumed_size.store(0, butil::memory_order_relaxed);
     m->_ninflight_app_health_check.store(0, butil::memory_order_relaxed);
     // NOTE: last two params are useless in bthread > r32787
     const int rc = bthread_id_list_init(&m->_id_wait_list, 512, 512);
@@ -2234,6 +2240,8 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) {
        << "\nlogoff_flag=" << ptr->_logoff_flag.load(butil::memory_order_relaxed)
        << "\n_additional_ref_status="
        << ptr->_additional_ref_status.load(butil::memory_order_relaxed)
+       << "\ntotal_streams_buffer_size="
+       << ptr->_total_streams_unconsumed_size.load(butil::memory_order_relaxed)
        << "\nninflight_app_health_check="
        << ptr->_ninflight_app_health_check.load(butil::memory_order_relaxed)
        << "\nagent_socket_id=";
diff --git a/src/brpc/socket.h b/src/brpc/socket.h
index ac3d4deb..014e79b0 100644
--- a/src/brpc/socket.h
+++ b/src/brpc/socket.h
@@ -870,6 +870,7 @@ private:
 
     butil::Mutex _stream_mutex;
     std::set<StreamId> *_stream_set;
+    butil::atomic<int64_t> _total_streams_unconsumed_size;
 
     butil::atomic<int64_t> _ninflight_app_health_check;
 };
diff --git a/src/brpc/stream.cpp b/src/brpc/stream.cpp
index 67b3541a..d8466d2a 100644
--- a/src/brpc/stream.cpp
+++ b/src/brpc/stream.cpp
@@ -35,6 +35,7 @@
 namespace brpc {
 
 DECLARE_bool(usercode_in_pthread);
+DECLARE_int64(socket_max_streams_unconsumed_bytes);
 
 const static butil::IOBuf *TIMEOUT_TASK = (butil::IOBuf*)-1L;
 
@@ -45,6 +46,7 @@ Stream::Stream()
     , _closed(false)
     , _produced(0)
     , _remote_consumed(0)
+    , _cur_buf_size(0)
     , _local_consumed(0)
     , _parse_rpc_response(false)
     , _pending_buf(NULL)
@@ -72,6 +74,16 @@ int Stream::Create(const StreamOptions &options,
     s->_connected = false;
     s->_options = options;
     s->_closed = false;
+    s->_cur_buf_size = options.max_buf_size;
+    if (options.max_buf_size > 0 && options.min_buf_size > options.max_buf_size) {
+        // set 0 if min_buf_size is invalid.
+        s->_options.min_buf_size = 0;
+        LOG(WARNING) << "options.min_buf_size is larger than options.max_buf_size, it will be set to 0.";
+    }
+    if (FLAGS_socket_max_streams_unconsumed_bytes > 0 && s->_options.min_buf_size > 0) {
+        s->_cur_buf_size = s->_options.min_buf_size;
+    }
+
     if (remote_settings != NULL) {
         s->_remote_settings.MergeFrom(*remote_settings);
         s->_parse_rpc_response = false;
@@ -260,9 +272,9 @@ void Stream::TriggerOnConnectIfNeed() {
 }
 
 int Stream::AppendIfNotFull(const butil::IOBuf &data) {
-    if (_options.max_buf_size > 0) {
+    if (_cur_buf_size > 0) {
         std::unique_lock<bthread_mutex_t> lck(_congestion_control_mutex);
-        if (_produced >= _remote_consumed + (size_t)_options.max_buf_size) {
+        if (_produced >= _remote_consumed + _cur_buf_size) {
             const size_t saved_produced = _produced;
             const size_t saved_remote_consumed = _remote_consumed;
             lck.unlock();
@@ -270,25 +282,30 @@ int Stream::AppendIfNotFull(const butil::IOBuf &data) {
                      << "_produced=" << saved_produced
                      << " _remote_consumed=" << saved_remote_consumed
                      << " gap=" << saved_produced - saved_remote_consumed
-                     << " max_buf_size=" << _options.max_buf_size;
+                     << " max_buf_size=" << _cur_buf_size;
             return 1;
         }
         _produced += data.length();
     }
+
+    size_t data_length = data.length();
     butil::IOBuf copied_data(data);
     const int rc = _fake_socket_weak_ref->Write(&copied_data);
     if (rc != 0) {
         // Stream may be closed by peer before
         LOG(WARNING) << "Fail to write to _fake_socket, " << berror();
         BAIDU_SCOPED_LOCK(_congestion_control_mutex);
-        _produced -= data.length();
+        _produced -= data_length;
         return -1;
     }
+    if (FLAGS_socket_max_streams_unconsumed_bytes > 0) {
+        _host_socket->_total_streams_unconsumed_size += data_length;
+    }
     return 0;
 }
 
 void Stream::SetRemoteConsumed(size_t new_remote_consumed) {
-    CHECK(_options.max_buf_size > 0);
+    CHECK(_cur_buf_size > 0);
     bthread_id_list_t tmplist;
     bthread_id_list_init(&tmplist, 0, 0);
     bthread_mutex_lock(&_congestion_control_mutex);
@@ -296,9 +313,28 @@ void Stream::SetRemoteConsumed(size_t new_remote_consumed) {
         bthread_mutex_unlock(&_congestion_control_mutex);
         return;
     }
-    const bool was_full = _produced >= _remote_consumed + (size_t)_options.max_buf_size;
+    const bool was_full = _produced >= _remote_consumed + _cur_buf_size;
+
+    if (FLAGS_socket_max_streams_unconsumed_bytes > 0) {
+        _host_socket->_total_streams_unconsumed_size -= new_remote_consumed - _remote_consumed;
+        if (_host_socket->_total_streams_unconsumed_size > FLAGS_socket_max_streams_unconsumed_bytes) {
+            if (_options.min_buf_size > 0) {
+                _cur_buf_size = _options.min_buf_size;
+            } else {
+                _cur_buf_size /= 2;
+            }
+                LOG(INFO) << "stream consumers on socket " << _host_socket->id() << " is crowded, " <<  "cut stream " << id() << " buffer to " << _cur_buf_size;
+        } else if (_produced >= new_remote_consumed + _cur_buf_size && (_options.max_buf_size <= 0 || _cur_buf_size < (size_t)_options.max_buf_size)) {
+            if (_options.max_buf_size > 0 && _cur_buf_size * 2 > (size_t)_options.max_buf_size) {
+                _cur_buf_size = _options.max_buf_size;
+            } else {
+                _cur_buf_size *= 2;
+            }
+        }
+    }
+
     _remote_consumed = new_remote_consumed;
-    const bool is_full = _produced >= _remote_consumed + (size_t)_options.max_buf_size;
+    const bool is_full = _produced >= _remote_consumed + _cur_buf_size;
     if (was_full && !is_full) {
         bthread_id_list_swap(&tmplist, &_writable_wait_list);
     }
@@ -374,8 +410,8 @@ void Stream::Wait(void (*on_writable)(StreamId, void*, int), void* arg,
         }
     }
     bthread_mutex_lock(&_congestion_control_mutex);
-    if (_options.max_buf_size <= 0 
-            || _produced < _remote_consumed + (size_t)_options.max_buf_size) {
+    if (_cur_buf_size <= 0 
+            || _produced < _remote_consumed + _cur_buf_size) {
         bthread_mutex_unlock(&_congestion_control_mutex);
         CHECK_EQ(0, TriggerOnWritable(wait_id, wm, 0));
         return;
@@ -524,6 +560,7 @@ int Stream::Consume(void *meta, bthread::TaskIterator<butil::IOBuf*>& iter) {
         }
     }
     mb.flush();
+
     if (s->_remote_settings.need_feedback() && mb.total_length() > 0) {
         s->_local_consumed += mb.total_length();
         s->SendFeedback();
@@ -560,7 +597,7 @@ int Stream::SetHostSocket(Socket *host_socket) {
 
 void Stream::FillSettings(StreamSettings *settings) {
     settings->set_stream_id(id());
-    settings->set_need_feedback(_options.max_buf_size > 0);
+    settings->set_need_feedback(_cur_buf_size > 0);
     settings->set_writable(_options.handler != NULL);
 }
 
diff --git a/src/brpc/stream.h b/src/brpc/stream.h
index 53a5fc4b..fbf2d51d 100644
--- a/src/brpc/stream.h
+++ b/src/brpc/stream.h
@@ -49,12 +49,18 @@ public:
 
 struct StreamOptions {
     StreamOptions()
-        : max_buf_size(2 * 1024 * 1024)
+        : min_buf_size(1024 * 1024)
+        , max_buf_size(2 * 1024 * 1024)
         , idle_timeout_ms(-1)
         , messages_in_batch(128)
         , handler(NULL)
     {}
 
+    // stream max buffer size limit in [min_buf_size, max_buf_size]
+    // If |min_buf_size| <= 0, there's no min size limit of buf size
+    // default: 1048576 (1M)
+    int min_buf_size;
+
     // The max size of unconsumed data allowed at remote side. 
     // If |max_buf_size| <= 0, there's no limit of buf size
     // default: 2097152 (2M)
diff --git a/src/brpc/stream_impl.h b/src/brpc/stream_impl.h
index 4adb9ac9..259f0b77 100644
--- a/src/brpc/stream_impl.h
+++ b/src/brpc/stream_impl.h
@@ -114,6 +114,7 @@ friend class MessageBatcher;
     bthread_mutex_t _congestion_control_mutex;
     size_t _produced;
     size_t _remote_consumed;
+    size_t _cur_buf_size;
     bthread_id_list_t _writable_wait_list;
 
     int64_t _local_consumed;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to