This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 349c090469 [fix](move-memtable) lock when send data in load stream 
stub (#23949)
349c090469 is described below

commit 349c090469e036b7ceaac87943149230b38da372
Author: Kaijie Chen <[email protected]>
AuthorDate: Thu Sep 7 11:19:21 2023 +0800

    [fix](move-memtable) lock when send data in load stream stub (#23949)
---
 be/src/vec/sink/load_stream_stub.cpp | 15 +++++++--------
 be/src/vec/sink/load_stream_stub.h   |  3 ++-
 2 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/be/src/vec/sink/load_stream_stub.cpp 
b/be/src/vec/sink/load_stream_stub.cpp
index fbc69d3a93..63bea0b6c7 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -215,16 +215,15 @@ Status LoadStreamStub::_encode_and_send(PStreamHeader& 
header, std::span<const S
 
 Status LoadStreamStub::_send_with_buffer(butil::IOBuf& buf, bool eos) {
     butil::IOBuf output;
-    {
-        std::unique_lock<decltype(_buffer_mutex)> lock(_buffer_mutex);
-        _buffer.append(buf);
-        if (eos || _buffer.size() >= 
config::brpc_streaming_client_batch_bytes) {
-            output.swap(_buffer);
-        }
-    }
-    if (output.size() == 0) {
+    std::unique_lock<decltype(_buffer_mutex)> buffer_lock(_buffer_mutex);
+    _buffer.append(buf);
+    if (!eos && _buffer.size() < config::brpc_streaming_client_batch_bytes) {
         return Status::OK();
     }
+    output.swap(_buffer);
+    // acquire send lock while holding buffer lock, to ensure the message order
+    std::lock_guard<decltype(_send_mutex)> send_lock(_send_mutex);
+    buffer_lock.unlock();
     VLOG_DEBUG << "send buf size : " << output.size() << ", eos: " << eos;
     return _send_with_retry(output);
 }
diff --git a/be/src/vec/sink/load_stream_stub.h 
b/be/src/vec/sink/load_stream_stub.h
index 2efa887581..8a1ae79c52 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -175,7 +175,8 @@ protected:
     bthread::Mutex _mutex;
     bthread::ConditionVariable _close_cv;
 
-    bthread::Mutex _buffer_mutex;
+    std::mutex _buffer_mutex;
+    std::mutex _send_mutex;
     butil::IOBuf _buffer;
 
     PUniqueId _load_id;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to