This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new fa2790d5caa branch-3.0: [fix](group commit)Group Commit with stream
load mode should wait schema change done #49854 (#50117)
fa2790d5caa is described below
commit fa2790d5caad376958df68df247b09840af2ac26
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Apr 18 10:37:59 2025 +0800
branch-3.0: [fix](group commit)Group Commit with stream load mode should
wait schema change done #49854 (#50117)
Cherry-picked from #49854
Co-authored-by: huanghaibin <[email protected]>
---
.../org/apache/doris/load/StreamLoadHandler.java | 18 +++-
.../GroupCommitTableValuedFunction.java | 6 ++
.../test_schema_change_with_group_commit.groovy | 112 +++++++++++++++++++++
3 files changed, 131 insertions(+), 5 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java
index 745eb7f913e..61e084386d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java
@@ -27,12 +27,14 @@ import org.apache.doris.cloud.catalog.CloudEnv;
import org.apache.doris.cloud.planner.CloudStreamLoadPlanner;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.AuthenticationException;
import org.apache.doris.common.Config;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.load.routineload.RoutineLoadJob;
+import org.apache.doris.planner.GroupCommitPlanner;
import org.apache.doris.planner.StreamLoadPlanner;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.ExecuteEnv;
@@ -177,11 +179,17 @@ public class StreamLoadHandler {
for (String tableName : tableNames) {
Table table = db.getTableOrMetaException(tableName,
TableType.OLAP);
- if (!((OlapTable)
table).getTableProperty().getUseSchemaLightChange()
- && (request.getGroupCommitMode() != null
- && !request.getGroupCommitMode().equals("off_mode"))) {
- throw new UserException(
- "table light_schema_change is false, can't do stream
load with group commit mode");
+ if (request.getGroupCommitMode() != null
+ && !request.getGroupCommitMode().equals("off_mode")) {
+ if (!((OlapTable)
table).getTableProperty().getUseSchemaLightChange()) {
+ throw new UserException(
+ "table light_schema_change is false, can't do
stream load with group commit mode");
+ }
+ if
(Env.getCurrentEnv().getGroupCommitManager().isBlock(table.getId())) {
+ String msg = "insert table " + table.getId() +
GroupCommitPlanner.SCHEMA_CHANGE;
+ LOG.info(msg);
+ throw new AnalysisException(msg);
+ }
}
tables.add((OlapTable) table);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
index 324e17d4f24..a387902e94a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
@@ -26,6 +26,7 @@ import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
+import org.apache.doris.planner.GroupCommitPlanner;
import org.apache.doris.planner.GroupCommitScanNode;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
@@ -69,6 +70,11 @@ public class GroupCommitTableValuedFunction extends ExternalFileTableValuedFunct
throw new AnalysisException("Only support OLAP table, but table
type of table_id "
+ tableId + " is " + table.getType());
}
+ if (Env.getCurrentEnv().getGroupCommitManager().isBlock(tableId)) {
+ String msg = "insert table " + tableId +
GroupCommitPlanner.SCHEMA_CHANGE;
+ LOG.info(msg);
+ throw new AnalysisException(msg);
+ }
if (Config.group_commit_timeout_multipler > 0) {
int timeoutS = Math.max((int) (((OlapTable)
table).getGroupCommitIntervalMs() / 1000.0
* Config.group_commit_timeout_multipler), 600);
diff --git a/regression-test/suites/schema_change_p0/test_schema_change_with_group_commit.groovy b/regression-test/suites/schema_change_p0/test_schema_change_with_group_commit.groovy
new file mode 100644
index 00000000000..09ecb9ccfa5
--- /dev/null
+++ b/regression-test/suites/schema_change_p0/test_schema_change_with_group_commit.groovy
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite("test_schema_change_with_group_commit", "docker") {
+ def options = new ClusterOptions()
+ options.feConfigs += [
+ 'wait_internal_group_commit_finish=true',
+ 'group_commit_interval_ms_default_value=2'
+ ]
+ options.beConfigs += [
+ 'group_commit_replay_wal_retry_num=2',
+ 'group_commit_replay_wal_retry_interval_seconds=1',
+ 'group_commit_wait_replay_wal_finish=true',
+ 'wait_internal_group_commit_finish=true'
+ ]
+ options.setFeNum(1)
+ options.setBeNum(1)
+ options.cloudMode = true
+ docker(options) {
+ def tableName3 = "test_schema_change_with_group_commit"
+
+ def getJobState = { tableName ->
+ def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE
IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """
+ return jobStateResult[0][9]
+ }
+ def execStreamLoad = {
+ streamLoad {
+ table "${tableName3}"
+
+ set 'column_separator', ','
+
+ file 'all_types.csv'
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ if (json.Status.toLowerCase() == "success") {
+ assertEquals(2500, json.NumberTotalRows)
+ assertEquals(0, json.NumberFilteredRows)
+ } else {
+ assertTrue(json.Message.contains("blocked on schema
change"))
+ }
+ }
+ }
+ }
+
+ sql """ DROP TABLE IF EXISTS ${tableName3} """
+
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName3} (
+ `k1` int(11) NULL,
+ `k2` tinyint(4) NULL,
+ `k3` smallint(6) NULL,
+ `k4` int(30) NULL,
+ `k5` largeint(40) NULL,
+ `k6` float NULL,
+ `k7` double NULL,
+ `k8` decimal(9, 0) NULL,
+ `k9` char(10) NULL,
+ `k10` varchar(1024) NULL,
+ `k11` text NULL,
+ `k12` date NULL,
+ `k13` datetime NULL
+ ) ENGINE=OLAP
+ unique KEY(k1, k2, k3)
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "enable_unique_key_merge_on_write" = "true"
+ );
+ """
+
+ execStreamLoad()
+
+ sql """ alter table ${tableName3} modify column k4 string NULL"""
+
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).pollDelay(10,
TimeUnit.MILLISECONDS).pollInterval(10, TimeUnit.MILLISECONDS).until(
+ {
+ String res = getJobState(tableName3)
+ if (res == "FINISHED" || res == "CANCELLED") {
+ assertEquals("FINISHED", res)
+ return true
+ }
+ execStreamLoad()
+ return false
+ }
+ )
+ }
+
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]