This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 794cf768bc3 branch-3.1: [fix](load) quorum success write should consider data skew #53737 (#53827)
794cf768bc3 is described below

commit 794cf768bc3f292264dc9e0fb717c3018f794565
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Jul 25 14:06:18 2025 +0800

    branch-3.1: [fix](load) quorum success write should consider data skew #53737 (#53827)
    
    Cherry-picked from #53737
    
    Co-authored-by: hui lai <[email protected]>
---
 be/src/vec/sink/load_stream_stub.cpp         |  2 --
 be/src/vec/sink/load_stream_stub.h           | 11 ------
 be/src/vec/sink/writer/vtablet_writer.cpp    | 37 +++++++++----------
 be/src/vec/sink/writer/vtablet_writer.h      |  8 +----
 be/src/vec/sink/writer/vtablet_writer_v2.cpp | 54 ++++++++++++++--------------
 be/src/vec/sink/writer/vtablet_writer_v2.h   |  6 ++--
 6 files changed, 51 insertions(+), 67 deletions(-)

diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index 00c327a21d2..44b09e935eb 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -229,7 +229,6 @@ Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64
     header.set_offset(offset);
     header.set_opcode(doris::PStreamHeader::APPEND_DATA);
     header.set_file_type(file_type);
-    add_write_tablets(tablet_id);
     return _encode_and_send(header, data);
 }
 
@@ -254,7 +253,6 @@ Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64
     if (flush_schema != nullptr) {
         flush_schema->to_schema_pb(header.mutable_flush_schema());
     }
-    add_write_tablets(tablet_id);
     return _encode_and_send(header);
 }
 
diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h
index 4a2d843a7a1..408d9fede54 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -223,16 +223,6 @@ public:
         return _bytes_written;
     }
 
-    void add_write_tablets(int64_t tablet_id) {
-        std::lock_guard<bthread::Mutex> lock(_write_mutex);
-        _write_tablets.insert(tablet_id);
-    }
-
-    std::set<int64_t> write_tablets() {
-        std::lock_guard<bthread::Mutex> lock(_write_mutex);
-        return _write_tablets;
-    }
-
     Status check_cancel() {
         DBUG_EXECUTE_IF("LoadStreamStub._check_cancel.cancelled",
                         { return Status::InternalError("stream cancelled"); });
@@ -285,7 +275,6 @@ protected:
     bool _is_incremental = false;
 
     bthread::Mutex _write_mutex;
-    std::set<int64_t> _write_tablets;
     size_t _bytes_written = 0;
 };
 
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp
index e1d1505cc66..94c4a33e73e 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -316,18 +316,24 @@ Status IndexChannel::close_wait(
                     { return Status::TimedOut("injected timeout"); });
     Status status = Status::OK();
     // 1. wait quorum success
-    std::unordered_set<int64_t> write_tablets;
-    for (const auto& [node_id, node_channel] : _node_channels) {
-        auto node_channel_write_tablets = node_channel->write_tablets();
-        write_tablets.insert(node_channel_write_tablets.begin(), 
node_channel_write_tablets.end());
+    std::unordered_set<int64_t> need_finish_tablets;
+    auto partition_ids = _parent->_tablet_finder->partition_ids();
+    for (const auto& part : _parent->_vpartition->get_partitions()) {
+        if (partition_ids.contains(part->id)) {
+            for (const auto& index : part->indexes) {
+                for (const auto& tablet_id : index.tablets) {
+                    need_finish_tablets.insert(tablet_id);
+                }
+            }
+        }
     }
     while (true) {
         RETURN_IF_ERROR(check_each_node_channel_close(
                 &unfinished_node_channel_ids, node_add_batch_counter_map, 
writer_stats, status));
-        bool quorum_success = _quorum_success(unfinished_node_channel_ids, 
write_tablets);
+        bool quorum_success = _quorum_success(unfinished_node_channel_ids, 
need_finish_tablets);
         if (unfinished_node_channel_ids.empty() || quorum_success) {
             LOG(INFO) << "quorum_success: " << quorum_success
-                      << ", is all unfinished: " << 
unfinished_node_channel_ids.empty()
+                      << ", is all finished: " << 
unfinished_node_channel_ids.empty()
                       << ", txn_id: " << _parent->_txn_id
                       << ", load_id: " << print_id(_parent->_load_id);
             break;
@@ -399,28 +405,27 @@ Status IndexChannel::check_each_node_channel_close(
 }
 
 bool IndexChannel::_quorum_success(const std::unordered_set<int64_t>& 
unfinished_node_channel_ids,
-                                   const std::unordered_set<int64_t>& 
write_tablets) {
+                                   const std::unordered_set<int64_t>& 
need_finish_tablets) {
     if (!config::enable_quorum_success_write) {
         return false;
     }
-    std::unordered_map<int64_t, int64_t> finished_tablets_replica;
+    if (need_finish_tablets.empty()) [[unlikely]] {
+        return false;
+    }
 
     // 1. collect all write tablets and finished tablets
+    std::unordered_map<int64_t, int64_t> finished_tablets_replica;
     for (const auto& [node_id, node_channel] : _node_channels) {
-        auto node_channel_write_tablets = node_channel->write_tablets();
         if (unfinished_node_channel_ids.contains(node_id) || 
!node_channel->check_status().ok()) {
             continue;
         }
-        for (const auto& tablet_id : node_channel_write_tablets) {
+        for (const auto& tablet_id : _tablets_by_channel[node_id]) {
             finished_tablets_replica[tablet_id]++;
         }
     }
 
     // 2. check if quorum success
-    if (write_tablets.empty()) {
-        return false;
-    }
-    for (const auto& tablet_id : write_tablets) {
+    for (const auto& tablet_id : need_finish_tablets) {
         if (finished_tablets_replica[tablet_id] < 
_load_required_replicas_num(tablet_id)) {
             return false;
         }
@@ -755,10 +760,6 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload)
     for (auto tablet_id : payload->second) {
         _cur_add_block_request->add_tablet_ids(tablet_id);
     }
-    {
-        std::lock_guard<std::mutex> l(_write_tablets_lock);
-        _write_tablets.insert(payload->second.begin(), payload->second.end());
-    }
     _write_bytes.fetch_add(_cur_mutable_block->bytes());
 
     if (_cur_mutable_block->rows() >= _batch_size ||
diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h
index 4948e25448f..554a2d711a4 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -329,10 +329,6 @@ public:
     bool is_incremental() const { return _is_incremental; }
 
     int64_t write_bytes() const { return _write_bytes.load(); }
-    std::unordered_set<int64_t> write_tablets() {
-        std::lock_guard<std::mutex> l(_write_tablets_lock);
-        return _write_tablets;
-    }
 
 protected:
     // make a real open request for relative BE's load channel.
@@ -437,9 +433,7 @@ protected:
 
     bool _is_incremental;
 
-    std::mutex _write_tablets_lock;
     std::atomic<int64_t> _write_bytes {0};
-    std::unordered_set<int64_t> _write_tablets;
 };
 
 // an IndexChannel is related to specific table and its rollup and mv
@@ -571,7 +565,7 @@ private:
     int _load_required_replicas_num(int64_t tablet_id);
 
     bool _quorum_success(const std::unordered_set<int64_t>& 
unfinished_node_channel_ids,
-                         const std::unordered_set<int64_t>& write_tablets);
+                         const std::unordered_set<int64_t>& 
need_finish_tablets);
 
     int64_t _calc_max_wait_time_ms(const std::unordered_set<int64_t>& 
unfinished_node_channel_ids);
 
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 71061965ba9..f524891bcae 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -107,6 +107,7 @@ Status VTabletWriterV2::_incremental_open_streams(
                         continue;
                     }
                     _indexes_from_node[node].emplace_back(tablet);
+                    _tablets_by_node[node].emplace(tablet_id);
                     known_indexes.insert(index.index_id);
                     VLOG_DEBUG << "incremental open stream (" << partition->id 
<< ", " << tablet_id
                                << ")";
@@ -345,6 +346,7 @@ Status VTabletWriterV2::_build_tablet_node_mapping() {
                         continue;
                     }
                     _indexes_from_node[node].emplace_back(tablet);
+                    _tablets_by_node[node].emplace(tablet_id);
                     known_indexes.insert(index.index_id);
                 }
                 _build_tablet_replica_info(tablet_id, partition);
@@ -500,10 +502,6 @@ Status VTabletWriterV2::write(RuntimeState* state, Block& input_block) {
     // For each tablet, send its input_rows from block to delta writer
     for (const auto& [tablet_id, rows] : rows_for_tablet) {
         RETURN_IF_ERROR(_write_memtable(block, tablet_id, rows));
-        {
-            std::lock_guard<std::mutex> l(_write_tablets_lock);
-            _write_tablets.insert(tablet_id);
-        }
     }
 
     return Status::OK();
@@ -749,13 +747,24 @@ Status VTabletWriterV2::_close_wait(
     Status status;
     auto streams_for_node = _load_stream_map->get_streams_for_node();
     // 1. first wait for quorum success
+    std::unordered_set<int64_t> need_finish_tablets;
+    auto partition_ids = _tablet_finder->partition_ids();
+    for (const auto& part : _vpartition->get_partitions()) {
+        if (partition_ids.contains(part->id)) {
+            for (const auto& index : part->indexes) {
+                for (const auto& tablet_id : index.tablets) {
+                    need_finish_tablets.insert(tablet_id);
+                }
+            }
+        }
+    }
     while (true) {
         RETURN_IF_ERROR(_check_timeout());
         RETURN_IF_ERROR(_check_streams_finish(unfinished_streams, status, 
streams_for_node));
-        bool quorum_success = _quorum_success(unfinished_streams);
+        bool quorum_success = _quorum_success(unfinished_streams, 
need_finish_tablets);
         if (quorum_success || unfinished_streams.empty()) {
             LOG(INFO) << "quorum_success: " << quorum_success
-                      << ", is all unfinished: " << unfinished_streams.empty()
+                      << ", is all finished: " << unfinished_streams.empty()
                       << ", txn_id: " << _txn_id << ", load_id: " << 
print_id(_load_id);
             break;
         }
@@ -795,21 +804,19 @@ Status VTabletWriterV2::_close_wait(
 }
 
 bool VTabletWriterV2::_quorum_success(
-        const std::unordered_set<std::shared_ptr<LoadStreamStub>>& 
unfinished_streams) {
+        const std::unordered_set<std::shared_ptr<LoadStreamStub>>& 
unfinished_streams,
+        const std::unordered_set<int64_t>& need_finish_tablets) {
     if (!config::enable_quorum_success_write) {
         return false;
     }
-    {
-        std::lock_guard<std::mutex> l(_write_tablets_lock);
-        if (_write_tablets.empty()) {
-            return false;
-        }
-    }
-    std::unordered_map<int64_t, int64_t> finished_tablets_replica;
     auto streams_for_node = _load_stream_map->get_streams_for_node();
-    std::unordered_set<int64_t> finished_dst_ids;
+    if (need_finish_tablets.empty()) [[unlikely]] {
+        return false;
+    }
 
     // 1. calculate finished tablets replica num
+    std::unordered_set<int64_t> finished_dst_ids;
+    std::unordered_map<int64_t, int64_t> finished_tablets_replica;
     for (const auto& [dst_id, streams] : streams_for_node) {
         bool finished = true;
         for (const auto& stream : streams->streams()) {
@@ -822,24 +829,19 @@ bool VTabletWriterV2::_quorum_success(
             finished_dst_ids.insert(dst_id);
         }
     }
-    for (const auto& [dst_id, streams] : streams_for_node) {
+    for (const auto& [dst_id, _] : streams_for_node) {
         if (!finished_dst_ids.contains(dst_id)) {
             continue;
         }
-        for (const auto& stream : streams->streams()) {
-            for (auto tablet_id : stream->write_tablets()) {
-                finished_tablets_replica[tablet_id]++;
-            }
+        for (const auto& tablet_id : _tablets_by_node[dst_id]) {
+            finished_tablets_replica[tablet_id]++;
         }
     }
 
     // 2. check if quorum success
-    {
-        std::lock_guard<std::mutex> l(_write_tablets_lock);
-        for (const auto& tablet_id : _write_tablets) {
-            if (finished_tablets_replica[tablet_id] < 
_load_required_replicas_num(tablet_id)) {
-                return false;
-            }
+    for (const auto& tablet_id : need_finish_tablets) {
+        if (finished_tablets_replica[tablet_id] < 
_load_required_replicas_num(tablet_id)) {
+            return false;
         }
     }
     return true;
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h
index eee2c956644..02fee25e743 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -157,7 +157,8 @@ private:
                        bool need_wait_after_quorum_success);
 
     bool _quorum_success(
-            const std::unordered_set<std::shared_ptr<LoadStreamStub>>& 
unfinished_streams);
+            const std::unordered_set<std::shared_ptr<LoadStreamStub>>& 
unfinished_streams,
+            const std::unordered_set<int64_t>& need_finish_tablets);
 
     int _load_required_replicas_num(int64_t tablet_id);
 
@@ -245,6 +246,7 @@ private:
 
     std::unordered_map<int64_t, std::unordered_map<int64_t, PTabletID>> 
_tablets_for_node;
     std::unordered_map<int64_t, std::vector<PTabletID>> _indexes_from_node;
+    std::unordered_map<int64_t, std::unordered_set<int64_t>> _tablets_by_node;
 
     std::shared_ptr<LoadStreamMap> _load_stream_map;
 
@@ -256,8 +258,6 @@ private:
 
     // tablet_id -> <total replicas num, load required replicas num>
     std::unordered_map<int64_t, std::pair<int, int>> _tablet_replica_info;
-    std::mutex _write_tablets_lock;
-    std::unordered_set<int64_t> _write_tablets;
 };
 
 } // namespace vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to