This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new bbed1548a73 branch-3.1: [fix](load)fix VNodeChannel close_wait hang #58024 (#58583)
bbed1548a73 is described below

commit bbed1548a73642cb289df5759b013ac344a99a0b
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Dec 2 11:27:32 2025 +0800

    branch-3.1: [fix](load)fix VNodeChannel close_wait hang #58024 (#58583)
    
    Cherry-picked from #58024
    
    Co-authored-by: koarz <[email protected]>
---
 be/src/vec/sink/writer/vtablet_writer.cpp          |  10 ++
 .../test_stream_load_close_wait_hang.groovy        | 140 +++++++++++++++++++++
 2 files changed, 150 insertions(+)

diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp
index 2b5cb48dc18..b0539551f70 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -911,7 +911,16 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) {
                 PSlaveTabletNodes slave_tablet_nodes;
                 for (auto node_id : _slave_tablet_node.second) {
                     const auto* node = _parent->_nodes_info->find_node(node_id);
+                    DBUG_EXECUTE_IF("VNodeChannel.try_send_pending_block.slave_node_not_found", {
+                        LOG(WARNING) << "trigger "
+                                        "VNodeChannel.try_send_pending_block.slave_node_not_found "
+                                        "debug point will set node to nullptr";
+                        node = nullptr;
+                    });
                     if (node == nullptr) {
+                        LOG(WARNING) << "slave node not found, node_id=" << node_id;
+                        cancel(fmt::format("slave node not found, node_id={}", node_id));
+                        _send_block_callback->clear_in_flight();
                         return;
                     }
                     PNodeInfo* pnode = slave_tablet_nodes.add_slave_nodes();
@@ -955,6 +964,7 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) {
             if (!status.ok()) {
                 LOG(WARNING) << "failed to get ip from host " << _node_info.host << ": "
                              << status.to_string();
+                cancel(fmt::format("failed to get ip from host {}", _node_info.host));
                 _send_block_callback->clear_in_flight();
                 return;
             }
diff --git a/regression-test/suites/load_p0/test_stream_load_close_wait_hang.groovy b/regression-test/suites/load_p0/test_stream_load_close_wait_hang.groovy
new file mode 100644
index 00000000000..4d704cb60b2
--- /dev/null
+++ b/regression-test/suites/load_p0/test_stream_load_close_wait_hang.groovy
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite("test_stream_load_close_wait_hang", "docker") {
+    def options = new ClusterOptions()
+    options.feNum = 1
+    options.beNum = 3
+    options.cloudMode = false
+
+    // Enable single replica load in FE BE to trigger write_single_replica mode
+    options.feConfigs += [
+        'enable_single_replica_load=true'
+    ]
+
+    options.beConfigs += [
+        'enable_debug_points=true',
+        'enable_single_replica_load=true'
+    ]
+
+    docker(options) {
+        def tableName = "test_close_wait_hang_table"
+
+        // Create table with 3 replicas and RANDOM distribution
+        // - replication_num=3: triggers write_single_replica mode (1 master + 2 slaves)
+        // - RANDOM distribution: required by load_to_single_tablet parameter
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE ${tableName} (
+                id INT,
+                name VARCHAR(100)
+            )
+            DUPLICATE KEY(id)
+            DISTRIBUTED BY RANDOM BUCKETS 1
+            PROPERTIES (
+                "replication_num" = "3"
+            )
+        """
+
+        // Prepare test data - write to local file
+        def dataFile = new File("/tmp/test_stream_load_close_wait_hang_data.csv")
+        dataFile.withWriter { writer ->
+            for (int i = 1; i <= 5000; i++) {
+                writer.writeLine("${i}\ttest${i}")
+            }
+        }
+        log.info("Test data written to ${dataFile.absolutePath}")
+
+        // Enable debug point to simulate "slave node not found" scenario
+        // Effect: Sets node = nullptr to trigger the check
+        // Expected behavior: Code should call cancel() and clear_in_flight() before returning
+        try {
+            GetDebugPoint().enableDebugPointForAllBEs("VNodeChannel.try_send_pending_block.slave_node_not_found")
+            log.info("Debug point enabled: simulating slave node not found scenario")
+        } catch (Exception e) {
+            log.warn("Failed to enable debug point: ${e.message}")
+        }
+
+        def testStartTime = System.currentTimeMillis()
+        def timed_out = false
+        def load_exception = null
+
+        // Execute stream load in separate thread to detect hangs
+        // If cancel() is NOT called when slave node is missing, close_wait() will hang forever
+        def load_thread = Thread.start {
+            try {
+                streamLoad {
+                    table "${tableName}"
+
+                    set 'column_separator', '\t'
+                    set 'format', 'csv'
+
+                    set 'load_to_single_tablet', 'true'
+
+                    time 60000
+
+                    file dataFile.absolutePath
+
+                    check { result, exception, startTime, endTime ->
+                        if (exception != null) {
+                            load_exception = exception
+                            log.error("Stream load failed with exception: ${exception.message}")
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                load_exception = e
+                log.error("Stream load threw exception: ${e.message}")
+            }
+        }
+
+        // Check if stream load completes within timeout
+        // Expected: Should fail quickly (within 65s) because cancel() is called
+        // Bug symptom: Hangs forever if cancel() is not called
+        load_thread.join(65000)
+
+        if (load_thread.isAlive()) {
+            timed_out = true
+            log.error("Stream load thread hung for too many seconds!")
+
+            load_thread.interrupt()
+            load_thread.join(5000)
+        }
+
+        def elapsed = System.currentTimeMillis() - testStartTime
+        log.info("Stream load completed in ${elapsed}ms")
+
+        try {
+            GetDebugPoint().disableDebugPointForAllBEs("VNodeChannel.try_send_pending_block.slave_node_not_found")
+        } catch (Exception e) {
+            log.warn("Failed to disable debug point: ${e.message}")
+        }
+
+        if (timed_out) {
+            throw new Exception("Stream load time out")
+        }
+
+        try {
+            dataFile.delete()
+            log.info("Cleaned up temp file: ${dataFile.absolutePath}")
+        } catch (Exception e) {
+            log.warn("Failed to delete temp file: ${e.message}")
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to