This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 3541a6da7a6 branch-4.0: [fix](load) fix quorum success invalid in 
move-memtable-on-sink load path #60681 (#60820)
3541a6da7a6 is described below

commit 3541a6da7a60a9afd880dd5bda0bb5e690ba0191
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Feb 26 18:18:48 2026 +0800

    branch-4.0: [fix](load) fix quorum success invalid in move-memtable-on-sink 
load path #60681 (#60820)
    
    Cherry-picked from #60681
    
    Co-authored-by: hui lai <[email protected]>
---
 be/src/runtime/load_stream.cpp                     |  1 +
 be/src/vec/sink/writer/vtablet_writer_v2.cpp       |  4 +--
 .../load_p0/stream_load/test_sink_tolerate.groovy  | 39 ++++++++++++++++++++++
 3 files changed, 42 insertions(+), 2 deletions(-)

diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index f94e7864377..2e272e25c70 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -707,6 +707,7 @@ void LoadStream::_dispatch(StreamId id, const 
PStreamHeader& hdr, butil::IOBuf*
         }
     } break;
     case PStreamHeader::CLOSE_LOAD: {
+        DBUG_EXECUTE_IF("LoadStream.close_load.block", DBUG_BLOCK);
         std::vector<int64_t> success_tablet_ids;
         FailedTablets failed_tablets;
         std::vector<PTabletID> tablets_to_commit(hdr.tablets().begin(), 
hdr.tablets().end());
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp 
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 666954744f1..898bb10c437 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -102,11 +102,11 @@ Status VTabletWriterV2::_incremental_open_streams(
                     tablet.set_tablet_id(tablet_id);
                     new_backends.insert(node);
                     _tablets_for_node[node].emplace(tablet_id, tablet);
+                    _tablets_by_node[node].emplace(tablet_id);
                     if (known_indexes.contains(index.index_id)) [[likely]] {
                         continue;
                     }
                     _indexes_from_node[node].emplace_back(tablet);
-                    _tablets_by_node[node].emplace(tablet_id);
                     known_indexes.insert(index.index_id);
                     VLOG_DEBUG << "incremental open stream (" << partition->id 
<< ", " << tablet_id
                                << ")";
@@ -343,11 +343,11 @@ Status VTabletWriterV2::_build_tablet_node_mapping() {
                         // ignore fake tablet for auto partition
                         continue;
                     }
+                    _tablets_by_node[node].emplace(tablet_id);
                     if (known_indexes.contains(index.index_id)) [[likely]] {
                         continue;
                     }
                     _indexes_from_node[node].emplace_back(tablet);
-                    _tablets_by_node[node].emplace(tablet_id);
                     known_indexes.insert(index.index_id);
                 }
                 _build_tablet_replica_info(tablet_id, partition);
diff --git 
a/regression-test/suites/load_p0/stream_load/test_sink_tolerate.groovy 
b/regression-test/suites/load_p0/stream_load/test_sink_tolerate.groovy
index c55d80a70cd..3e63b5fd02b 100644
--- a/regression-test/suites/load_p0/stream_load/test_sink_tolerate.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_sink_tolerate.groovy
@@ -117,5 +117,44 @@ suite("test_sink_tolerate", "docker") {
         } finally {
             
GetDebugPoint().disableDebugPointForAllBEs("TabletStream.add_segment.add_segment_failed")
         }
+
+        try {
+            // Enable close_load block on only the first BE (minor BE)
+            def firstBE = true
+            GetDebugPoint().operateDebugPointForAllBEs({ host, port ->
+                if (port == -1) return
+                if (firstBE) {
+                    firstBE = false
+                    GetDebugPoint().enableDebugPoint(host, port as int, 
NodeType.BE, "LoadStream.close_load.block")
+                }
+            })
+            streamLoad {
+                table "${tableName}"
+                set 'column_separator', '\t'
+                set 'columns', 'k1, k2, v2, v10, v11'
+                set 'partitions', 'partition_a, partition_b, partition_c, 
partition_d'
+                set 'strict_mode', 'true'
+                set 'memtable_on_sink_node', 'true'
+                file 'test_strict_mode.csv'
+                time 10000 // limit inflight 10s
+                check { result, exception, startTime, endTime ->
+                    if (exception != null) {
+                        throw exception
+                    }
+                    log.info("Stream load result: ${result}".toString())
+                    def json = parseJson(result)
+                    assertEquals("success", json.Status.toLowerCase())
+                    assertEquals(2, json.NumberTotalRows)
+                    assertEquals(0, json.NumberFilteredRows)
+                    assertEquals(0, json.NumberUnselectedRows)
+                }
+            }
+            sql "sync"
+            def res = sql "select * from ${tableName}"
+            log.info("select result: ${res}".toString())
+            assertEquals(2, res.size())
+        } finally {
+            
GetDebugPoint().disableDebugPointForAllBEs("LoadStream.close_load.block")
+        }
     }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to