This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 349c090469 [fix](move-memtable) lock when send data in load stream stub (#23949)
349c090469 is described below
commit 349c090469e036b7ceaac87943149230b38da372
Author: Kaijie Chen <[email protected]>
AuthorDate: Thu Sep 7 11:19:21 2023 +0800
[fix](move-memtable) lock when send data in load stream stub (#23949)
---
be/src/vec/sink/load_stream_stub.cpp | 15 +++++++--------
be/src/vec/sink/load_stream_stub.h | 3 ++-
2 files changed, 9 insertions(+), 9 deletions(-)
diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index fbc69d3a93..63bea0b6c7 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -215,16 +215,15 @@ Status LoadStreamStub::_encode_and_send(PStreamHeader& header, std::span<const S
Status LoadStreamStub::_send_with_buffer(butil::IOBuf& buf, bool eos) {
butil::IOBuf output;
- {
- std::unique_lock<decltype(_buffer_mutex)> lock(_buffer_mutex);
- _buffer.append(buf);
- if (eos || _buffer.size() >= config::brpc_streaming_client_batch_bytes) {
- output.swap(_buffer);
- }
- }
- if (output.size() == 0) {
+ std::unique_lock<decltype(_buffer_mutex)> buffer_lock(_buffer_mutex);
+ _buffer.append(buf);
+ if (!eos && _buffer.size() < config::brpc_streaming_client_batch_bytes) {
return Status::OK();
}
+ output.swap(_buffer);
+ // acquire send lock while holding buffer lock, to ensure the message order
+ std::lock_guard<decltype(_send_mutex)> send_lock(_send_mutex);
+ buffer_lock.unlock();
VLOG_DEBUG << "send buf size : " << output.size() << ", eos: " << eos;
return _send_with_retry(output);
}
diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h
index 2efa887581..8a1ae79c52 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -175,7 +175,8 @@ protected:
bthread::Mutex _mutex;
bthread::ConditionVariable _close_cv;
- bthread::Mutex _buffer_mutex;
+ std::mutex _buffer_mutex;
+ std::mutex _send_mutex;
butil::IOBuf _buffer;
PUniqueId _load_id;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]