This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new c6a16acef5d branch-4.0: [fix](move-memtable) fix close wait timeout if part of streams connection failed #59905 (#60002)
c6a16acef5d is described below
commit c6a16acef5d9c86a6f10d5b58480cd7b9612fa51
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Jan 19 15:56:00 2026 +0800
branch-4.0: [fix](move-memtable) fix close wait timeout if part of streams connection failed #59905 (#60002)
Cherry-picked from #59905
Co-authored-by: hui lai <[email protected]>
---
be/src/vec/sink/load_stream_stub.cpp | 15 ++++
.../test_cancelled_stream_close_wait.groovy | 94 ++++++++++++++++++++++
2 files changed, 109 insertions(+)
diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index 7f6ef126fd7..85f30351362 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -335,6 +335,11 @@ Status LoadStreamStub::close_finish_check(RuntimeState*
state, bool* is_closed)
// we don't need to close wait on non-open streams
return Status::OK();
}
+ // If stream is cancelled (e.g., due to connection failure), treat it as
closed
+ // to avoid waiting indefinitely for a stream that will never respond.
+ if (_is_cancelled.load()) {
+ return check_cancel();
+ }
if (state->get_query_ctx()->is_cancelled()) {
return state->get_query_ctx()->exec_status();
}
@@ -517,6 +522,7 @@ Status
LoadStreamStubs::open(BrpcClientCache<PBackendService_Stub>* client_cache
int64_t idle_timeout_ms, bool enable_profile) {
bool get_schema = true;
auto status = Status::OK();
+ bool first_stream = true;
for (auto& stream : _streams) {
Status st;
if (get_schema) {
@@ -526,6 +532,14 @@ Status
LoadStreamStubs::open(BrpcClientCache<PBackendService_Stub>* client_cache
st = stream->open(client_cache, node_info, txn_id, schema, {},
total_streams,
idle_timeout_ms, enable_profile);
}
+ // Simulate one stream open failure within LoadStreamStubs.
+ // This causes the successfully opened streams to be cancelled,
+ // reproducing the bug where cancelled streams cause close_wait
timeout.
+ DBUG_EXECUTE_IF("LoadStreamStubs.open.fail_one_stream", {
+ if (st.ok() && !first_stream) {
+ st = Status::InternalError("Injected stream open failure");
+ }
+ });
if (st.ok()) {
get_schema = false;
} else {
@@ -533,6 +547,7 @@ Status
LoadStreamStubs::open(BrpcClientCache<PBackendService_Stub>* client_cache
status = st;
// no break here to try get schema from the rest streams
}
+ first_stream = false;
}
// only mark open when all streams open success
_open_success.store(status.ok());
diff --git a/regression-test/suites/fault_injection_p0/test_cancelled_stream_close_wait.groovy b/regression-test/suites/fault_injection_p0/test_cancelled_stream_close_wait.groovy
new file mode 100644
index 00000000000..750b689a588
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_cancelled_stream_close_wait.groovy
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+suite("test_cancelled_stream_close_wait", "nonConcurrent") {
+ if (!isCloudMode()) {
+ sql """ DROP TABLE IF EXISTS `baseall` """
+ sql """ DROP TABLE IF EXISTS `test` """
+ sql """
+ CREATE TABLE IF NOT EXISTS `baseall` (
+ `k0` boolean null comment "",
+ `k1` tinyint(4) null comment "",
+ `k2` smallint(6) null comment "",
+ `k3` int(11) null comment "",
+ `k4` bigint(20) null comment "",
+ `k5` decimal(9, 3) null comment "",
+ `k6` char(5) null comment "",
+ `k10` date null comment "",
+ `k11` datetime null comment "",
+ `k7` varchar(20) null comment "",
+ `k8` double max null comment "",
+ `k9` float sum null comment "",
+ `k12` string replace null comment "",
+ `k13` largeint(40) replace null comment ""
+ ) engine=olap
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+ """
+ sql """
+ CREATE TABLE IF NOT EXISTS `test` (
+ `k0` boolean null comment "",
+ `k1` tinyint(4) null comment "",
+ `k2` smallint(6) null comment "",
+ `k3` int(11) null comment "",
+ `k4` bigint(20) null comment "",
+ `k5` decimal(9, 3) null comment "",
+ `k6` char(5) null comment "",
+ `k10` date null comment "",
+ `k11` datetime null comment "",
+ `k7` varchar(20) null comment "",
+ `k8` double max null comment "",
+ `k9` float sum null comment "",
+ `k12` string replace_if_not_null null comment "",
+ `k13` largeint(40) replace null comment ""
+ ) engine=olap
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+ """
+
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ streamLoad {
+ table "baseall"
+ db "regression_test_fault_injection_p0"
+ set 'column_separator', ','
+ file "baseall.txt"
+ }
+
+ // Test: inject an open failure for one stream inside LoadStreamStubs.
+ // With the debug point on, the first stream opens and every later stream fails,
+ // which causes the already-opened stream to be cancelled.
+ // Without the fix, close_wait would block on that cancelled stream until it timed out.
+ try {
+ GetDebugPoint().enableDebugPointForAllBEs("LoadStreamStubs.open.fail_one_stream")
+ def res = sql "insert into test select * from baseall where k1 <= 3"
+ logger.info(res.toString())
+ // Reaching here means the insert succeeded; it should have failed on the injected open error
+ assertTrue(false, "Expected Exception to be thrown")
+ } catch(Exception e) {
+ logger.info(e.getMessage())
+ // Should fail quickly with the stream open error, not a timeout (getMessage() may be null)
+ assertTrue(!(e.getMessage() ?: "").contains("timed out"),
+ "Should not timeout waiting for cancelled stream, got: " + e.getMessage())
+ } finally {
+ GetDebugPoint().disableDebugPointForAllBEs("LoadStreamStubs.open.fail_one_stream")
+ }
+
+ sql """ DROP TABLE IF EXISTS `baseall` """
+ sql """ DROP TABLE IF EXISTS `test` """
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]