This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d1df9f4 Add options to control whether write to socket in background thread (#2280)
7d1df9f4 is described below

commit 7d1df9f4b430774f58fd9462c523dfe2c7d3642c
Author: KevinChou <mrgui...@gmail.com>
AuthorDate: Wed Aug 16 15:39:04 2023 +0800

    Add options to control whether write to socket in background thread (#2280)
---
 src/brpc/controller.cpp |  1 +
 src/brpc/controller.h   | 12 ++++++++++++
 src/brpc/socket.cpp     |  2 +-
 src/brpc/socket.h       | 12 +++++++++++-
 src/brpc/stream.cpp     | 10 +++++++---
 src/brpc/stream.h       | 15 ++++++++++++++-
 src/brpc/stream_impl.h  |  3 ++-
 7 files changed, 48 insertions(+), 7 deletions(-)

diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp
index 338ad66c..eb8ea8a9 100644
--- a/src/brpc/controller.cpp
+++ b/src/brpc/controller.cpp
@@ -1194,6 +1194,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
     wopt.pipelined_count = _pipelined_count;
     wopt.auth_flags = _auth_flags;
     wopt.ignore_eovercrowded = has_flag(FLAGS_IGNORE_EOVERCROWDED);
+    wopt.write_in_background = write_to_socket_in_background();
     int rc;
     size_t packet_size = 0;
     if (user_packet_guard) {
diff --git a/src/brpc/controller.h b/src/brpc/controller.h
index f27dd54f..144aa118 100644
--- a/src/brpc/controller.h
+++ b/src/brpc/controller.h
@@ -144,6 +144,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
     static const uint32_t FLAGS_HEALTH_CHECK_CALL = (1 << 19);
     static const uint32_t FLAGS_PB_SINGLE_REPEATED_TO_ARRAY = (1 << 20);
     static const uint32_t FLAGS_MANAGE_HTTP_BODY_ON_ERROR = (1 << 21);
+    static const uint32_t FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND = (1 << 22);
 
 public:
     struct Inheritable {
@@ -353,6 +354,17 @@ public:
     bool is_done_allowed_to_run_in_place() const
     { return has_flag(FLAGS_ALLOW_DONE_TO_RUN_IN_PLACE); }
 
+    // Create a background KEEPWRITE bthread to write to socket when issuing
+    // RPCs, instead of trying to write to socket once in calling thread (see
+    // `Socket::StartWrite` in socket.cpp).
+    // The socket write could take some time (several microseconds maybe), if
+    // you cares about it and don't want the calling thread to be blocked, you
+    // can set this flag.
+    // Should provides better batch effect in situations like when you are
+    // continually issuing lots of async RPC calls in only one thread.
+    void set_write_to_socket_in_background(bool f) { set_flag(FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND, f); }
+    bool write_to_socket_in_background() const { return has_flag(FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND); }
+
     // ------------------------------------------------------------------------
     //                      Server-side methods.
     // These calls shall be made from the server side only. Their results are
diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp
index c49ca083..a634dd37 100644
--- a/src/brpc/socket.cpp
+++ b/src/brpc/socket.cpp
@@ -1687,7 +1687,7 @@ int Socket::StartWrite(WriteRequest* req, const WriteOptions& opt) {
     // in some protocols(namely RTMP).
     req->Setup(this);
     
-    if (ssl_state() != SSL_OFF) {
+    if (opt.write_in_background || ssl_state() != SSL_OFF) {
         // Writing into SSL may block the current bthread, always write
         // in the background.
         goto KEEPWRITE_IN_BACKGROUND;
diff --git a/src/brpc/socket.h b/src/brpc/socket.h
index eff9474c..74f7360e 100644
--- a/src/brpc/socket.h
+++ b/src/brpc/socket.h
@@ -289,10 +289,20 @@ public:
         // Default: false
         bool ignore_eovercrowded;
 
+        // The calling thread directly creates KeepWrite thread to write into
+        // this socket, skipping writing once.
+        // In situations like when you are continually issuing lots of
+        // StreamWrite or async RPC calls in only one thread, directly creating
+        // KeepWrite thread at first provides batch write effect and better
+        // performance. Otherwise, each write only writes one `msg` into socket
+        // and no KeepWrite thread can be created, which brings poor
+        // performance.
+        bool write_in_background;
+
         WriteOptions()
             : id_wait(INVALID_BTHREAD_ID), abstime(NULL)
             , pipelined_count(0), auth_flags(0)
-            , ignore_eovercrowded(false) {}
+            , ignore_eovercrowded(false), write_in_background(false) {}
     };
     int Write(butil::IOBuf *msg, const WriteOptions* options = NULL);
 
diff --git a/src/brpc/stream.cpp b/src/brpc/stream.cpp
index d8466d2a..8b346bd6 100644
--- a/src/brpc/stream.cpp
+++ b/src/brpc/stream.cpp
@@ -271,7 +271,8 @@ void Stream::TriggerOnConnectIfNeed() {
     bthread_mutex_unlock(&_connect_mutex);
 }
 
-int Stream::AppendIfNotFull(const butil::IOBuf &data) {
+int Stream::AppendIfNotFull(const butil::IOBuf &data,
+                            const StreamWriteOptions* options) {
     if (_cur_buf_size > 0) {
         std::unique_lock<bthread_mutex_t> lck(_congestion_control_mutex);
         if (_produced >= _remote_consumed + _cur_buf_size) {
@@ -290,7 +291,9 @@ int Stream::AppendIfNotFull(const butil::IOBuf &data) {
 
     size_t data_length = data.length();
     butil::IOBuf copied_data(data);
-    const int rc = _fake_socket_weak_ref->Write(&copied_data);
+    Socket::WriteOptions wopt;
+    wopt.write_in_background = options != NULL && options->write_in_background;
+    const int rc = _fake_socket_weak_ref->Write(&copied_data, &wopt);
     if (rc != 0) {
         // Stream may be closed by peer before
         LOG(WARNING) << "Fail to write to _fake_socket, " << berror();
@@ -679,7 +682,8 @@ void Stream::HandleRpcResponse(butil::IOBuf* response_buffer) {
     policy::ProcessRpcResponse(msg);
 }
 
-int StreamWrite(StreamId stream_id, const butil::IOBuf &message) {
+int StreamWrite(StreamId stream_id, const butil::IOBuf &message,
+                const StreamWriteOptions* options) {
     SocketUniquePtr ptr;
     if (Socket::Address(stream_id, &ptr) != 0) {
         return EINVAL;
diff --git a/src/brpc/stream.h b/src/brpc/stream.h
index fbf2d51d..410a5a09 100644
--- a/src/brpc/stream.h
+++ b/src/brpc/stream.h
@@ -82,6 +82,18 @@ struct StreamOptions {
     StreamInputHandler* handler;
 };
 
+struct StreamWriteOptions
+{
+    StreamWriteOptions() : write_in_background(false) {}
+
+    // Write message to socket in background thread.
+    // Provides batch write effect and better performance in situations when
+    // you are continually issuing lots of StreamWrite or async RPC calls in
+    // only one thread. Otherwise, each StreamWrite directly writes message into
+    // socket and brings poor performance.
+    bool write_in_background;
+};
+
 // [Called at the client side]
 // Create a stream at client-side along with the |cntl|, which will be connected
 // when receiving the response with a stream from server-side. If |options| is
@@ -104,7 +116,8 @@ int StreamAccept(StreamId* response_stream, Controller &cntl,
 //  - EAGAIN: |stream_id| is created with positive |max_buf_size| and buf size
 //            which the remote side hasn't consumed yet excceeds the number.
 //  - EINVAL: |stream_id| is invalied or has been closed
-int StreamWrite(StreamId stream_id, const butil::IOBuf &message);
+int StreamWrite(StreamId stream_id, const butil::IOBuf &message,
+                const StreamWriteOptions* options = NULL);
 
 // Write util the pending buffer size is less than |max_buf_size| or orrur
 // occurs
diff --git a/src/brpc/stream_impl.h b/src/brpc/stream_impl.h
index 259f0b77..f24b75a3 100644
--- a/src/brpc/stream_impl.h
+++ b/src/brpc/stream_impl.h
@@ -42,7 +42,8 @@ public:
 
     // --------------------- SocketConnection --------------
 
-    int AppendIfNotFull(const butil::IOBuf& msg);
+    int AppendIfNotFull(const butil::IOBuf& msg,
+                        const StreamWriteOptions* options = NULL);
     static int Create(const StreamOptions& options,
                       const StreamSettings *remote_settings,
                       StreamId *id);


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org

Reply via email to