This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 4fcb9d5cb04 branch-3.1: [feat](load) quorum success write (part I) #52354 (#53255)
4fcb9d5cb04 is described below
commit 4fcb9d5cb043b6c20b2d224267e092c3e05726a4
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Jul 15 16:34:29 2025 +0800
branch-3.1: [feat](load) quorum success write (part I) #52354 (#53255)
Cherry-picked from #52354
Co-authored-by: hui lai <[email protected]>
---
be/src/common/config.cpp | 5 +
be/src/common/config.h | 5 +
be/src/vec/sink/writer/vtablet_writer.cpp | 159 +++++++++++++++++++--
be/src/vec/sink/writer/vtablet_writer.h | 26 +++-
.../test_writer_fault_injection.groovy | 2 +
5 files changed, 184 insertions(+), 13 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 6ef1742594f..dee3b83b6b7 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1530,6 +1530,11 @@ DEFINE_mInt32(load_trigger_compaction_version_percent, "66");
DEFINE_mInt64(base_compaction_interval_seconds_since_last_operation, "86400");
DEFINE_mBool(enable_compaction_pause_on_high_memory, "true");
+DEFINE_mBool(enable_quorum_success_write, "true");
+DEFINE_mDouble(quorum_success_max_wait_multiplier, "0.2");
+DEFINE_mInt64(quorum_success_min_wait_seconds, "10");
+DEFINE_mInt32(quorum_success_remaining_timeout_seconds, "30");
+
DEFINE_mBool(enable_calc_delete_bitmap_between_segments_concurrently, "false");
DEFINE_mBool(enable_fetch_rowsets_from_peer_replicas, "false");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 45d7af789ab..4b50066fac3 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1606,6 +1606,11 @@ DECLARE_mInt32(load_trigger_compaction_version_percent);
DECLARE_mInt64(base_compaction_interval_seconds_since_last_operation);
DECLARE_mBool(enable_compaction_pause_on_high_memory);
+DECLARE_mBool(enable_quorum_success_write);
+DECLARE_mDouble(quorum_success_max_wait_multiplier);
+DECLARE_mInt64(quorum_success_min_wait_seconds);
+DECLARE_mInt32(quorum_success_remaining_timeout_seconds);
+
DECLARE_mBool(enable_calc_delete_bitmap_between_segments_concurrently);
DECLARE_mBool(enable_fetch_rowsets_from_peer_replicas);
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp
index 64903c5b212..e1d1505cc66 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -197,6 +197,15 @@ int IndexChannel::_max_failed_replicas(int64_t tablet_id) {
return max_failed_replicas;
}
+int IndexChannel::_load_required_replicas_num(int64_t tablet_id) {
+ auto [total_replicas_num, load_required_replicas_num] =
+ _parent->_tablet_replica_info[tablet_id];
+ if (total_replicas_num == 0) {
+ return (_parent->_num_replicas + 1) / 2;
+ }
+ return load_required_replicas_num;
+}
+
Status IndexChannel::check_intolerable_failure() {
std::lock_guard<std::mutex> l(_fail_lock);
return _intolerable_failure_status;
@@ -301,23 +310,61 @@ static Status cancel_channel_and_check_intolerable_failure(Status status,
Status IndexChannel::close_wait(
RuntimeState* state, WriterStats* writer_stats,
std::unordered_map<int64_t, AddBatchCounter>*
node_add_batch_counter_map,
- std::unordered_set<int64_t> unfinished_node_channel_ids) {
+ std::unordered_set<int64_t> unfinished_node_channel_ids,
+ bool need_wait_after_quorum_success) {
+ DBUG_EXECUTE_IF("IndexChannel.close_wait.timeout",
+ { return Status::TimedOut("injected timeout"); });
Status status = Status::OK();
+ // 1. wait quorum success
+ std::unordered_set<int64_t> write_tablets;
+ for (const auto& [node_id, node_channel] : _node_channels) {
+ auto node_channel_write_tablets = node_channel->write_tablets();
+ write_tablets.insert(node_channel_write_tablets.begin(),
node_channel_write_tablets.end());
+ }
while (true) {
- status = check_each_node_channel_close(&unfinished_node_channel_ids,
- node_add_batch_counter_map,
writer_stats, status);
- if (!status.ok() || unfinished_node_channel_ids.empty()) {
- LOG(INFO) << ", is all unfinished: " <<
unfinished_node_channel_ids.empty()
- << ", status: " << status << ", txn_id: " <<
_parent->_txn_id
+ RETURN_IF_ERROR(check_each_node_channel_close(
+ &unfinished_node_channel_ids, node_add_batch_counter_map,
writer_stats, status));
+ bool quorum_success = _quorum_success(unfinished_node_channel_ids,
write_tablets);
+ if (unfinished_node_channel_ids.empty() || quorum_success) {
+ LOG(INFO) << "quorum_success: " << quorum_success
+ << ", is all unfinished: " <<
unfinished_node_channel_ids.empty()
+ << ", txn_id: " << _parent->_txn_id
<< ", load_id: " << print_id(_parent->_load_id);
break;
}
bthread_usleep(1000 * 10);
}
- DBUG_EXECUTE_IF("IndexChannel.close_wait.timeout",
- { status = Status::TimedOut("injected timeout"); });
-
+ // 2. wait for all node channel to complete as much as possible
+ if (!unfinished_node_channel_ids.empty() &&
need_wait_after_quorum_success) {
+ int64_t max_wait_time_ms =
_calc_max_wait_time_ms(unfinished_node_channel_ids);
+ while (true) {
+
RETURN_IF_ERROR(check_each_node_channel_close(&unfinished_node_channel_ids,
+
node_add_batch_counter_map, writer_stats,
+ status));
+ if (unfinished_node_channel_ids.empty()) {
+ break;
+ }
+ int64_t elapsed_ms = UnixMillis() - _start_time;
+ if (elapsed_ms > max_wait_time_ms ||
+ _parent->_load_channel_timeout_s - elapsed_ms / 1000 <
+ config::quorum_success_remaining_timeout_seconds) {
+ // cancel unfinished node channel
+ std::stringstream unfinished_node_channel_host_str;
+ for (auto& it : unfinished_node_channel_ids) {
+ unfinished_node_channel_host_str <<
_node_channels[it]->host() << ",";
+ _node_channels[it]->cancel("timeout");
+ }
+ LOG(WARNING) << "reach max wait time, max_wait_time_ms: " <<
max_wait_time_ms
+ << ", cancel unfinished node channel and finish
close"
+ << ", load id: " << print_id(_parent->_load_id)
+ << ", txn_id: " << _parent->_txn_id << ",
unfinished node channel: "
+ << unfinished_node_channel_host_str.str();
+ break;
+ }
+ bthread_usleep(1000 * 10);
+ }
+ }
return status;
}
@@ -340,6 +387,8 @@ Status IndexChannel::check_each_node_channel_close(
node_add_batch_counter_map);
unfinished_node_channel_ids->erase(it.first);
}
+
DBUG_EXECUTE_IF("IndexChannel.check_each_node_channel_close.close_status_not_ok",
+ { close_status = Status::InternalError("injected close
status not ok"); });
if (!close_status.ok()) {
final_status = cancel_channel_and_check_intolerable_failure(
std::move(final_status), close_status.to_string(), *this,
*it.second);
@@ -349,6 +398,80 @@ Status IndexChannel::check_each_node_channel_close(
return final_status;
}
+bool IndexChannel::_quorum_success(const std::unordered_set<int64_t>&
unfinished_node_channel_ids,
+ const std::unordered_set<int64_t>&
write_tablets) {
+ if (!config::enable_quorum_success_write) {
+ return false;
+ }
+ std::unordered_map<int64_t, int64_t> finished_tablets_replica;
+
+ // 1. collect all write tablets and finished tablets
+ for (const auto& [node_id, node_channel] : _node_channels) {
+ auto node_channel_write_tablets = node_channel->write_tablets();
+ if (unfinished_node_channel_ids.contains(node_id) ||
!node_channel->check_status().ok()) {
+ continue;
+ }
+ for (const auto& tablet_id : node_channel_write_tablets) {
+ finished_tablets_replica[tablet_id]++;
+ }
+ }
+
+ // 2. check if quorum success
+ if (write_tablets.empty()) {
+ return false;
+ }
+ for (const auto& tablet_id : write_tablets) {
+ if (finished_tablets_replica[tablet_id] <
_load_required_replicas_num(tablet_id)) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+int64_t IndexChannel::_calc_max_wait_time_ms(
+ const std::unordered_set<int64_t>& unfinished_node_channel_ids) {
+ // 1. calculate avg speed of all unfinished node channel
+ int64_t elapsed_ms = UnixMillis() - _start_time;
+ int64_t total_bytes = 0;
+ int finished_count = 0;
+ for (const auto& [node_id, node_channel] : _node_channels) {
+ if (unfinished_node_channel_ids.contains(node_id)) {
+ continue;
+ }
+ total_bytes += node_channel->write_bytes();
+ finished_count++;
+ }
+ // no data loaded in index channel, return 0
+ if (total_bytes == 0 || finished_count == 0) {
+ return 0;
+ }
+ // if elapsed_ms is equal to 0, explain the loaded data is too small
+ if (elapsed_ms <= 0) {
+ return config::quorum_success_min_wait_seconds * 1000;
+ }
+ double avg_speed =
+ static_cast<double>(total_bytes) /
(static_cast<double>(elapsed_ms) * finished_count);
+
+ // 2. calculate max wait time of each unfinished node channel and return
the max value
+ int64_t max_wait_time_ms = 0;
+ for (int64_t id : unfinished_node_channel_ids) {
+ int64_t bytes = _node_channels[id]->write_bytes();
+ int64_t wait =
+ avg_speed > 0 ?
static_cast<int64_t>(static_cast<double>(bytes) / avg_speed) : 0;
+ max_wait_time_ms = std::max(max_wait_time_ms, wait);
+ }
+
+ // 3. calculate max wait time
+ // introduce quorum_success_min_wait_seconds to avoid jitter of small load
+ max_wait_time_ms =
+
std::max(static_cast<int64_t>(static_cast<double>(max_wait_time_ms) *
+ (1.0 +
config::quorum_success_max_wait_multiplier)),
+ config::quorum_success_min_wait_seconds * 1000);
+
+ return max_wait_time_ms;
+}
+
static Status none_of(std::initializer_list<bool> vars) {
bool none = std::none_of(vars.begin(), vars.end(), [](bool var) { return
var; });
Status st = Status::OK();
@@ -632,6 +755,11 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload)
for (auto tablet_id : payload->second) {
_cur_add_block_request->add_tablet_ids(tablet_id);
}
+ {
+ std::lock_guard<std::mutex> l(_write_tablets_lock);
+ _write_tablets.insert(payload->second.begin(), payload->second.end());
+ }
+ _write_bytes.fetch_add(_cur_mutable_block->bytes());
if (_cur_mutable_block->rows() >= _batch_size ||
_cur_mutable_block->bytes() > config::doris_scanner_row_bytes) {
@@ -1081,6 +1209,10 @@ Status VNodeChannel::after_close_handle(
return st;
}
+Status VNodeChannel::check_status() {
+ return none_of({_cancelled, !_eos_is_produced});
+}
+
void VNodeChannel::_close_check() {
std::lock_guard<std::mutex> lg(_pending_batches_lock);
CHECK(_pending_blocks.empty()) << name();
@@ -1200,6 +1332,7 @@ Status VTabletWriter::open(doris::RuntimeState* state, doris::RuntimeProfile* pr
VLOG_DEBUG << "list of open index id = " << fmt::to_string(buf);
for (const auto& index_channel : _channels) {
+ index_channel->set_start_time(UnixMillis());
index_channel->for_each_node_channel([&index_channel](
const
std::shared_ptr<VNodeChannel>& ch) {
auto st = ch->open_wait();
@@ -1577,9 +1710,11 @@ void VTabletWriter::_do_try_close(RuntimeState* state, const Status& exec_status
if (!status.ok()) {
break;
}
-
+ // Do not need to wait after quorum success,
+ // for first-stage close_wait only ensure incremental node
channels load has been completed,
+ // unified waiting in the second-stage close_wait.
status = index_channel->close_wait(_state, nullptr, nullptr,
-
index_channel->init_node_channel_ids());
+
index_channel->init_node_channel_ids(), false);
if (!status.ok()) {
break;
}
@@ -1657,7 +1792,7 @@ Status VTabletWriter::close(Status exec_status) {
int64_t add_batch_exec_time = 0;
int64_t wait_exec_time = 0;
status = index_channel->close_wait(_state, &writer_stats,
&node_add_batch_counter_map,
-
index_channel->each_node_channel_ids());
+
index_channel->each_node_channel_ids(), true);
// Due to the non-determinism of compaction, the rowsets of each
replica may be different from each other on different
// BE nodes. The number of rows filtered in SegmentWriter depends
on the historical rowsets located in the correspoding
diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h
index 8a15df9a252..4948e25448f 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -292,6 +292,8 @@ public:
RuntimeState* state, WriterStats* writer_stats,
std::unordered_map<int64_t, AddBatchCounter>*
node_add_batch_counter_map);
+ Status check_status();
+
void cancel(const std::string& cancel_msg);
void time_report(std::unordered_map<int64_t, AddBatchCounter>*
add_batch_counter_map,
@@ -326,6 +328,12 @@ public:
bool is_incremental() const { return _is_incremental; }
+ int64_t write_bytes() const { return _write_bytes.load(); }
+ std::unordered_set<int64_t> write_tablets() {
+ std::lock_guard<std::mutex> l(_write_tablets_lock);
+ return _write_tablets;
+ }
+
protected:
// make a real open request for relative BE's load channel.
void _open_internal(bool is_incremental);
@@ -428,6 +436,10 @@ protected:
int64_t _wg_id = -1;
bool _is_incremental;
+
+ std::mutex _write_tablets_lock;
+ std::atomic<int64_t> _write_bytes {0};
+ std::unordered_set<int64_t> _write_tablets;
};
// an IndexChannel is related to specific table and its rollup and mv
@@ -505,7 +517,8 @@ public:
Status close_wait(RuntimeState* state, WriterStats* writer_stats,
std::unordered_map<int64_t, AddBatchCounter>*
node_add_batch_counter_map,
- std::unordered_set<int64_t> unfinished_node_channel_ids);
+ std::unordered_set<int64_t> unfinished_node_channel_ids,
+ bool need_wait_after_quorum_success);
Status check_each_node_channel_close(
std::unordered_set<int64_t>* unfinished_node_channel_ids,
@@ -544,6 +557,8 @@ public:
// check whether the rows num filtered by different replicas is consistent
Status check_tablet_filtered_rows_consistency();
+ void set_start_time(const int64_t& start_time) { _start_time = start_time;
}
+
vectorized::VExprContextSPtr get_where_clause() { return _where_clause; }
private:
@@ -553,6 +568,13 @@ private:
int _max_failed_replicas(int64_t tablet_id);
+ int _load_required_replicas_num(int64_t tablet_id);
+
+ bool _quorum_success(const std::unordered_set<int64_t>&
unfinished_node_channel_ids,
+ const std::unordered_set<int64_t>& write_tablets);
+
+ int64_t _calc_max_wait_time_ms(const std::unordered_set<int64_t>&
unfinished_node_channel_ids);
+
VTabletWriter* _parent = nullptr;
int64_t _index_id;
vectorized::VExprContextSPtr _where_clause;
@@ -586,6 +608,8 @@ private:
// rows num filtered by DeltaWriter per tablet, tablet_id -> <node_Id,
filtered_rows_num>
// used to verify whether the rows num filtered by different replicas is
consistent
std::map<int64_t, std::vector<std::pair<int64_t, int64_t>>>
_tablets_filtered_rows;
+
+ int64_t _start_time = 0;
};
} // namespace vectorized
} // namespace doris
diff --git a/regression-test/suites/fault_injection_p0/test_writer_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_writer_fault_injection.groovy
index 5f1d9fced72..351c009dd85 100644
--- a/regression-test/suites/fault_injection_p0/test_writer_fault_injection.groovy
+++ b/regression-test/suites/fault_injection_p0/test_writer_fault_injection.groovy
@@ -94,6 +94,8 @@ suite("test_writer_fault_injection", "nonConcurrent") {
load_with_injection("IndexChannel.close_wait.timeout")
// Test VTabletWriter close with _close_status not ok
load_with_injection("VTabletWriter.close.close_status_not_ok")
+ // Test IndexChannel check_each_node_channel_close with close_status
not ok
+
load_with_injection("IndexChannel.check_each_node_channel_close.close_status_not_ok")
} finally {
sql """ set enable_memtable_on_sink_node=true """
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]