This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 335ebf6a8e8 [fix](move-memtable) fix close wait timeout if part of
streams connection failed (#59905)
335ebf6a8e8 is described below
commit 335ebf6a8e8f775c9302ab06df9de6d080074794
Author: hui lai <[email protected]>
AuthorDate: Sun Jan 18 21:04:19 2026 +0800
[fix](move-memtable) fix close wait timeout if part of streams connection
failed (#59905)
### What problem does this PR solve?
Bug scenario:
1. LoadStreamStubs has multiple LoadStreamStub objects (stream_per_node
>= 2)
2. First stream opens successfully (_is_open = true)
3. Second stream fails to open
4. LoadStreamStubs::open() calls cancel() on ALL streams (including the
successful one)
5. The first stream now has: _is_open = true, _is_cancelled = true,
_is_closing = false
6. Without the fix, close_finish_check() would mark it as not closed
(stuck in unfinished_streams)
7. This causes close_wait to time out
---
be/src/vec/sink/load_stream_stub.cpp | 15 ++++
.../test_cancelled_stream_close_wait.groovy | 94 ++++++++++++++++++++++
2 files changed, 109 insertions(+)
diff --git a/be/src/vec/sink/load_stream_stub.cpp
b/be/src/vec/sink/load_stream_stub.cpp
index 4671c9fb78d..faf2b0c1776 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -342,6 +342,11 @@ Status LoadStreamStub::close_finish_check(RuntimeState*
state, bool* is_closed)
// we don't need to close wait on non-open streams
return Status::OK();
}
+ // If stream is cancelled (e.g., due to connection failure), treat it as
closed
+ // to avoid waiting indefinitely for a stream that will never respond.
+ if (_is_cancelled.load()) {
+ return check_cancel();
+ }
if (state->get_query_ctx()->is_cancelled()) {
return state->get_query_ctx()->exec_status();
}
@@ -562,6 +567,7 @@ Status
LoadStreamStubs::open(BrpcClientCache<PBackendService_Stub>* client_cache
int64_t idle_timeout_ms, bool enable_profile) {
bool get_schema = true;
auto status = Status::OK();
+ bool first_stream = true;
for (auto& stream : _streams) {
Status st;
if (get_schema) {
@@ -571,6 +577,14 @@ Status
LoadStreamStubs::open(BrpcClientCache<PBackendService_Stub>* client_cache
st = stream->open(client_cache, node_info, txn_id, schema, {},
total_streams,
idle_timeout_ms, enable_profile);
}
+ // Simulate one stream open failure within LoadStreamStubs.
+ // This causes the successfully opened streams to be cancelled,
+ // reproducing the bug where cancelled streams cause close_wait
timeout.
+ DBUG_EXECUTE_IF("LoadStreamStubs.open.fail_one_stream", {
+ if (st.ok() && !first_stream) {
+ st = Status::InternalError("Injected stream open failure");
+ }
+ });
if (st.ok()) {
get_schema = false;
} else {
@@ -578,6 +592,7 @@ Status
LoadStreamStubs::open(BrpcClientCache<PBackendService_Stub>* client_cache
status = st;
// no break here to try get schema from the rest streams
}
+ first_stream = false;
}
// only mark open when all streams open success
_open_success.store(status.ok());
diff --git
a/regression-test/suites/fault_injection_p0/test_cancelled_stream_close_wait.groovy
b/regression-test/suites/fault_injection_p0/test_cancelled_stream_close_wait.groovy
new file mode 100644
index 00000000000..750b689a588
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/test_cancelled_stream_close_wait.groovy
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+suite("test_cancelled_stream_close_wait", "nonConcurrent") {
+ if (!isCloudMode()) {
+ sql """ DROP TABLE IF EXISTS `baseall` """
+ sql """ DROP TABLE IF EXISTS `test` """
+ sql """
+ CREATE TABLE IF NOT EXISTS `baseall` (
+ `k0` boolean null comment "",
+ `k1` tinyint(4) null comment "",
+ `k2` smallint(6) null comment "",
+ `k3` int(11) null comment "",
+ `k4` bigint(20) null comment "",
+ `k5` decimal(9, 3) null comment "",
+ `k6` char(5) null comment "",
+ `k10` date null comment "",
+ `k11` datetime null comment "",
+ `k7` varchar(20) null comment "",
+ `k8` double max null comment "",
+ `k9` float sum null comment "",
+ `k12` string replace null comment "",
+ `k13` largeint(40) replace null comment ""
+ ) engine=olap
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" =
"1")
+ """
+ sql """
+ CREATE TABLE IF NOT EXISTS `test` (
+ `k0` boolean null comment "",
+ `k1` tinyint(4) null comment "",
+ `k2` smallint(6) null comment "",
+ `k3` int(11) null comment "",
+ `k4` bigint(20) null comment "",
+ `k5` decimal(9, 3) null comment "",
+ `k6` char(5) null comment "",
+ `k10` date null comment "",
+ `k11` datetime null comment "",
+ `k7` varchar(20) null comment "",
+ `k8` double max null comment "",
+ `k9` float sum null comment "",
+ `k12` string replace_if_not_null null comment "",
+ `k13` largeint(40) replace null comment ""
+ ) engine=olap
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" =
"1")
+ """
+
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ streamLoad {
+ table "baseall"
+ db "regression_test_fault_injection_p0"
+ set 'column_separator', ','
+ file "baseall.txt"
+ }
+
+ // Test: Simulate one stream open failure within LoadStreamStubs.
+ // The first stream opens successfully, the second fails.
+ // This causes cancel() to be called on all streams.
+ // Without the fix, the successfully opened stream would cause
close_wait timeout.
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("LoadStreamStubs.open.fail_one_stream")
+ def res = sql "insert into test select * from baseall where k1 <=
3"
+ logger.info(res.toString())
+ // Should fail due to stream open failure, not timeout
+ assertTrue(false, "Expected Exception to be thrown")
+ } catch(Exception e) {
+ logger.info(e.getMessage())
+ // Should fail quickly with stream open error, not timeout
+ assertTrue(!e.getMessage().contains("timed out"),
+ "Should not timeout waiting for cancelled stream, got:
" + e.getMessage())
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("LoadStreamStubs.open.fail_one_stream")
+ }
+
+ sql """ DROP TABLE IF EXISTS `baseall` """
+ sql """ DROP TABLE IF EXISTS `test` """
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]