This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a82699ba087 [fix](auto-partition) Replace std::mutex with 
bthread::Mutex in VTabletWriter (#57508)
a82699ba087 is described below

commit a82699ba08757c1556f096e7be7fdf1f3625d3a0
Author: Xin Liao <[email protected]>
AuthorDate: Tue Nov 4 09:43:51 2025 +0800

    [fix](auto-partition) Replace std::mutex with bthread::Mutex in 
VTabletWriter (#57508)
    
    ### What problem does this PR solve?
    
    Fix a potential performance issue where std::mutex was used in a bthread
    context. When a bthread blocks on a std::mutex, it blocks the entire
    pthread instead of just yielding the current bthread, which can cause
    other bthreads on the same pthread to be blocked unnecessarily.
    
    Switching to bthread::Mutex ensures proper bthread scheduling and
    improves performance in high-concurrency scenarios during data writing
    operations.
---
 be/src/vec/sink/writer/vtablet_writer.cpp | 4 ++--
 be/src/vec/sink/writer/vtablet_writer.h   | 4 +++-
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp 
b/be/src/vec/sink/writer/vtablet_writer.cpp
index f349ac5f1db..c7abe1baa14 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -1333,7 +1333,7 @@ void VTabletWriter::_send_batch_process() {
 
     while (true) {
         // incremental open will temporarily make channels into abnormal 
state. stop checking when this.
-        std::unique_lock<std::mutex> l(_stop_check_channel);
+        std::unique_lock<bthread::Mutex> l(_stop_check_channel);
 
         int running_channels_num = 0;
         int opened_nodes = 0;
@@ -1637,7 +1637,7 @@ Status VTabletWriter::_init(RuntimeState* state, 
RuntimeProfile* profile) {
 Status VTabletWriter::_incremental_open_node_channel(
         const std::vector<TOlapTablePartition>& partitions) {
     // do what we did in prepare() for partitions. indexes which don't change 
when we create new partition is orthogonal to partitions.
-    std::unique_lock<std::mutex> _l(_stop_check_channel);
+    std::unique_lock<bthread::Mutex> _l(_stop_check_channel);
     for (int i = 0; i < _schema->indexes().size(); ++i) {
         const OlapTableIndexSchema* index = _schema->indexes()[i];
         std::vector<TTabletWithPartition> tablets;
diff --git a/be/src/vec/sink/writer/vtablet_writer.h 
b/be/src/vec/sink/writer/vtablet_writer.h
index 64dcbb09f82..ba1c0c505d6 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -31,6 +31,8 @@
 #include <google/protobuf/stubs/callback.h>
 
 // IWYU pragma: no_include <bits/chrono.h>
+#include <bthread/mutex.h>
+
 #include <atomic>
 #include <chrono> // IWYU pragma: keep
 #include <cstddef>
@@ -702,7 +704,7 @@ private:
     std::unique_ptr<OlapTabletFinder> _tablet_finder;
 
     // index_channel
-    std::mutex _stop_check_channel;
+    bthread::Mutex _stop_check_channel;
     std::vector<std::shared_ptr<IndexChannel>> _channels;
     std::unordered_map<int64_t, std::shared_ptr<IndexChannel>> 
_index_id_to_channel;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to