This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 4fcb9d5cb04 branch-3.1: [feat](load) quorum success write (part I) 
#52354 (#53255)
4fcb9d5cb04 is described below

commit 4fcb9d5cb043b6c20b2d224267e092c3e05726a4
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Jul 15 16:34:29 2025 +0800

    branch-3.1: [feat](load) quorum success write (part I) #52354 (#53255)
    
    Cherry-picked from #52354
    
    Co-authored-by: hui lai <[email protected]>
---
 be/src/common/config.cpp                           |   5 +
 be/src/common/config.h                             |   5 +
 be/src/vec/sink/writer/vtablet_writer.cpp          | 159 +++++++++++++++++++--
 be/src/vec/sink/writer/vtablet_writer.h            |  26 +++-
 .../test_writer_fault_injection.groovy             |   2 +
 5 files changed, 184 insertions(+), 13 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 6ef1742594f..dee3b83b6b7 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1530,6 +1530,11 @@ DEFINE_mInt32(load_trigger_compaction_version_percent, 
"66");
 DEFINE_mInt64(base_compaction_interval_seconds_since_last_operation, "86400");
 DEFINE_mBool(enable_compaction_pause_on_high_memory, "true");
 
+DEFINE_mBool(enable_quorum_success_write, "true");
+DEFINE_mDouble(quorum_success_max_wait_multiplier, "0.2");
+DEFINE_mInt64(quorum_success_min_wait_seconds, "10");
+DEFINE_mInt32(quorum_success_remaining_timeout_seconds, "30");
+
 DEFINE_mBool(enable_calc_delete_bitmap_between_segments_concurrently, "false");
 
 DEFINE_mBool(enable_fetch_rowsets_from_peer_replicas, "false");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 45d7af789ab..4b50066fac3 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1606,6 +1606,11 @@ DECLARE_mInt32(load_trigger_compaction_version_percent);
 DECLARE_mInt64(base_compaction_interval_seconds_since_last_operation);
 DECLARE_mBool(enable_compaction_pause_on_high_memory);
 
+DECLARE_mBool(enable_quorum_success_write);
+DECLARE_mDouble(quorum_success_max_wait_multiplier);
+DECLARE_mInt64(quorum_success_min_wait_seconds);
+DECLARE_mInt32(quorum_success_remaining_timeout_seconds);
+
 DECLARE_mBool(enable_calc_delete_bitmap_between_segments_concurrently);
 
 DECLARE_mBool(enable_fetch_rowsets_from_peer_replicas);
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp 
b/be/src/vec/sink/writer/vtablet_writer.cpp
index 64903c5b212..e1d1505cc66 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -197,6 +197,15 @@ int IndexChannel::_max_failed_replicas(int64_t tablet_id) {
     return max_failed_replicas;
 }
 
+int IndexChannel::_load_required_replicas_num(int64_t tablet_id) {
+    auto [total_replicas_num, load_required_replicas_num] =
+            _parent->_tablet_replica_info[tablet_id];
+    if (total_replicas_num == 0) {
+        return (_parent->_num_replicas + 1) / 2;
+    }
+    return load_required_replicas_num;
+}
+
 Status IndexChannel::check_intolerable_failure() {
     std::lock_guard<std::mutex> l(_fail_lock);
     return _intolerable_failure_status;
@@ -301,23 +310,61 @@ static Status 
cancel_channel_and_check_intolerable_failure(Status status,
 Status IndexChannel::close_wait(
         RuntimeState* state, WriterStats* writer_stats,
         std::unordered_map<int64_t, AddBatchCounter>* 
node_add_batch_counter_map,
-        std::unordered_set<int64_t> unfinished_node_channel_ids) {
+        std::unordered_set<int64_t> unfinished_node_channel_ids,
+        bool need_wait_after_quorum_success) {
+    DBUG_EXECUTE_IF("IndexChannel.close_wait.timeout",
+                    { return Status::TimedOut("injected timeout"); });
     Status status = Status::OK();
+    // 1. wait quorum success
+    std::unordered_set<int64_t> write_tablets;
+    for (const auto& [node_id, node_channel] : _node_channels) {
+        auto node_channel_write_tablets = node_channel->write_tablets();
+        write_tablets.insert(node_channel_write_tablets.begin(), 
node_channel_write_tablets.end());
+    }
     while (true) {
-        status = check_each_node_channel_close(&unfinished_node_channel_ids,
-                                               node_add_batch_counter_map, 
writer_stats, status);
-        if (!status.ok() || unfinished_node_channel_ids.empty()) {
-            LOG(INFO) << ", is all unfinished: " << 
unfinished_node_channel_ids.empty()
-                      << ", status: " << status << ", txn_id: " << 
_parent->_txn_id
+        RETURN_IF_ERROR(check_each_node_channel_close(
+                &unfinished_node_channel_ids, node_add_batch_counter_map, 
writer_stats, status));
+        bool quorum_success = _quorum_success(unfinished_node_channel_ids, 
write_tablets);
+        if (unfinished_node_channel_ids.empty() || quorum_success) {
+            LOG(INFO) << "quorum_success: " << quorum_success
+                      << ", is all unfinished: " << 
unfinished_node_channel_ids.empty()
+                      << ", txn_id: " << _parent->_txn_id
                       << ", load_id: " << print_id(_parent->_load_id);
             break;
         }
         bthread_usleep(1000 * 10);
     }
 
-    DBUG_EXECUTE_IF("IndexChannel.close_wait.timeout",
-                    { status = Status::TimedOut("injected timeout"); });
-
+    // 2. wait for all node channel to complete as much as possible
+    if (!unfinished_node_channel_ids.empty() && 
need_wait_after_quorum_success) {
+        int64_t max_wait_time_ms = 
_calc_max_wait_time_ms(unfinished_node_channel_ids);
+        while (true) {
+            
RETURN_IF_ERROR(check_each_node_channel_close(&unfinished_node_channel_ids,
+                                                          
node_add_batch_counter_map, writer_stats,
+                                                          status));
+            if (unfinished_node_channel_ids.empty()) {
+                break;
+            }
+            int64_t elapsed_ms = UnixMillis() - _start_time;
+            if (elapsed_ms > max_wait_time_ms ||
+                _parent->_load_channel_timeout_s - elapsed_ms / 1000 <
+                        config::quorum_success_remaining_timeout_seconds) {
+                // cancel unfinished node channel
+                std::stringstream unfinished_node_channel_host_str;
+                for (auto& it : unfinished_node_channel_ids) {
+                    unfinished_node_channel_host_str << 
_node_channels[it]->host() << ",";
+                    _node_channels[it]->cancel("timeout");
+                }
+                LOG(WARNING) << "reach max wait time, max_wait_time_ms: " << 
max_wait_time_ms
+                             << ", cancel unfinished node channel and finish 
close"
+                             << ", load id: " << print_id(_parent->_load_id)
+                             << ", txn_id: " << _parent->_txn_id << ", 
unfinished node channel: "
+                             << unfinished_node_channel_host_str.str();
+                break;
+            }
+            bthread_usleep(1000 * 10);
+        }
+    }
     return status;
 }
 
@@ -340,6 +387,8 @@ Status IndexChannel::check_each_node_channel_close(
                                                          
node_add_batch_counter_map);
             unfinished_node_channel_ids->erase(it.first);
         }
+        
DBUG_EXECUTE_IF("IndexChannel.check_each_node_channel_close.close_status_not_ok",
+                        { close_status = Status::InternalError("injected close 
status not ok"); });
         if (!close_status.ok()) {
             final_status = cancel_channel_and_check_intolerable_failure(
                     std::move(final_status), close_status.to_string(), *this, 
*it.second);
@@ -349,6 +398,80 @@ Status IndexChannel::check_each_node_channel_close(
     return final_status;
 }
 
+bool IndexChannel::_quorum_success(const std::unordered_set<int64_t>& 
unfinished_node_channel_ids,
+                                   const std::unordered_set<int64_t>& 
write_tablets) {
+    if (!config::enable_quorum_success_write) {
+        return false;
+    }
+    std::unordered_map<int64_t, int64_t> finished_tablets_replica;
+
+    // 1. collect all write tablets and finished tablets
+    for (const auto& [node_id, node_channel] : _node_channels) {
+        auto node_channel_write_tablets = node_channel->write_tablets();
+        if (unfinished_node_channel_ids.contains(node_id) || 
!node_channel->check_status().ok()) {
+            continue;
+        }
+        for (const auto& tablet_id : node_channel_write_tablets) {
+            finished_tablets_replica[tablet_id]++;
+        }
+    }
+
+    // 2. check if quorum success
+    if (write_tablets.empty()) {
+        return false;
+    }
+    for (const auto& tablet_id : write_tablets) {
+        if (finished_tablets_replica[tablet_id] < 
_load_required_replicas_num(tablet_id)) {
+            return false;
+        }
+    }
+
+    return true;
+}
+
+int64_t IndexChannel::_calc_max_wait_time_ms(
+        const std::unordered_set<int64_t>& unfinished_node_channel_ids) {
+    // 1. calculate avg speed of all unfinished node channel
+    int64_t elapsed_ms = UnixMillis() - _start_time;
+    int64_t total_bytes = 0;
+    int finished_count = 0;
+    for (const auto& [node_id, node_channel] : _node_channels) {
+        if (unfinished_node_channel_ids.contains(node_id)) {
+            continue;
+        }
+        total_bytes += node_channel->write_bytes();
+        finished_count++;
+    }
+    // no data loaded in index channel, return 0
+    if (total_bytes == 0 || finished_count == 0) {
+        return 0;
+    }
+    // if elapsed_ms is equal to 0, explain the loaded data is too small
+    if (elapsed_ms <= 0) {
+        return config::quorum_success_min_wait_seconds * 1000;
+    }
+    double avg_speed =
+            static_cast<double>(total_bytes) / 
(static_cast<double>(elapsed_ms) * finished_count);
+
+    // 2. calculate max wait time of each unfinished node channel and return 
the max value
+    int64_t max_wait_time_ms = 0;
+    for (int64_t id : unfinished_node_channel_ids) {
+        int64_t bytes = _node_channels[id]->write_bytes();
+        int64_t wait =
+                avg_speed > 0 ? 
static_cast<int64_t>(static_cast<double>(bytes) / avg_speed) : 0;
+        max_wait_time_ms = std::max(max_wait_time_ms, wait);
+    }
+
+    // 3. calculate max wait time
+    // introduce quorum_success_min_wait_seconds to avoid jitter of small load
+    max_wait_time_ms =
+            
std::max(static_cast<int64_t>(static_cast<double>(max_wait_time_ms) *
+                                          (1.0 + 
config::quorum_success_max_wait_multiplier)),
+                     config::quorum_success_min_wait_seconds * 1000);
+
+    return max_wait_time_ms;
+}
+
 static Status none_of(std::initializer_list<bool> vars) {
     bool none = std::none_of(vars.begin(), vars.end(), [](bool var) { return 
var; });
     Status st = Status::OK();
@@ -632,6 +755,11 @@ Status VNodeChannel::add_block(vectorized::Block* block, 
const Payload* payload)
     for (auto tablet_id : payload->second) {
         _cur_add_block_request->add_tablet_ids(tablet_id);
     }
+    {
+        std::lock_guard<std::mutex> l(_write_tablets_lock);
+        _write_tablets.insert(payload->second.begin(), payload->second.end());
+    }
+    _write_bytes.fetch_add(_cur_mutable_block->bytes());
 
     if (_cur_mutable_block->rows() >= _batch_size ||
         _cur_mutable_block->bytes() > config::doris_scanner_row_bytes) {
@@ -1081,6 +1209,10 @@ Status VNodeChannel::after_close_handle(
     return st;
 }
 
+Status VNodeChannel::check_status() {
+    return none_of({_cancelled, !_eos_is_produced});
+}
+
 void VNodeChannel::_close_check() {
     std::lock_guard<std::mutex> lg(_pending_batches_lock);
     CHECK(_pending_blocks.empty()) << name();
@@ -1200,6 +1332,7 @@ Status VTabletWriter::open(doris::RuntimeState* state, 
doris::RuntimeProfile* pr
     VLOG_DEBUG << "list of open index id = " << fmt::to_string(buf);
 
     for (const auto& index_channel : _channels) {
+        index_channel->set_start_time(UnixMillis());
         index_channel->for_each_node_channel([&index_channel](
                                                      const 
std::shared_ptr<VNodeChannel>& ch) {
             auto st = ch->open_wait();
@@ -1577,9 +1710,11 @@ void VTabletWriter::_do_try_close(RuntimeState* state, 
const Status& exec_status
                 if (!status.ok()) {
                     break;
                 }
-
+                // Do not need to wait after quorum success,
+                // for first-stage close_wait only ensure incremental node 
channels load has been completed,
+                // unified waiting in the second-stage close_wait.
                 status = index_channel->close_wait(_state, nullptr, nullptr,
-                                                   
index_channel->init_node_channel_ids());
+                                                   
index_channel->init_node_channel_ids(), false);
                 if (!status.ok()) {
                     break;
                 }
@@ -1657,7 +1792,7 @@ Status VTabletWriter::close(Status exec_status) {
             int64_t add_batch_exec_time = 0;
             int64_t wait_exec_time = 0;
             status = index_channel->close_wait(_state, &writer_stats, 
&node_add_batch_counter_map,
-                                               
index_channel->each_node_channel_ids());
+                                               
index_channel->each_node_channel_ids(), true);
 
             // Due to the non-determinism of compaction, the rowsets of each 
replica may be different from each other on different
             // BE nodes. The number of rows filtered in SegmentWriter depends 
on the historical rowsets located in the correspoding
diff --git a/be/src/vec/sink/writer/vtablet_writer.h 
b/be/src/vec/sink/writer/vtablet_writer.h
index 8a15df9a252..4948e25448f 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -292,6 +292,8 @@ public:
             RuntimeState* state, WriterStats* writer_stats,
             std::unordered_map<int64_t, AddBatchCounter>* 
node_add_batch_counter_map);
 
+    Status check_status();
+
     void cancel(const std::string& cancel_msg);
 
     void time_report(std::unordered_map<int64_t, AddBatchCounter>* 
add_batch_counter_map,
@@ -326,6 +328,12 @@ public:
 
     bool is_incremental() const { return _is_incremental; }
 
+    int64_t write_bytes() const { return _write_bytes.load(); }
+    std::unordered_set<int64_t> write_tablets() {
+        std::lock_guard<std::mutex> l(_write_tablets_lock);
+        return _write_tablets;
+    }
+
 protected:
     // make a real open request for relative BE's load channel.
     void _open_internal(bool is_incremental);
@@ -428,6 +436,10 @@ protected:
     int64_t _wg_id = -1;
 
     bool _is_incremental;
+
+    std::mutex _write_tablets_lock;
+    std::atomic<int64_t> _write_bytes {0};
+    std::unordered_set<int64_t> _write_tablets;
 };
 
 // an IndexChannel is related to specific table and its rollup and mv
@@ -505,7 +517,8 @@ public:
 
     Status close_wait(RuntimeState* state, WriterStats* writer_stats,
                       std::unordered_map<int64_t, AddBatchCounter>* 
node_add_batch_counter_map,
-                      std::unordered_set<int64_t> unfinished_node_channel_ids);
+                      std::unordered_set<int64_t> unfinished_node_channel_ids,
+                      bool need_wait_after_quorum_success);
 
     Status check_each_node_channel_close(
             std::unordered_set<int64_t>* unfinished_node_channel_ids,
@@ -544,6 +557,8 @@ public:
     // check whether the rows num filtered by different replicas is consistent
     Status check_tablet_filtered_rows_consistency();
 
+    void set_start_time(const int64_t& start_time) { _start_time = start_time; 
}
+
     vectorized::VExprContextSPtr get_where_clause() { return _where_clause; }
 
 private:
@@ -553,6 +568,13 @@ private:
 
     int _max_failed_replicas(int64_t tablet_id);
 
+    int _load_required_replicas_num(int64_t tablet_id);
+
+    bool _quorum_success(const std::unordered_set<int64_t>& 
unfinished_node_channel_ids,
+                         const std::unordered_set<int64_t>& write_tablets);
+
+    int64_t _calc_max_wait_time_ms(const std::unordered_set<int64_t>& 
unfinished_node_channel_ids);
+
     VTabletWriter* _parent = nullptr;
     int64_t _index_id;
     vectorized::VExprContextSPtr _where_clause;
@@ -586,6 +608,8 @@ private:
     // rows num filtered by DeltaWriter per tablet, tablet_id -> <node_Id, 
filtered_rows_num>
     // used to verify whether the rows num filtered by different replicas is 
consistent
     std::map<int64_t, std::vector<std::pair<int64_t, int64_t>>> 
_tablets_filtered_rows;
+
+    int64_t _start_time = 0;
 };
 } // namespace vectorized
 } // namespace doris
diff --git 
a/regression-test/suites/fault_injection_p0/test_writer_fault_injection.groovy 
b/regression-test/suites/fault_injection_p0/test_writer_fault_injection.groovy
index 5f1d9fced72..351c009dd85 100644
--- 
a/regression-test/suites/fault_injection_p0/test_writer_fault_injection.groovy
+++ 
b/regression-test/suites/fault_injection_p0/test_writer_fault_injection.groovy
@@ -94,6 +94,8 @@ suite("test_writer_fault_injection", "nonConcurrent") {
         load_with_injection("IndexChannel.close_wait.timeout")
         // Test VTabletWriter close with _close_status not ok
         load_with_injection("VTabletWriter.close.close_status_not_ok")
+        // Test IndexChannel check_each_node_channel_close with close_status 
not ok
+        
load_with_injection("IndexChannel.check_each_node_channel_close.close_status_not_ok")
     } finally {
         sql """ set enable_memtable_on_sink_node=true """
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to