This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 335ebf6a8e8 [fix](move-memtable) fix close wait timeout if part of 
streams connection failed (#59905)
335ebf6a8e8 is described below

commit 335ebf6a8e8f775c9302ab06df9de6d080074794
Author: hui lai <[email protected]>
AuthorDate: Sun Jan 18 21:04:19 2026 +0800

    [fix](move-memtable) fix close wait timeout if part of streams connection 
failed (#59905)
    
    ### What problem does this PR solve?
    
    Bug scenario:
    1. LoadStreamStubs holds multiple LoadStreamStub objects (stream_per_node >= 2)
    2. The first stream opens successfully (_is_open = true)
    3. The second stream fails to open
    4. LoadStreamStubs::open() calls cancel() on ALL streams (including the
    successful one)
    5. The first stream now has: _is_open = true, _is_cancelled = true,
    _is_closing = false
    6. Without the fix, close_finish_check() keeps reporting that stream as not
    closed, so it stays stuck in unfinished_streams
    7. This causes close_wait to time out
---
 be/src/vec/sink/load_stream_stub.cpp               | 15 ++++
 .../test_cancelled_stream_close_wait.groovy        | 94 ++++++++++++++++++++++
 2 files changed, 109 insertions(+)

diff --git a/be/src/vec/sink/load_stream_stub.cpp 
b/be/src/vec/sink/load_stream_stub.cpp
index 4671c9fb78d..faf2b0c1776 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -342,6 +342,11 @@ Status LoadStreamStub::close_finish_check(RuntimeState* 
state, bool* is_closed)
         // we don't need to close wait on non-open streams
         return Status::OK();
     }
+    // If stream is cancelled (e.g., due to connection failure), treat it as 
closed
+    // to avoid waiting indefinitely for a stream that will never respond.
+    if (_is_cancelled.load()) {
+        return check_cancel();
+    }
     if (state->get_query_ctx()->is_cancelled()) {
         return state->get_query_ctx()->exec_status();
     }
@@ -562,6 +567,7 @@ Status 
LoadStreamStubs::open(BrpcClientCache<PBackendService_Stub>* client_cache
                              int64_t idle_timeout_ms, bool enable_profile) {
     bool get_schema = true;
     auto status = Status::OK();
+    bool first_stream = true;
     for (auto& stream : _streams) {
         Status st;
         if (get_schema) {
@@ -571,6 +577,14 @@ Status 
LoadStreamStubs::open(BrpcClientCache<PBackendService_Stub>* client_cache
             st = stream->open(client_cache, node_info, txn_id, schema, {}, 
total_streams,
                               idle_timeout_ms, enable_profile);
         }
+        // Simulate one stream open failure within LoadStreamStubs.
+        // This causes the successfully opened streams to be cancelled,
+        // reproducing the bug where cancelled streams cause close_wait 
timeout.
+        DBUG_EXECUTE_IF("LoadStreamStubs.open.fail_one_stream", {
+            if (st.ok() && !first_stream) {
+                st = Status::InternalError("Injected stream open failure");
+            }
+        });
         if (st.ok()) {
             get_schema = false;
         } else {
@@ -578,6 +592,7 @@ Status 
LoadStreamStubs::open(BrpcClientCache<PBackendService_Stub>* client_cache
             status = st;
             // no break here to try get schema from the rest streams
         }
+        first_stream = false;
     }
     // only mark open when all streams open success
     _open_success.store(status.ok());
diff --git 
a/regression-test/suites/fault_injection_p0/test_cancelled_stream_close_wait.groovy
 
b/regression-test/suites/fault_injection_p0/test_cancelled_stream_close_wait.groovy
new file mode 100644
index 00000000000..750b689a588
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/test_cancelled_stream_close_wait.groovy
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+suite("test_cancelled_stream_close_wait", "nonConcurrent") {
+    if (!isCloudMode()) {
+        sql """ DROP TABLE IF EXISTS `baseall` """
+        sql """ DROP TABLE IF EXISTS `test` """
+        sql """
+            CREATE TABLE IF NOT EXISTS `baseall` (
+                `k0` boolean null comment "",
+                `k1` tinyint(4) null comment "",
+                `k2` smallint(6) null comment "",
+                `k3` int(11) null comment "",
+                `k4` bigint(20) null comment "",
+                `k5` decimal(9, 3) null comment "",
+                `k6` char(5) null comment "",
+                `k10` date null comment "",
+                `k11` datetime null comment "",
+                `k7` varchar(20) null comment "",
+                `k8` double max null comment "",
+                `k9` float sum null comment "",
+                `k12` string replace null comment "",
+                `k13` largeint(40) replace null comment ""
+            ) engine=olap
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = 
"1")
+            """
+        sql """
+            CREATE TABLE IF NOT EXISTS `test` (
+                `k0` boolean null comment "",
+                `k1` tinyint(4) null comment "",
+                `k2` smallint(6) null comment "",
+                `k3` int(11) null comment "",
+                `k4` bigint(20) null comment "",
+                `k5` decimal(9, 3) null comment "",
+                `k6` char(5) null comment "",
+                `k10` date null comment "",
+                `k11` datetime null comment "",
+                `k7` varchar(20) null comment "",
+                `k8` double max null comment "",
+                `k9` float sum null comment "",
+                `k12` string replace_if_not_null null comment "",
+                `k13` largeint(40) replace null comment ""
+            ) engine=olap
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = 
"1")
+            """
+
+        GetDebugPoint().clearDebugPointsForAllBEs()
+        streamLoad {
+            table "baseall"
+            db "regression_test_fault_injection_p0"
+            set 'column_separator', ','
+            file "baseall.txt"
+        }
+
+        // Test: Simulate one stream open failure within LoadStreamStubs.
+        // The first stream opens successfully, the second fails.
+        // This causes cancel() to be called on all streams.
+        // Without the fix, the successfully opened stream would cause 
close_wait timeout.
+        try {
+            
GetDebugPoint().enableDebugPointForAllBEs("LoadStreamStubs.open.fail_one_stream")
+            def res = sql "insert into test select * from baseall where k1 <= 
3"
+            logger.info(res.toString())
+            // Should fail due to stream open failure, not timeout
+            assertTrue(false, "Expected Exception to be thrown")
+        } catch(Exception e) {
+            logger.info(e.getMessage())
+            // Should fail quickly with stream open error, not timeout
+            assertTrue(!e.getMessage().contains("timed out"),
+                       "Should not timeout waiting for cancelled stream, got: 
" + e.getMessage())
+        } finally {
+            
GetDebugPoint().disableDebugPointForAllBEs("LoadStreamStubs.open.fail_one_stream")
+        }
+
+        sql """ DROP TABLE IF EXISTS `baseall` """
+        sql """ DROP TABLE IF EXISTS `test` """
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to