This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new bef86ea6cdf branch-3.1: [fix](group commit) replay wal set cloud cluster name #51517 (#52011)
bef86ea6cdf is described below
commit bef86ea6cdfc1231d16264d6907bc1c02ad658b0
Author: meiyi <[email protected]>
AuthorDate: Fri Jun 20 14:56:09 2025 +0800
branch-3.1: [fix](group commit) replay wal set cloud cluster name #51517 (#52011)
Cherry-pick from #51517
---
.../org/apache/doris/load/StreamLoadHandler.java | 2 +-
.../ExternalFileTableValuedFunction.java | 2 +-
.../multi_cluster/test_group_commit_replay_wal.csv | 5 ++
.../test_group_commit_replay_wal.groovy | 95 ++++++++++++++++++++++
.../insert_p0/insert_group_commit_into.groovy | 2 +-
5 files changed, 103 insertions(+), 3 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java
index 61e084386d0..0e19ac55e11 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java
@@ -139,7 +139,7 @@ public class StreamLoadHandler {
Preconditions.checkState(currentUser.size() == 1);
ctx.setCurrentUserIdentity(currentUser.get(0));
}
- if (request.isSetAuthCode() && request.isSetBackendId()) {
+ if ((request.isSetToken() || request.isSetAuthCode()) && request.isSetBackendId()) {
long backendId = request.getBackendId();
Backend backend = Env.getCurrentSystemInfo().getBackend(backendId);
Preconditions.checkNotNull(backend);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index bda5aa71342..5a15ccd21f5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -91,7 +91,7 @@ import java.util.concurrent.Future;
import java.util.stream.Collectors;
/**
- * ExternalFileTableValuedFunction is used for S3/HDFS/LOCAL table-valued-function
+ * ExternalFileTableValuedFunction is used for S3/HDFS/LOCAL/HTTP_STREAM/GROUP_COMMIT table-valued-function
*/
public abstract class ExternalFileTableValuedFunction extends TableValuedFunctionIf {
public static final Logger LOG = LogManager.getLogger(ExternalFileTableValuedFunction.class);
diff --git a/regression-test/data/cloud_p0/multi_cluster/test_group_commit_replay_wal.csv b/regression-test/data/cloud_p0/multi_cluster/test_group_commit_replay_wal.csv
new file mode 100644
index 00000000000..6ab7bd6bcdf
--- /dev/null
+++ b/regression-test/data/cloud_p0/multi_cluster/test_group_commit_replay_wal.csv
@@ -0,0 +1,5 @@
+1,1
+2,2
+3,3
+4,4
+5,5
\ No newline at end of file
diff --git a/regression-test/suites/cloud_p0/multi_cluster/test_group_commit_replay_wal.groovy b/regression-test/suites/cloud_p0/multi_cluster/test_group_commit_replay_wal.groovy
new file mode 100644
index 00000000000..7188d8c49b2
--- /dev/null
+++ b/regression-test/suites/cloud_p0/multi_cluster/test_group_commit_replay_wal.groovy
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.awaitility.Awaitility
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite('test_group_commit_replay_wal', 'multi_cluster,docker') {
+ def options = new ClusterOptions()
+ options.cloudMode = true
+ options.setFeNum(1)
+ options.setBeNum(1)
+ options.feConfigs += [
+ 'cloud_cluster_check_interval_second=1',
+ 'enable_workload_group=false',
+ 'sys_log_verbose_modules=org.apache.doris.qe.ConnectContext'
+ ]
+ options.beConfigs += [
+ 'enable_debug_points=true'
+ ]
+
+ docker(options) {
+ // add cluster1
+ cluster.addBackend(1, "cpuster1")
+ def ret = sql_return_maparray """show clusters"""
+ logger.info("clusters: " + ret)
+ def cluster0 = ret.stream().filter(cluster -> cluster.is_current == "TRUE").findFirst().orElse(null)
+ def cluster0Name = cluster0['cluster'] as String
+ logger.info("current cluster: " + cluster0Name)
+ def cluster1 = ret.stream().filter(cluster -> cluster.cluster == "cpuster1").findFirst().orElse(null)
+ assertTrue(cluster1 != null)
+ sql """set property 'DEFAULT_CLOUD_CLUSTER' = '$cluster0Name'"""
+
+ def backends = sql_return_maparray """show backends"""
+ logger.info("backends: " + backends)
+ long cluster1BeId = 0
+ def cluster1BeHost = ""
+ int cluster1BePort = 0
+ for (final def backend in backends) {
+ if (backend['Tag'].toString().contains("cpuster1")) {
+ cluster1BeId = backend['BackendId'] as long
+ cluster1BeHost = backend['Host']
+ cluster1BePort = backend['HttpPort'] as int
+ }
+ }
+ logger.info("cluster1BeId: " + cluster1BeId + ", cluster1BeHost: " + cluster1BeHost + ", cluster1BePort: " + cluster1BePort)
+ assertTrue(cluster1BeId > 0)
+
+ def testTable = "test_group_commit_replay_wal"
+ sql """
+ create table ${testTable} (`k` int, `v` int)
+ UNIQUE KEY(`k`)
+ DISTRIBUTED BY HASH(`k`) BUCKETS 1
+ PROPERTIES (
+ "group_commit_interval_ms"="100"
+ );
+ """
+
+ GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.load_error")
+ streamLoad {
+ table "${testTable}"
+ set 'column_separator', ','
+ set 'group_commit', 'async_mode'
+ unset 'label'
+ file 'test_group_commit_replay_wal.csv'
+ time 10000
+ directToBe cluster1BeHost, cluster1BePort
+ }
+ def rowCount = 0
+ for (int i = 0; i < 30; i++) {
+ def result = sql "select count(*) from ${testTable}"
+ logger.info("rowCount: ${result}")
+ rowCount = result[0][0]
+ if (rowCount == 5) {
+ break
+ }
+ Thread.sleep(500)
+ }
+ assertEquals(5, rowCount)
+ }
+}
diff --git a/regression-test/suites/insert_p0/insert_group_commit_into.groovy b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
index a64d66f7f87..bade0c8f279 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_into.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
@@ -66,7 +66,7 @@ suite("insert_group_commit_into") {
} catch (Exception e) {
logger.warn("group_commit_insert failed, retry: " + retry + ", error: " + e.getMessage())
retry++
- if ((e.getMessage().contains("is blocked on schema change") || e.getMessage().contains("can not get a block queue")) && retry < 20) {
+ if ((e.getMessage().contains("is blocked on schema change") || e.getMessage().contains("can not get a block queue") || e.getMessage().contains("schema version not match")) && retry < 20) {
sleep(1500)
continue
} else {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]