This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new bef86ea6cdf branch-3.1: [fix](group commit) replay wal set cloud 
cluster name #51517 (#52011)
bef86ea6cdf is described below

commit bef86ea6cdfc1231d16264d6907bc1c02ad658b0
Author: meiyi <[email protected]>
AuthorDate: Fri Jun 20 14:56:09 2025 +0800

    branch-3.1: [fix](group commit) replay wal set cloud cluster name #51517 
(#52011)
    
    Cherry-pick from #51517
---
 .../org/apache/doris/load/StreamLoadHandler.java   |  2 +-
 .../ExternalFileTableValuedFunction.java           |  2 +-
 .../multi_cluster/test_group_commit_replay_wal.csv |  5 ++
 .../test_group_commit_replay_wal.groovy            | 95 ++++++++++++++++++++++
 .../insert_p0/insert_group_commit_into.groovy      |  2 +-
 5 files changed, 103 insertions(+), 3 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java
index 61e084386d0..0e19ac55e11 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java
@@ -139,7 +139,7 @@ public class StreamLoadHandler {
             Preconditions.checkState(currentUser.size() == 1);
             ctx.setCurrentUserIdentity(currentUser.get(0));
         }
-        if (request.isSetAuthCode() && request.isSetBackendId()) {
+        if ((request.isSetToken() || request.isSetAuthCode()) && 
request.isSetBackendId()) {
             long backendId = request.getBackendId();
             Backend backend = Env.getCurrentSystemInfo().getBackend(backendId);
             Preconditions.checkNotNull(backend);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index bda5aa71342..5a15ccd21f5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -91,7 +91,7 @@ import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
 /**
- * ExternalFileTableValuedFunction is used for S3/HDFS/LOCAL 
table-valued-function
+ * ExternalFileTableValuedFunction is used for 
S3/HDFS/LOCAL/HTTP_STREAM/GROUP_COMMIT table-valued-function
  */
 public abstract class ExternalFileTableValuedFunction extends 
TableValuedFunctionIf {
     public static final Logger LOG = 
LogManager.getLogger(ExternalFileTableValuedFunction.class);
diff --git 
a/regression-test/data/cloud_p0/multi_cluster/test_group_commit_replay_wal.csv 
b/regression-test/data/cloud_p0/multi_cluster/test_group_commit_replay_wal.csv
new file mode 100644
index 00000000000..6ab7bd6bcdf
--- /dev/null
+++ 
b/regression-test/data/cloud_p0/multi_cluster/test_group_commit_replay_wal.csv
@@ -0,0 +1,5 @@
+1,1
+2,2
+3,3
+4,4
+5,5
\ No newline at end of file
diff --git 
a/regression-test/suites/cloud_p0/multi_cluster/test_group_commit_replay_wal.groovy
 
b/regression-test/suites/cloud_p0/multi_cluster/test_group_commit_replay_wal.groovy
new file mode 100644
index 00000000000..7188d8c49b2
--- /dev/null
+++ 
b/regression-test/suites/cloud_p0/multi_cluster/test_group_commit_replay_wal.groovy
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.awaitility.Awaitility
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite('test_group_commit_replay_wal', 'multi_cluster,docker') {
+    def options = new ClusterOptions()
+    options.cloudMode = true
+    options.setFeNum(1)
+    options.setBeNum(1)
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+        'enable_workload_group=false',
+        'sys_log_verbose_modules=org.apache.doris.qe.ConnectContext'
+    ]
+    options.beConfigs += [
+        'enable_debug_points=true'
+    ]
+
+    docker(options) {
+        // add cluster1
+        cluster.addBackend(1, "cpuster1")
+        def ret = sql_return_maparray """show clusters"""
+        logger.info("clusters: " + ret)
+        def cluster0 = ret.stream().filter(cluster -> cluster.is_current == 
"TRUE").findFirst().orElse(null)
+        def cluster0Name = cluster0['cluster'] as String
+        logger.info("current cluster: " + cluster0Name)
+        def cluster1 = ret.stream().filter(cluster -> cluster.cluster == 
"cpuster1").findFirst().orElse(null)
+        assertTrue(cluster1 != null)
+        sql """set property 'DEFAULT_CLOUD_CLUSTER' = '$cluster0Name'"""
+
+        def backends = sql_return_maparray """show backends"""
+        logger.info("backends: " + backends)
+        long cluster1BeId = 0
+        def cluster1BeHost = ""
+        int cluster1BePort = 0
+        for (final def backend in backends) {
+            if (backend['Tag'].toString().contains("cpuster1")) {
+                cluster1BeId = backend['BackendId'] as long
+                cluster1BeHost = backend['Host']
+                cluster1BePort = backend['HttpPort'] as int
+            }
+        }
+        logger.info("cluster1BeId: " + cluster1BeId + ", cluster1BeHost: " + 
cluster1BeHost + ", cluster1BePort: " + cluster1BePort)
+        assertTrue(cluster1BeId > 0)
+
+        def testTable = "test_group_commit_replay_wal"
+        sql """
+            create table ${testTable} (`k` int, `v` int)
+            UNIQUE KEY(`k`)
+            DISTRIBUTED BY HASH(`k`) BUCKETS 1
+            PROPERTIES (
+                "group_commit_interval_ms"="100"
+            );
+        """
+
+        
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue._finish_group_commit_load.load_error")
+        streamLoad {
+            table "${testTable}"
+            set 'column_separator', ','
+            set 'group_commit', 'async_mode'
+            unset 'label'
+            file 'test_group_commit_replay_wal.csv'
+            time 10000
+            directToBe cluster1BeHost, cluster1BePort
+        }
+        def rowCount = 0
+        for (int i = 0; i < 30; i++) {
+            def result = sql "select count(*) from ${testTable}"
+            logger.info("rowCount: ${result}")
+            rowCount = result[0][0]
+            if (rowCount == 5) {
+                break
+            }
+            Thread.sleep(500)
+        }
+        assertEquals(5, rowCount)
+    }
+}
diff --git a/regression-test/suites/insert_p0/insert_group_commit_into.groovy 
b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
index a64d66f7f87..bade0c8f279 100644
--- a/regression-test/suites/insert_p0/insert_group_commit_into.groovy
+++ b/regression-test/suites/insert_p0/insert_group_commit_into.groovy
@@ -66,7 +66,7 @@ suite("insert_group_commit_into") {
             } catch (Exception e) {
                 logger.warn("group_commit_insert failed, retry: " + retry + ", 
error: " + e.getMessage())
                 retry++
-                if ((e.getMessage().contains("is blocked on schema change") || 
e.getMessage().contains("can not get a block queue")) && retry < 20) {
+                if ((e.getMessage().contains("is blocked on schema change") || 
e.getMessage().contains("can not get a block queue") || 
e.getMessage().contains("schema version not match")) && retry < 20) {
                     sleep(1500)
                     continue
                 } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to