This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit b7c86d66faa5186316559a42a8965926b84d20ee
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Tue Jan 30 14:04:08 2024 +0800

    [improvement](thirdparty) introduce brpc stream patch to make stream write to socket in background bthread (#30458)
---
 .../patches/brpc-1.4.0-write-background.patch      | 170 +++++++++++++++++++++
 1 file changed, 170 insertions(+)

diff --git a/thirdparty/patches/brpc-1.4.0-write-background.patch 
b/thirdparty/patches/brpc-1.4.0-write-background.patch
new file mode 100644
index 00000000000..7c94df50654
--- /dev/null
+++ b/thirdparty/patches/brpc-1.4.0-write-background.patch
@@ -0,0 +1,170 @@
+diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp
+index b6c8e750..c52e9451 100644
+--- a/src/brpc/controller.cpp
++++ b/src/brpc/controller.cpp
+@@ -1169,6 +1169,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
+     wopt.pipelined_count = _pipelined_count;
+     wopt.auth_flags = _auth_flags;
+     wopt.ignore_eovercrowded = has_flag(FLAGS_IGNORE_EOVERCROWDED);
++    wopt.write_in_background = write_to_socket_in_background();
+     int rc;
+     size_t packet_size = 0;
+     if (user_packet_guard) {
+diff --git a/src/brpc/controller.h b/src/brpc/controller.h
+index 658cc695..9221f583 100644
+--- a/src/brpc/controller.h
++++ b/src/brpc/controller.h
+@@ -144,6 +144,7 @@ friend void 
policy::ProcessThriftRequest(InputMessageBase*);
+     static const uint32_t FLAGS_HEALTH_CHECK_CALL = (1 << 19);
+     static const uint32_t FLAGS_PB_SINGLE_REPEATED_TO_ARRAY = (1 << 20);
+     static const uint32_t FLAGS_MANAGE_HTTP_BODY_ON_ERROR = (1 << 21);
++    static const uint32_t FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND = (1 << 22);
+ 
+ public:
+     struct Inheritable {
+@@ -350,6 +351,17 @@ public:
+     bool is_done_allowed_to_run_in_place() const
+     { return has_flag(FLAGS_ALLOW_DONE_TO_RUN_IN_PLACE); }
+ 
++    // Create a background KEEPWRITE bthread to write to socket when issuing
++    // RPCs, instead of trying to write to socket once in calling thread (see
++    // `Socket::StartWrite` in socket.cpp).
+    // The socket write could take some time (several microseconds maybe). If
+    // you care about it and don't want the calling thread to be blocked, you
+    // can set this flag.
+    // Should provide better batch effect in situations like when you are
+    // continually issuing lots of async RPC calls in only one thread.
++    void set_write_to_socket_in_background(bool f) { 
set_flag(FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND, f); }
++    bool write_to_socket_in_background() const { return 
has_flag(FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND); }
++
+     // 
------------------------------------------------------------------------
+     //                      Server-side methods.
+     // These calls shall be made from the server side only. Their results are
+diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp
+index e3878c19..27748434 100644
+--- a/src/brpc/socket.cpp
++++ b/src/brpc/socket.cpp
+@@ -1620,7 +1620,7 @@ int Socket::StartWrite(WriteRequest* req, const 
WriteOptions& opt) {
+     // in some protocols(namely RTMP).
+     req->Setup(this);
+     
+-    if (ssl_state() != SSL_OFF) {
++    if (opt.write_in_background || ssl_state() != SSL_OFF) {
+         // Writing into SSL may block the current bthread, always write
+         // in the background.
+         goto KEEPWRITE_IN_BACKGROUND;
+diff --git a/src/brpc/socket.h b/src/brpc/socket.h
+index 6f710ee2..28a7ada6 100644
+--- a/src/brpc/socket.h
++++ b/src/brpc/socket.h
+@@ -269,10 +269,20 @@ public:
+         // Default: false
+         bool ignore_eovercrowded;
+ 
++        // The calling thread directly creates KeepWrite thread to write into
++        // this socket, skipping writing once.
++        // In situations like when you are continually issuing lots of
+        // StreamWrite or async RPC calls in only one thread, directly creating
+        // KeepWrite thread at first provides batch write effect and better
+        // performance. Otherwise, each write only writes one `msg` into socket
++        // and no KeepWrite thread can be created, which brings poor
++        // performance.
++        bool write_in_background;
++
+         WriteOptions()
+             : id_wait(INVALID_BTHREAD_ID), abstime(NULL)
+             , pipelined_count(0), auth_flags(0)
+-            , ignore_eovercrowded(false) {}
++            , ignore_eovercrowded(false), write_in_background(false) {}
+     };
+     int Write(butil::IOBuf *msg, const WriteOptions* options = NULL);
+ 
+diff --git a/src/brpc/stream.cpp b/src/brpc/stream.cpp
+index d8466d2a..2d565759 100644
+--- a/src/brpc/stream.cpp
++++ b/src/brpc/stream.cpp
+@@ -271,7 +271,8 @@ void Stream::TriggerOnConnectIfNeed() {
+     bthread_mutex_unlock(&_connect_mutex);
+ }
+ 
+-int Stream::AppendIfNotFull(const butil::IOBuf &data) {
++int Stream::AppendIfNotFull(const butil::IOBuf &data,
++                            const StreamWriteOptions* options) {
+     if (_cur_buf_size > 0) {
+         std::unique_lock<bthread_mutex_t> lck(_congestion_control_mutex);
+         if (_produced >= _remote_consumed + _cur_buf_size) {
+@@ -290,7 +291,9 @@ int Stream::AppendIfNotFull(const butil::IOBuf &data) {
+ 
+     size_t data_length = data.length();
+     butil::IOBuf copied_data(data);
+-    const int rc = _fake_socket_weak_ref->Write(&copied_data);
++    Socket::WriteOptions wopt;
++    wopt.write_in_background = options != NULL && 
options->write_in_background;
++    const int rc = _fake_socket_weak_ref->Write(&copied_data, &wopt);
+     if (rc != 0) {
+         // Stream may be closed by peer before
+         LOG(WARNING) << "Fail to write to _fake_socket, " << berror();
+@@ -679,13 +682,14 @@ void Stream::HandleRpcResponse(butil::IOBuf* 
response_buffer) {
+     policy::ProcessRpcResponse(msg);
+ }
+ 
+-int StreamWrite(StreamId stream_id, const butil::IOBuf &message) {
++int StreamWrite(StreamId stream_id, const butil::IOBuf &message,
++                const StreamWriteOptions* options) {
+     SocketUniquePtr ptr;
+     if (Socket::Address(stream_id, &ptr) != 0) {
+         return EINVAL;
+     }
+     Stream* s = (Stream*)ptr->conn();
+-    const int rc = s->AppendIfNotFull(message);
++    const int rc = s->AppendIfNotFull(message, options);
+     if (rc == 0) {
+         return 0;
+     }
+diff --git a/src/brpc/stream.h b/src/brpc/stream.h
+index fbf2d51d..410a5a09 100644
+--- a/src/brpc/stream.h
++++ b/src/brpc/stream.h
+@@ -82,6 +82,18 @@ struct StreamOptions {
+     StreamInputHandler* handler;
+ };
+ 
++struct StreamWriteOptions
++{
++    StreamWriteOptions() : write_in_background(false) {}
++
++    // Write message to socket in background thread.
++    // Provides batch write effect and better performance in situations when
++    // you are continually issuing lots of StreamWrite or async RPC calls in
+    // only one thread. Otherwise, each StreamWrite directly writes message into
++    // socket and brings poor performance.
++    bool write_in_background;
++};
++
+ // [Called at the client side]
+ // Create a stream at client-side along with the |cntl|, which will be 
connected
+ // when receiving the response with a stream from server-side. If |options| is
+@@ -104,7 +116,8 @@ int StreamAccept(StreamId* response_stream, Controller 
&cntl,
+ //  - EAGAIN: |stream_id| is created with positive |max_buf_size| and buf size
+ //            which the remote side hasn't consumed yet excceeds the number.
+ //  - EINVAL: |stream_id| is invalied or has been closed
+-int StreamWrite(StreamId stream_id, const butil::IOBuf &message);
++int StreamWrite(StreamId stream_id, const butil::IOBuf &message,
++                const StreamWriteOptions* options = NULL);
+ 
+ // Write util the pending buffer size is less than |max_buf_size| or orrur
+ // occurs
+diff --git a/src/brpc/stream_impl.h b/src/brpc/stream_impl.h
+index 259f0b77..f24b75a3 100644
+--- a/src/brpc/stream_impl.h
++++ b/src/brpc/stream_impl.h
+@@ -42,7 +42,8 @@ public:
+ 
+     // --------------------- SocketConnection --------------
+ 
+-    int AppendIfNotFull(const butil::IOBuf& msg);
++    int AppendIfNotFull(const butil::IOBuf& msg,
++                        const StreamWriteOptions* options = NULL);
+     static int Create(const StreamOptions& options,
+                       const StreamSettings *remote_settings,
+                       StreamId *id);
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to