This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0744fe7ff86 [fix](load) fix quorum success invalid in 
move-memtable-on-sink load path (#60681)
0744fe7ff86 is described below

commit 0744fe7ff86efd8b31055d7b9a970f43cf76e001
Author: hui lai <[email protected]>
AuthorDate: Wed Feb 25 11:11:32 2026 +0800

    [fix](load) fix quorum success invalid in move-memtable-on-sink load path 
(#60681)
    
    ## Description
    
    In `VTabletWriterV2`, the `_quorum_success()` function always returns
    `false` even when
    all streams have finished successfully, making the quorum success write
    feature effectively
    non-functional.
    
    ## Root Cause
    
    In both `_build_tablet_node_mapping()` and
    `_incremental_open_streams()`,
    `_tablets_by_node[node].emplace(tablet_id)` is guarded by the
    `known_indexes` check:
    
    ```cpp
    if (known_indexes.contains(index.index_id)) [[likely]] {
        continue;
    }
    _indexes_from_node[node].emplace_back(tablet);
    _tablets_by_node[node].emplace(tablet_id);
    known_indexes.insert(index.index_id);
    ```
    
    The `known_indexes` set is shared across all partitions, tablets, and
    nodes. Once an index_id is inserted after processing the first tablet's
    first node, all subsequent tablets (and all other nodes of the same
    tablet) with the same index_id skip the `_tablets_by_node` update.
    
    For example, with 1 index, 3 tablets [T1, T2, T3], and 3 replicas [N1,
    N2, N3]:
    
    - Only `_tablets_by_node[N1] = {T1}` gets populated
    - T1 on N2/N3, and T2/T3 on all nodes are skipped
    This causes _quorum_success() to compute finished_tablets_replica
    incorrectly:
    
    - `finished_tablets_replica[T1]` = 1 (only counted from N1)
    - `finished_tablets_replica[T2]` = 0, `finished_tablets_replica[T3]` = 0
    With a quorum requirement of 2 (for 3 replicas), the check always fails.
    
    The known_indexes optimization was intended only for _indexes_from_node
    (to avoid
    sending duplicate schema info per index), but it incorrectly also
    blocked the
    _tablets_by_node population.
    
    Note: vtablet_writer.cpp does NOT have this issue — its
    _tablets_by_channel is
    populated without the known_indexes guard.
    
    ## Fix
    Move _tablets_by_node[node].emplace(tablet_id) before the known_indexes
    check in
    both _build_tablet_node_mapping() and _incremental_open_streams(), so
    that every
    tablet on every node is correctly recorded.
---
 be/src/runtime/load_stream.cpp                     |  1 +
 be/src/vec/sink/writer/vtablet_writer_v2.cpp       |  4 +--
 .../load_p0/stream_load/test_sink_tolerate.groovy  | 39 ++++++++++++++++++++++
 3 files changed, 42 insertions(+), 2 deletions(-)

diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index bd083127ab1..1067f4c5193 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -760,6 +760,7 @@ void LoadStream::_dispatch(StreamId id, const 
PStreamHeader& hdr, butil::IOBuf*
         }
     } break;
     case PStreamHeader::CLOSE_LOAD: {
+        DBUG_EXECUTE_IF("LoadStream.close_load.block", DBUG_BLOCK);
         std::vector<int64_t> success_tablet_ids;
         FailedTablets failed_tablets;
         std::vector<PTabletID> tablets_to_commit(hdr.tablets().begin(), 
hdr.tablets().end());
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp 
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index dad781059e5..04901208096 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -104,11 +104,11 @@ Status VTabletWriterV2::_incremental_open_streams(
                     tablet.set_tablet_id(tablet_id);
                     new_backends.insert(node);
                     _tablets_for_node[node].emplace(tablet_id, tablet);
+                    _tablets_by_node[node].emplace(tablet_id);
                     if (known_indexes.contains(index.index_id)) [[likely]] {
                         continue;
                     }
                     _indexes_from_node[node].emplace_back(tablet);
-                    _tablets_by_node[node].emplace(tablet_id);
                     known_indexes.insert(index.index_id);
                     VLOG_DEBUG << "incremental open stream (" << partition->id 
<< ", " << tablet_id
                                << ")";
@@ -347,11 +347,11 @@ Status VTabletWriterV2::_build_tablet_node_mapping() {
                         // ignore fake tablet for auto partition
                         continue;
                     }
+                    _tablets_by_node[node].emplace(tablet_id);
                     if (known_indexes.contains(index.index_id)) [[likely]] {
                         continue;
                     }
                     _indexes_from_node[node].emplace_back(tablet);
-                    _tablets_by_node[node].emplace(tablet_id);
                     known_indexes.insert(index.index_id);
                 }
                 _build_tablet_replica_info(tablet_id, partition);
diff --git 
a/regression-test/suites/load_p0/stream_load/test_sink_tolerate.groovy 
b/regression-test/suites/load_p0/stream_load/test_sink_tolerate.groovy
index c55d80a70cd..3e63b5fd02b 100644
--- a/regression-test/suites/load_p0/stream_load/test_sink_tolerate.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_sink_tolerate.groovy
@@ -117,5 +117,44 @@ suite("test_sink_tolerate", "docker") {
         } finally {
             
GetDebugPoint().disableDebugPointForAllBEs("TabletStream.add_segment.add_segment_failed")
         }
+
+        try {
+            // Enable close_load block on only the first BE (minor BE)
+            def firstBE = true
+            GetDebugPoint().operateDebugPointForAllBEs({ host, port ->
+                if (port == -1) return
+                if (firstBE) {
+                    firstBE = false
+                    GetDebugPoint().enableDebugPoint(host, port as int, 
NodeType.BE, "LoadStream.close_load.block")
+                }
+            })
+            streamLoad {
+                table "${tableName}"
+                set 'column_separator', '\t'
+                set 'columns', 'k1, k2, v2, v10, v11'
+                set 'partitions', 'partition_a, partition_b, partition_c, 
partition_d'
+                set 'strict_mode', 'true'
+                set 'memtable_on_sink_node', 'true'
+                file 'test_strict_mode.csv'
+                time 10000 // limit inflight 10s
+                check { result, exception, startTime, endTime ->
+                    if (exception != null) {
+                        throw exception
+                    }
+                    log.info("Stream load result: ${result}".toString())
+                    def json = parseJson(result)
+                    assertEquals("success", json.Status.toLowerCase())
+                    assertEquals(2, json.NumberTotalRows)
+                    assertEquals(0, json.NumberFilteredRows)
+                    assertEquals(0, json.NumberUnselectedRows)
+                }
+            }
+            sql "sync"
+            def res = sql "select * from ${tableName}"
+            log.info("select result: ${res}".toString())
+            assertEquals(2, res.size())
+        } finally {
+            
GetDebugPoint().disableDebugPointForAllBEs("LoadStream.close_load.block")
+        }
     }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to