This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a82699ba087 [fix](auto-partition) Replace std::mutex with
bthread::Mutex in VTabletWriter (#57508)
a82699ba087 is described below
commit a82699ba08757c1556f096e7be7fdf1f3625d3a0
Author: Xin Liao <[email protected]>
AuthorDate: Tue Nov 4 09:43:51 2025 +0800
[fix](auto-partition) Replace std::mutex with bthread::Mutex in
VTabletWriter (#57508)
### What problem does this PR solve?
Fix a potential performance issue where std::mutex was used in a bthread
context. When a bthread blocks on a std::mutex, it blocks the entire pthread
instead of just yielding the current bthread, which can cause other
bthreads on the same pthread to be blocked unnecessarily.
Replacing std::mutex with bthread::Mutex ensures proper bthread scheduling
and improves performance in high-concurrency scenarios during data writing
operations.
---
be/src/vec/sink/writer/vtablet_writer.cpp | 4 ++--
be/src/vec/sink/writer/vtablet_writer.h | 4 +++-
2 files changed, 5 insertions(+), 3 deletions(-)
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp
b/be/src/vec/sink/writer/vtablet_writer.cpp
index f349ac5f1db..c7abe1baa14 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -1333,7 +1333,7 @@ void VTabletWriter::_send_batch_process() {
while (true) {
// incremental open will temporarily make channels into abnormal
state. stop checking when this.
- std::unique_lock<std::mutex> l(_stop_check_channel);
+ std::unique_lock<bthread::Mutex> l(_stop_check_channel);
int running_channels_num = 0;
int opened_nodes = 0;
@@ -1637,7 +1637,7 @@ Status VTabletWriter::_init(RuntimeState* state,
RuntimeProfile* profile) {
Status VTabletWriter::_incremental_open_node_channel(
const std::vector<TOlapTablePartition>& partitions) {
// do what we did in prepare() for partitions. indexes which don't change
when we create new partition is orthogonal to partitions.
- std::unique_lock<std::mutex> _l(_stop_check_channel);
+ std::unique_lock<bthread::Mutex> _l(_stop_check_channel);
for (int i = 0; i < _schema->indexes().size(); ++i) {
const OlapTableIndexSchema* index = _schema->indexes()[i];
std::vector<TTabletWithPartition> tablets;
diff --git a/be/src/vec/sink/writer/vtablet_writer.h
b/be/src/vec/sink/writer/vtablet_writer.h
index 64dcbb09f82..ba1c0c505d6 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -31,6 +31,8 @@
#include <google/protobuf/stubs/callback.h>
// IWYU pragma: no_include <bits/chrono.h>
+#include <bthread/mutex.h>
+
#include <atomic>
#include <chrono> // IWYU pragma: keep
#include <cstddef>
@@ -702,7 +704,7 @@ private:
std::unique_ptr<OlapTabletFinder> _tablet_finder;
// index_channel
- std::mutex _stop_check_channel;
+ bthread::Mutex _stop_check_channel;
std::vector<std::shared_ptr<IndexChannel>> _channels;
std::unordered_map<int64_t, std::shared_ptr<IndexChannel>>
_index_id_to_channel;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]