This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new bbed1548a73 branch-3.1: [fix](load)fix VNodeChannel close_wait hang #58024 (#58583)
bbed1548a73 is described below
commit bbed1548a73642cb289df5759b013ac344a99a0b
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Dec 2 11:27:32 2025 +0800
branch-3.1: [fix](load)fix VNodeChannel close_wait hang #58024 (#58583)
Cherry-picked from #58024
Co-authored-by: koarz <[email protected]>
---
be/src/vec/sink/writer/vtablet_writer.cpp | 10 ++
.../test_stream_load_close_wait_hang.groovy | 140 +++++++++++++++++++++
2 files changed, 150 insertions(+)
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp
b/be/src/vec/sink/writer/vtablet_writer.cpp
index 2b5cb48dc18..b0539551f70 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -911,7 +911,16 @@ void VNodeChannel::try_send_pending_block(RuntimeState*
state) {
PSlaveTabletNodes slave_tablet_nodes;
for (auto node_id : _slave_tablet_node.second) {
const auto* node =
_parent->_nodes_info->find_node(node_id);
+
DBUG_EXECUTE_IF("VNodeChannel.try_send_pending_block.slave_node_not_found", {
+ LOG(WARNING) << "trigger "
+
"VNodeChannel.try_send_pending_block.slave_node_not_found "
+ "debug point will set node to nullptr";
+ node = nullptr;
+ });
if (node == nullptr) {
+ LOG(WARNING) << "slave node not found, node_id=" <<
node_id;
+ cancel(fmt::format("slave node not found, node_id={}",
node_id));
+ _send_block_callback->clear_in_flight();
return;
}
PNodeInfo* pnode = slave_tablet_nodes.add_slave_nodes();
@@ -955,6 +964,7 @@ void VNodeChannel::try_send_pending_block(RuntimeState*
state) {
if (!status.ok()) {
LOG(WARNING) << "failed to get ip from host " <<
_node_info.host << ": "
<< status.to_string();
+ cancel(fmt::format("failed to get ip from host {}",
_node_info.host));
_send_block_callback->clear_in_flight();
return;
}
diff --git
a/regression-test/suites/load_p0/test_stream_load_close_wait_hang.groovy
b/regression-test/suites/load_p0/test_stream_load_close_wait_hang.groovy
new file mode 100644
index 00000000000..4d704cb60b2
--- /dev/null
+++ b/regression-test/suites/load_p0/test_stream_load_close_wait_hang.groovy
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite("test_stream_load_close_wait_hang", "docker") { // Regression test for the close_wait hang: with a slave replica node made unresolvable via a debug point, the stream load must return within the join window below instead of blocking forever
+ def options = new ClusterOptions()
+ options.feNum = 1
+ options.beNum = 3
+ options.cloudMode = false
+
+ // Enable single replica load in FE BE to trigger write_single_replica mode
+ options.feConfigs += [
+ 'enable_single_replica_load=true'
+ ]
+
+ options.beConfigs += [
+ 'enable_debug_points=true',
+ 'enable_single_replica_load=true'
+ ]
+
+ docker(options) {
+ def tableName = "test_close_wait_hang_table"
+
+ // Create table with 3 replicas and RANDOM distribution
+ // - replication_num=3: triggers write_single_replica mode (1 master +
2 slaves)
+ // - RANDOM distribution: required by load_to_single_tablet parameter
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE ${tableName} (
+ id INT,
+ name VARCHAR(100)
+ )
+ DUPLICATE KEY(id)
+ DISTRIBUTED BY RANDOM BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "3"
+ )
+ """
+
+ // Prepare test data - write to local file
+ def dataFile = new
File("/tmp/test_stream_load_close_wait_hang_data.csv") // NOTE(review): fixed /tmp path; concurrent runs on the same host would share this file
+ dataFile.withWriter { writer ->
+ for (int i = 1; i <= 5000; i++) {
+ writer.writeLine("${i}\ttest${i}")
+ }
+ }
+ log.info("Test data written to ${dataFile.absolutePath}")
+
+ // Enable debug point to simulate "slave node not found" scenario
+ // Effect: Sets node = nullptr to trigger the check
+ // Expected behavior: Code should call cancel() and clear_in_flight()
before returning
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("VNodeChannel.try_send_pending_block.slave_node_not_found")
+ log.info("Debug point enabled: simulating slave node not found
scenario")
+ } catch (Exception e) {
+ log.warn("Failed to enable debug point: ${e.message}")
+ }
+
+ def testStartTime = System.currentTimeMillis()
+ def timed_out = false
+ def load_exception = null // NOTE(review): set by the load thread (check closure or catch) but never read after join(); only a hang is asserted, not that the load was rejected
+
+ // Execute stream load in separate thread to detect hangs
+ // If cancel() is NOT called when slave node is missing, close_wait()
will hang forever
+ def load_thread = Thread.start {
+ try {
+ streamLoad {
+ table "${tableName}"
+
+ set 'column_separator', '\t'
+ set 'format', 'csv'
+
+ set 'load_to_single_tablet', 'true'
+
+ time 60000 // presumably the stream load timeout in ms; the 65000 ms join below is sized just above it
+
+ file dataFile.absolutePath
+
+ check { result, exception, startTime, endTime -> // only a non-null exception is recorded; result is not inspected
+ if (exception != null) {
+ load_exception = exception
+ log.error("Stream load failed with exception:
${exception.message}")
+ }
+ }
+ }
+ } catch (Exception e) {
+ load_exception = e
+ log.error("Stream load threw exception: ${e.message}")
+ }
+ }
+
+ // Check if stream load completes within timeout
+ // Expected: Should fail quickly (within 65s) because cancel() is
called
+ // Bug symptom: Hangs forever if cancel() is not called
+ load_thread.join(65000) // Thread.join(long) takes milliseconds: wait at most 65 s
+
+ if (load_thread.isAlive()) {
+ timed_out = true
+ log.error("Stream load thread hung for too many seconds!")
+
+ load_thread.interrupt() // NOTE(review): only effective if the load blocks interruptibly; isAlive() is not re-checked after the 5 s join below
+ load_thread.join(5000)
+ }
+
+ def elapsed = System.currentTimeMillis() - testStartTime
+ log.info("Stream load completed in ${elapsed}ms") // NOTE(review): also logged on the timed-out path
+
+ try { // NOTE(review): not in a finally; if join() above throws (e.g. InterruptedException) the debug point stays enabled on all BEs
+
GetDebugPoint().disableDebugPointForAllBEs("VNodeChannel.try_send_pending_block.slave_node_not_found")
+ } catch (Exception e) {
+ log.warn("Failed to disable debug point: ${e.message}")
+ }
+
+ if (timed_out) { // NOTE(review): throwing here skips the temp-file cleanup below
+ throw new Exception("Stream load time out")
+ }
+
+ try { // best-effort removal of the generated CSV; failures are only logged
+ dataFile.delete() // NOTE(review): File.delete() reports failure via its boolean result, which is ignored here
+ log.info("Cleaned up temp file: ${dataFile.absolutePath}")
+ } catch (Exception e) {
+ log.warn("Failed to delete temp file: ${e.message}")
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]