This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 08ac74a43ff branch-3.1: [refactor](load) refactor wait logic after
quorum success #53884 (#54070)
08ac74a43ff is described below
commit 08ac74a43ff085b70b1d66ec911f31de94210268
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jul 30 18:42:56 2025 +0800
branch-3.1: [refactor](load) refactor wait logic after quorum success
#53884 (#54070)
Cherry-picked from #53884
Co-authored-by: hui lai <[email protected]>
---
be/src/common/config.cpp | 2 +-
be/src/vec/sink/writer/vtablet_writer.cpp | 4 +++-
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 4 +++-
3 files changed, 7 insertions(+), 3 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index fc1de3d1bce..b3f56b285db 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1530,7 +1530,7 @@ DEFINE_mBool(enable_compaction_pause_on_high_memory,
"true");
DEFINE_mBool(enable_quorum_success_write, "true");
DEFINE_mDouble(quorum_success_max_wait_multiplier, "0.2");
-DEFINE_mInt64(quorum_success_min_wait_seconds, "10");
+DEFINE_mInt64(quorum_success_min_wait_seconds, "60");
DEFINE_mInt32(quorum_success_remaining_timeout_seconds, "30");
DEFINE_mBool(enable_calc_delete_bitmap_between_segments_concurrently, "false");
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp
b/be/src/vec/sink/writer/vtablet_writer.cpp
index 94c4a33e73e..d14d603f6fa 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -343,6 +343,7 @@ Status IndexChannel::close_wait(
// 2. wait for all node channel to complete as much as possible
if (!unfinished_node_channel_ids.empty() &&
need_wait_after_quorum_success) {
+ int64_t arrival_quorum_success_time = UnixMillis();
int64_t max_wait_time_ms =
_calc_max_wait_time_ms(unfinished_node_channel_ids);
while (true) {
RETURN_IF_ERROR(check_each_node_channel_close(&unfinished_node_channel_ids,
@@ -351,7 +352,7 @@ Status IndexChannel::close_wait(
if (unfinished_node_channel_ids.empty()) {
break;
}
- int64_t elapsed_ms = UnixMillis() - _start_time;
+ int64_t elapsed_ms = UnixMillis() - arrival_quorum_success_time;
if (elapsed_ms > max_wait_time_ms ||
_parent->_load_channel_timeout_s - elapsed_ms / 1000 <
config::quorum_success_remaining_timeout_seconds) {
@@ -469,6 +470,7 @@ int64_t IndexChannel::_calc_max_wait_time_ms(
// 3. calculate max wait time
// introduce quorum_success_min_wait_seconds to avoid jitter of small load
+ max_wait_time_ms -= UnixMillis() - _start_time;
max_wait_time_ms =
std::max(static_cast<int64_t>(static_cast<double>(max_wait_time_ms) *
(1.0 +
config::quorum_success_max_wait_multiplier)),
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index f524891bcae..2c89975d142 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -773,6 +773,7 @@ Status VTabletWriterV2::_close_wait(
// 2. then wait for remaining streams as much as possible
if (!unfinished_streams.empty() && need_wait_after_quorum_success) {
+ int64_t arrival_quorum_success_time = UnixMillis();
int64_t max_wait_time_ms = _calc_max_wait_time_ms(streams_for_node,
unfinished_streams);
while (true) {
RETURN_IF_ERROR(_check_timeout());
@@ -780,7 +781,7 @@ Status VTabletWriterV2::_close_wait(
if (unfinished_streams.empty()) {
break;
}
- int64_t elapsed_ms = _timeout_watch.elapsed_time() / 1000 / 1000;
+ int64_t elapsed_ms = UnixMillis() - arrival_quorum_success_time;
if (elapsed_ms > max_wait_time_ms ||
_state->execution_timeout() - elapsed_ms / 1000 <
config::quorum_success_remaining_timeout_seconds) {
@@ -898,6 +899,7 @@ int64_t VTabletWriterV2::_calc_max_wait_time_ms(
// 3. calculate max wait time
// introduce quorum_success_min_wait_time_ms to avoid jitter of small load
+ max_wait_time_ms -= UnixMillis() - _timeout_watch.elapsed_time() / 1000 /
1000;
max_wait_time_ms =
std::max(static_cast<int64_t>(static_cast<double>(max_wait_time_ms) *
(1.0 +
config::quorum_success_max_wait_multiplier)),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]