This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new fa2790d5caa branch-3.0: [fix](group commit)Group Commit with stream load mode should wait schema change done #49854 (#50117)
fa2790d5caa is described below

commit fa2790d5caad376958df68df247b09840af2ac26
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Apr 18 10:37:59 2025 +0800

    branch-3.0: [fix](group commit)Group Commit with stream load mode should wait schema change done #49854 (#50117)
    
    Cherry-picked from #49854
    
    Co-authored-by: huanghaibin <[email protected]>
---
 .../org/apache/doris/load/StreamLoadHandler.java   |  18 +++-
 .../GroupCommitTableValuedFunction.java            |   6 ++
 .../test_schema_change_with_group_commit.groovy    | 112 +++++++++++++++++++++
 3 files changed, 131 insertions(+), 5 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java
index 745eb7f913e..61e084386d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java
@@ -27,12 +27,14 @@ import org.apache.doris.cloud.catalog.CloudEnv;
 import org.apache.doris.cloud.planner.CloudStreamLoadPlanner;
 import org.apache.doris.cloud.system.CloudSystemInfoService;
 import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.AuthenticationException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.load.routineload.RoutineLoadJob;
+import org.apache.doris.planner.GroupCommitPlanner;
 import org.apache.doris.planner.StreamLoadPlanner;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.service.ExecuteEnv;
@@ -177,11 +179,17 @@ public class StreamLoadHandler {
 
         for (String tableName : tableNames) {
             Table table = db.getTableOrMetaException(tableName, 
TableType.OLAP);
-            if (!((OlapTable) 
table).getTableProperty().getUseSchemaLightChange()
-                    && (request.getGroupCommitMode() != null
-                    && !request.getGroupCommitMode().equals("off_mode"))) {
-                throw new UserException(
-                        "table light_schema_change is false, can't do stream 
load with group commit mode");
+            if (request.getGroupCommitMode() != null
+                    && !request.getGroupCommitMode().equals("off_mode")) {
+                if (!((OlapTable) 
table).getTableProperty().getUseSchemaLightChange()) {
+                    throw new UserException(
+                            "table light_schema_change is false, can't do 
stream load with group commit mode");
+                }
+                if 
(Env.getCurrentEnv().getGroupCommitManager().isBlock(table.getId())) {
+                    String msg = "insert table " + table.getId() + 
GroupCommitPlanner.SCHEMA_CHANGE;
+                    LOG.info(msg);
+                    throw new AnalysisException(msg);
+                }
             }
             tables.add((OlapTable) table);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
index 324e17d4f24..a387902e94a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
@@ -26,6 +26,7 @@ import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
+import org.apache.doris.planner.GroupCommitPlanner;
 import org.apache.doris.planner.GroupCommitScanNode;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.ScanNode;
@@ -69,6 +70,11 @@ public class GroupCommitTableValuedFunction extends 
ExternalFileTableValuedFunct
             throw new AnalysisException("Only support OLAP table, but table 
type of table_id "
                     + tableId + " is " + table.getType());
         }
+        if (Env.getCurrentEnv().getGroupCommitManager().isBlock(tableId)) {
+            String msg = "insert table " + tableId + 
GroupCommitPlanner.SCHEMA_CHANGE;
+            LOG.info(msg);
+            throw new AnalysisException(msg);
+        }
         if (Config.group_commit_timeout_multipler > 0) {
             int timeoutS = Math.max((int) (((OlapTable) 
table).getGroupCommitIntervalMs() / 1000.0
                     * Config.group_commit_timeout_multipler), 600);
diff --git a/regression-test/suites/schema_change_p0/test_schema_change_with_group_commit.groovy b/regression-test/suites/schema_change_p0/test_schema_change_with_group_commit.groovy
new file mode 100644
index 00000000000..09ecb9ccfa5
--- /dev/null
+++ b/regression-test/suites/schema_change_p0/test_schema_change_with_group_commit.groovy
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite("test_schema_change_with_group_commit", "docker") {
+    def options = new ClusterOptions()
+    options.feConfigs += [
+            'wait_internal_group_commit_finish=true',
+            'group_commit_interval_ms_default_value=2'
+    ]
+    options.beConfigs += [
+            'group_commit_replay_wal_retry_num=2',
+            'group_commit_replay_wal_retry_interval_seconds=1',
+            'group_commit_wait_replay_wal_finish=true',
+            'wait_internal_group_commit_finish=true'
+    ]
+    options.setFeNum(1)
+    options.setBeNum(1)
+    options.cloudMode = true
+    docker(options) {
+        def tableName3 = "test_schema_change_with_group_commit"
+
+        def getJobState = { tableName ->
+            def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE 
IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """
+            return jobStateResult[0][9]
+        }
+        def execStreamLoad = {
+            streamLoad {
+                table "${tableName3}"
+
+                set 'column_separator', ','
+
+                file 'all_types.csv'
+                time 10000 // limit inflight 10s
+
+                check { result, exception, startTime, endTime ->
+                    if (exception != null) {
+                        throw exception
+                    }
+                    log.info("Stream load result: ${result}".toString())
+                    def json = parseJson(result)
+                    if (json.Status.toLowerCase() == "success") {
+                        assertEquals(2500, json.NumberTotalRows)
+                        assertEquals(0, json.NumberFilteredRows)
+                    } else {
+                        assertTrue(json.Message.contains("blocked on schema 
change"))
+                    }
+                }
+            }
+        }
+
+        sql """ DROP TABLE IF EXISTS ${tableName3} """
+
+        sql """
+    CREATE TABLE IF NOT EXISTS ${tableName3} (
+      `k1` int(11) NULL,
+      `k2` tinyint(4) NULL,
+      `k3` smallint(6) NULL,
+      `k4` int(30) NULL,
+      `k5` largeint(40) NULL,
+      `k6` float NULL,
+      `k7` double NULL,
+      `k8` decimal(9, 0) NULL,
+      `k9` char(10) NULL,
+      `k10` varchar(1024) NULL,
+      `k11` text NULL,
+      `k12` date NULL,
+      `k13` datetime NULL
+    ) ENGINE=OLAP
+    unique KEY(k1, k2, k3)
+    DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+    PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1",
+        "enable_unique_key_merge_on_write" = "true"
+    );
+    """
+
+        execStreamLoad()
+
+        sql """ alter table ${tableName3} modify column k4 string NULL"""
+
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).pollDelay(10, 
TimeUnit.MILLISECONDS).pollInterval(10, TimeUnit.MILLISECONDS).until(
+                {
+                    String res = getJobState(tableName3)
+                    if (res == "FINISHED" || res == "CANCELLED") {
+                        assertEquals("FINISHED", res)
+                        return true
+                    }
+                    execStreamLoad()
+                    return false
+                }
+        )
+    }
+
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to