This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0744fe7ff86 [fix](load) fix quorum success invalid in
move-memtable-on-sink load path (#60681)
0744fe7ff86 is described below
commit 0744fe7ff86efd8b31055d7b9a970f43cf76e001
Author: hui lai <[email protected]>
AuthorDate: Wed Feb 25 11:11:32 2026 +0800
[fix](load) fix quorum success invalid in move-memtable-on-sink load path
(#60681)
## Description
In `VTabletWriterV2`, the `_quorum_success()` function always returns
`false` even when
all streams have finished successfully, making the quorum success write
feature effectively
non-functional.
## Root Cause
In both `_build_tablet_node_mapping()` and
`_incremental_open_streams()`,
`_tablets_by_node[node].emplace(tablet_id)` is guarded by the
`known_indexes` check:
```cpp
if (known_indexes.contains(index.index_id)) [[likely]] {
continue;
}
_indexes_from_node[node].emplace_back(tablet);
_tablets_by_node[node].emplace(tablet_id);
known_indexes.insert(index.index_id);
```
The known_indexes set is shared across all partitions, tablets, and
nodes. Once an index_id has been inserted — which happens after
processing the first node of the first tablet — every subsequent
tablet (and every other node of the same tablet) with that index_id
skips the _tablets_by_node update.
For example, with 1 index, 3 tablets [T1, T2, T3], and 3 replicas [N1,
N2, N3]:
- only _tablets_by_node[N1] = {T1} gets populated;
- T1 on N2/N3, and T2/T3 on all nodes, are skipped.
This causes _quorum_success() to compute finished_tablets_replica
incorrectly:
- finished_tablets_replica[T1] = 1 (counted only from N1)
- finished_tablets_replica[T2] = 0, finished_tablets_replica[T3] = 0
With a quorum requirement of 2 (for 3 replicas), the check always fails.
The known_indexes optimization was intended only for _indexes_from_node
(to avoid
sending duplicate schema info per index), but it incorrectly also
blocked the
_tablets_by_node population.
Note: vtablet_writer.cpp does NOT have this issue — its
_tablets_by_channel is
populated without the known_indexes guard.
## Fix
Move _tablets_by_node[node].emplace(tablet_id) before the known_indexes
check in
both _build_tablet_node_mapping() and _incremental_open_streams(), so
that every
tablet on every node is correctly recorded.
---
be/src/runtime/load_stream.cpp | 1 +
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 4 +--
.../load_p0/stream_load/test_sink_tolerate.groovy | 39 ++++++++++++++++++++++
3 files changed, 42 insertions(+), 2 deletions(-)
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index bd083127ab1..1067f4c5193 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -760,6 +760,7 @@ void LoadStream::_dispatch(StreamId id, const
PStreamHeader& hdr, butil::IOBuf*
}
} break;
case PStreamHeader::CLOSE_LOAD: {
+ DBUG_EXECUTE_IF("LoadStream.close_load.block", DBUG_BLOCK);
std::vector<int64_t> success_tablet_ids;
FailedTablets failed_tablets;
std::vector<PTabletID> tablets_to_commit(hdr.tablets().begin(),
hdr.tablets().end());
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index dad781059e5..04901208096 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -104,11 +104,11 @@ Status VTabletWriterV2::_incremental_open_streams(
tablet.set_tablet_id(tablet_id);
new_backends.insert(node);
_tablets_for_node[node].emplace(tablet_id, tablet);
+ _tablets_by_node[node].emplace(tablet_id);
if (known_indexes.contains(index.index_id)) [[likely]] {
continue;
}
_indexes_from_node[node].emplace_back(tablet);
- _tablets_by_node[node].emplace(tablet_id);
known_indexes.insert(index.index_id);
VLOG_DEBUG << "incremental open stream (" << partition->id
<< ", " << tablet_id
<< ")";
@@ -347,11 +347,11 @@ Status VTabletWriterV2::_build_tablet_node_mapping() {
// ignore fake tablet for auto partition
continue;
}
+ _tablets_by_node[node].emplace(tablet_id);
if (known_indexes.contains(index.index_id)) [[likely]] {
continue;
}
_indexes_from_node[node].emplace_back(tablet);
- _tablets_by_node[node].emplace(tablet_id);
known_indexes.insert(index.index_id);
}
_build_tablet_replica_info(tablet_id, partition);
diff --git
a/regression-test/suites/load_p0/stream_load/test_sink_tolerate.groovy
b/regression-test/suites/load_p0/stream_load/test_sink_tolerate.groovy
index c55d80a70cd..3e63b5fd02b 100644
--- a/regression-test/suites/load_p0/stream_load/test_sink_tolerate.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_sink_tolerate.groovy
@@ -117,5 +117,44 @@ suite("test_sink_tolerate", "docker") {
} finally {
GetDebugPoint().disableDebugPointForAllBEs("TabletStream.add_segment.add_segment_failed")
}
+
+ try {
+ // Enable close_load block on only the first BE (minor BE)
+ def firstBE = true
+ GetDebugPoint().operateDebugPointForAllBEs({ host, port ->
+ if (port == -1) return
+ if (firstBE) {
+ firstBE = false
+ GetDebugPoint().enableDebugPoint(host, port as int,
NodeType.BE, "LoadStream.close_load.block")
+ }
+ })
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '\t'
+ set 'columns', 'k1, k2, v2, v10, v11'
+ set 'partitions', 'partition_a, partition_b, partition_c,
partition_d'
+ set 'strict_mode', 'true'
+ set 'memtable_on_sink_node', 'true'
+ file 'test_strict_mode.csv'
+ time 10000 // limit inflight 10s
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(2, json.NumberTotalRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ }
+ }
+ sql "sync"
+ def res = sql "select * from ${tableName}"
+ log.info("select result: ${res}".toString())
+ assertEquals(2, res.size())
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("LoadStream.close_load.block")
+ }
}
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]