This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit b7c86d66faa5186316559a42a8965926b84d20ee Author: HHoflittlefish777 <[email protected]> AuthorDate: Tue Jan 30 14:04:08 2024 +0800 [improvement](thirdparty) introduce brpc stream patch to make stream write to socket in background bthread (#30458) --- .../patches/brpc-1.4.0-write-background.patch | 170 +++++++++++++++++++++ 1 file changed, 170 insertions(+) diff --git a/thirdparty/patches/brpc-1.4.0-write-background.patch b/thirdparty/patches/brpc-1.4.0-write-background.patch new file mode 100644 index 00000000000..7c94df50654 --- /dev/null +++ b/thirdparty/patches/brpc-1.4.0-write-background.patch @@ -0,0 +1,170 @@ +diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp +index b6c8e750..c52e9451 100644 +--- a/src/brpc/controller.cpp ++++ b/src/brpc/controller.cpp +@@ -1169,6 +1169,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) { + wopt.pipelined_count = _pipelined_count; + wopt.auth_flags = _auth_flags; + wopt.ignore_eovercrowded = has_flag(FLAGS_IGNORE_EOVERCROWDED); ++ wopt.write_in_background = write_to_socket_in_background(); + int rc; + size_t packet_size = 0; + if (user_packet_guard) { +diff --git a/src/brpc/controller.h b/src/brpc/controller.h +index 658cc695..9221f583 100644 +--- a/src/brpc/controller.h ++++ b/src/brpc/controller.h +@@ -144,6 +144,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); + static const uint32_t FLAGS_HEALTH_CHECK_CALL = (1 << 19); + static const uint32_t FLAGS_PB_SINGLE_REPEATED_TO_ARRAY = (1 << 20); + static const uint32_t FLAGS_MANAGE_HTTP_BODY_ON_ERROR = (1 << 21); ++ static const uint32_t FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND = (1 << 22); + + public: + struct Inheritable { +@@ -350,6 +351,17 @@ public: + bool is_done_allowed_to_run_in_place() const + { return has_flag(FLAGS_ALLOW_DONE_TO_RUN_IN_PLACE); } + ++ // Create a background KEEPWRITE bthread to write to socket when issuing ++ // RPCs, instead of trying to write to socket once in calling thread (see ++ // `Socket::StartWrite` in 
socket.cpp). ++ // The socket write could take some time (several microseconds maybe), if ++ // you care about it and don't want the calling thread to be blocked, you ++ // can set this flag. ++ // Should provide better batch effect in situations like when you are ++ // continually issuing lots of async RPC calls in only one thread. ++ void set_write_to_socket_in_background(bool f) { set_flag(FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND, f); } ++ bool write_to_socket_in_background() const { return has_flag(FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND); } ++ + // ------------------------------------------------------------------------ + // Server-side methods. + // These calls shall be made from the server side only. Their results are +diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp +index e3878c19..27748434 100644 +--- a/src/brpc/socket.cpp ++++ b/src/brpc/socket.cpp +@@ -1620,7 +1620,7 @@ int Socket::StartWrite(WriteRequest* req, const WriteOptions& opt) { + // in some protocols(namely RTMP). + req->Setup(this); + +- if (ssl_state() != SSL_OFF) { ++ if (opt.write_in_background || ssl_state() != SSL_OFF) { + // Writing into SSL may block the current bthread, always write + // in the background. + goto KEEPWRITE_IN_BACKGROUND; +diff --git a/src/brpc/socket.h b/src/brpc/socket.h +index 6f710ee2..28a7ada6 100644 +--- a/src/brpc/socket.h ++++ b/src/brpc/socket.h +@@ -269,10 +269,20 @@ public: + // Default: false + bool ignore_eovercrowded; + ++ // The calling thread directly creates KeepWrite thread to write into ++ // this socket, skipping writing once. ++ // In situations like when you are continually issuing lots of ++ // StreamWrite or async RPC calls in only one thread, directly creating ++ // KeepWrite thread at first provides batch write effect and better ++ // performance. Otherwise, each write only writes one `msg` into socket ++ // and no KeepWrite thread can be created, which brings poor ++ // performance. 
++ bool write_in_background; ++ + WriteOptions() + : id_wait(INVALID_BTHREAD_ID), abstime(NULL) + , pipelined_count(0), auth_flags(0) +- , ignore_eovercrowded(false) {} ++ , ignore_eovercrowded(false), write_in_background(false) {} + }; + int Write(butil::IOBuf *msg, const WriteOptions* options = NULL); + +diff --git a/src/brpc/stream.cpp b/src/brpc/stream.cpp +index d8466d2a..2d565759 100644 +--- a/src/brpc/stream.cpp ++++ b/src/brpc/stream.cpp +@@ -271,7 +271,8 @@ void Stream::TriggerOnConnectIfNeed() { + bthread_mutex_unlock(&_connect_mutex); + } + +-int Stream::AppendIfNotFull(const butil::IOBuf &data) { ++int Stream::AppendIfNotFull(const butil::IOBuf &data, ++ const StreamWriteOptions* options) { + if (_cur_buf_size > 0) { + std::unique_lock<bthread_mutex_t> lck(_congestion_control_mutex); + if (_produced >= _remote_consumed + _cur_buf_size) { +@@ -290,7 +291,9 @@ int Stream::AppendIfNotFull(const butil::IOBuf &data) { + + size_t data_length = data.length(); + butil::IOBuf copied_data(data); +- const int rc = _fake_socket_weak_ref->Write(&copied_data); ++ Socket::WriteOptions wopt; ++ wopt.write_in_background = options != NULL && options->write_in_background; ++ const int rc = _fake_socket_weak_ref->Write(&copied_data, &wopt); + if (rc != 0) { + // Stream may be closed by peer before + LOG(WARNING) << "Fail to write to _fake_socket, " << berror(); +@@ -679,13 +682,14 @@ void Stream::HandleRpcResponse(butil::IOBuf* response_buffer) { + policy::ProcessRpcResponse(msg); + } + +-int StreamWrite(StreamId stream_id, const butil::IOBuf &message) { ++int StreamWrite(StreamId stream_id, const butil::IOBuf &message, ++ const StreamWriteOptions* options) { + SocketUniquePtr ptr; + if (Socket::Address(stream_id, &ptr) != 0) { + return EINVAL; + } + Stream* s = (Stream*)ptr->conn(); +- const int rc = s->AppendIfNotFull(message); ++ const int rc = s->AppendIfNotFull(message, options); + if (rc == 0) { + return 0; + } +diff --git a/src/brpc/stream.h b/src/brpc/stream.h 
+index fbf2d51d..410a5a09 100644 +--- a/src/brpc/stream.h ++++ b/src/brpc/stream.h +@@ -82,6 +82,18 @@ struct StreamOptions { + StreamInputHandler* handler; + }; + ++struct StreamWriteOptions ++{ ++ StreamWriteOptions() : write_in_background(false) {} ++ ++ // Write message to socket in background thread. ++ // Provides batch write effect and better performance in situations when ++ // you are continually issuing lots of StreamWrite or async RPC calls in ++ // only one thread. Otherwise, each StreamWrite directly writes message into ++ // socket and brings poor performance. ++ bool write_in_background; ++}; ++ + // [Called at the client side] + // Create a stream at client-side along with the |cntl|, which will be connected + // when receiving the response with a stream from server-side. If |options| is +@@ -104,7 +116,8 @@ int StreamAccept(StreamId* response_stream, Controller &cntl, + // - EAGAIN: |stream_id| is created with positive |max_buf_size| and buf size + // which the remote side hasn't consumed yet excceeds the number. 
+ // - EINVAL: |stream_id| is invalied or has been closed +-int StreamWrite(StreamId stream_id, const butil::IOBuf &message); ++int StreamWrite(StreamId stream_id, const butil::IOBuf &message, ++ const StreamWriteOptions* options = NULL); + + // Write util the pending buffer size is less than |max_buf_size| or orrur + // occurs +diff --git a/src/brpc/stream_impl.h b/src/brpc/stream_impl.h +index 259f0b77..f24b75a3 100644 +--- a/src/brpc/stream_impl.h ++++ b/src/brpc/stream_impl.h +@@ -42,7 +42,8 @@ public: + + // --------------------- SocketConnection -------------- + +- int AppendIfNotFull(const butil::IOBuf& msg); ++ int AppendIfNotFull(const butil::IOBuf& msg, ++ const StreamWriteOptions* options = NULL); + static int Create(const StreamOptions& options, + const StreamSettings *remote_settings, + StreamId *id); \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
