This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 3541a6da7a6 branch-4.0: [fix](load) fix quorum success invalid in
move-memtable-on-sink load path #60681 (#60820)
3541a6da7a6 is described below
commit 3541a6da7a60a9afd880dd5bda0bb5e690ba0191
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Feb 26 18:18:48 2026 +0800
branch-4.0: [fix](load) fix quorum success invalid in move-memtable-on-sink
load path #60681 (#60820)
Cherry-picked from #60681
Co-authored-by: hui lai <[email protected]>
---
be/src/runtime/load_stream.cpp | 1 +
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 4 +--
.../load_p0/stream_load/test_sink_tolerate.groovy | 39 ++++++++++++++++++++++
3 files changed, 42 insertions(+), 2 deletions(-)
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index f94e7864377..2e272e25c70 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -707,6 +707,7 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf*
}
} break;
case PStreamHeader::CLOSE_LOAD: {
+ DBUG_EXECUTE_IF("LoadStream.close_load.block", DBUG_BLOCK);
std::vector<int64_t> success_tablet_ids;
FailedTablets failed_tablets;
std::vector<PTabletID> tablets_to_commit(hdr.tablets().begin(),
hdr.tablets().end());
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 666954744f1..898bb10c437 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -102,11 +102,11 @@ Status VTabletWriterV2::_incremental_open_streams(
tablet.set_tablet_id(tablet_id);
new_backends.insert(node);
_tablets_for_node[node].emplace(tablet_id, tablet);
+ _tablets_by_node[node].emplace(tablet_id);
if (known_indexes.contains(index.index_id)) [[likely]] {
continue;
}
_indexes_from_node[node].emplace_back(tablet);
- _tablets_by_node[node].emplace(tablet_id);
known_indexes.insert(index.index_id);
VLOG_DEBUG << "incremental open stream (" << partition->id
<< ", " << tablet_id
<< ")";
@@ -343,11 +343,11 @@ Status VTabletWriterV2::_build_tablet_node_mapping() {
// ignore fake tablet for auto partition
continue;
}
+ _tablets_by_node[node].emplace(tablet_id);
if (known_indexes.contains(index.index_id)) [[likely]] {
continue;
}
_indexes_from_node[node].emplace_back(tablet);
- _tablets_by_node[node].emplace(tablet_id);
known_indexes.insert(index.index_id);
}
_build_tablet_replica_info(tablet_id, partition);
diff --git a/regression-test/suites/load_p0/stream_load/test_sink_tolerate.groovy b/regression-test/suites/load_p0/stream_load/test_sink_tolerate.groovy
index c55d80a70cd..3e63b5fd02b 100644
--- a/regression-test/suites/load_p0/stream_load/test_sink_tolerate.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_sink_tolerate.groovy
@@ -117,5 +117,44 @@ suite("test_sink_tolerate", "docker") {
} finally {
GetDebugPoint().disableDebugPointForAllBEs("TabletStream.add_segment.add_segment_failed")
}
+
+ try {
+ // Enable close_load block on only the first BE (minor BE)
+ def firstBE = true
+ GetDebugPoint().operateDebugPointForAllBEs({ host, port ->
+ if (port == -1) return
+ if (firstBE) {
+ firstBE = false
+ GetDebugPoint().enableDebugPoint(host, port as int, NodeType.BE, "LoadStream.close_load.block")
+ }
+ })
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '\t'
+ set 'columns', 'k1, k2, v2, v10, v11'
+ set 'partitions', 'partition_a, partition_b, partition_c, partition_d'
+ set 'strict_mode', 'true'
+ set 'memtable_on_sink_node', 'true'
+ file 'test_strict_mode.csv'
+ time 10000 // limit inflight 10s
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(2, json.NumberTotalRows)
+ assertEquals(0, json.NumberFilteredRows)
+ assertEquals(0, json.NumberUnselectedRows)
+ }
+ }
+ sql "sync"
+ def res = sql "select * from ${tableName}"
+ log.info("select result: ${res}".toString())
+ assertEquals(2, res.size())
+ } finally {
+ GetDebugPoint().disableDebugPointForAllBEs("LoadStream.close_load.block")
+ }
}
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]