This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 0b9817ea4b4 [cherry-pick](branch-2.1) Pick "[Enhancement](txn) Block
new insert into if schema change happens during transaction (#39483)" (#40115)
0b9817ea4b4 is described below
commit 0b9817ea4b4840597b6433189389336b18c1b9fd
Author: abmdocrt <[email protected]>
AuthorDate: Fri Aug 30 10:01:10 2024 +0800
[cherry-pick](branch-2.1) Pick "[Enhancement](txn) Block new insert into if
schema change happens during transaction (#39483)" (#40115)
## Proposed changes
Pick #39483
<!--Describe your changes.-->
---
be/src/olap/schema_change.cpp | 1 +
.../insert/BatchInsertIntoTableCommand.java | 11 ++
.../trees/plans/physical/PhysicalResultSink.java | 4 -
.../java/org/apache/doris/qe/StmtExecutor.java | 1 +
.../apache/doris/transaction/TransactionEntry.java | 18 +++
.../data/insert_p0/{ => transaction}/test_txn.out | 0
.../insert_p0/{ => transaction}/txn_insert.out | 0
.../txn_insert_values_with_schema_change.out | 0
...pecify_columns_schema_change_add_key_column.out | 15 +++
...cify_columns_schema_change_add_value_column.out | 15 +++
...pecify_columns_schema_change_reorder_column.out | 19 +++
.../insert_p0/{ => transaction}/test_txn.groovy | 0
.../insert_p0/{ => transaction}/txn_insert.groovy | 0
.../txn_insert_values_with_schema_change.groovy | 2 +-
...ify_columns_schema_change_add_key_column.groovy | 127 ++++++++++++++++++++
...y_columns_schema_change_add_value_column.groovy | 129 +++++++++++++++++++++
...ify_columns_schema_change_reorder_column.groovy | 129 +++++++++++++++++++++
17 files changed, 466 insertions(+), 5 deletions(-)
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 97d367e78d4..f50c1fa9522 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -735,6 +735,7 @@ std::unordered_set<int64_t>
SchemaChangeHandler::_tablet_ids_in_converting;
// The admin should upgrade all BE and then upgrade FE.
// Should delete the old code after upgrade finished.
Status SchemaChangeHandler::_do_process_alter_tablet_v2(const
TAlterTabletReqV2& request) {
+ DBUG_EXECUTE_IF("SchemaChangeJob._do_process_alter_tablet.sleep", {
sleep(10); })
Status res = Status::OK();
TabletSharedPtr base_tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(request.base_tablet_id);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java
index 4b7afb1f6a8..12c9827f552 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java
@@ -19,6 +19,7 @@ package org.apache.doris.nereids.trees.plans.commands.insert;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.ErrorCode;
@@ -114,6 +115,16 @@ public class BatchInsertIntoTableCommand extends Command
implements NoForward, E
Preconditions.checkArgument(plan.isPresent(), "insert into command
must contain OlapTableSinkNode");
sink = ((PhysicalOlapTableSink<?>) plan.get());
Table targetTable = sink.getTargetTable();
+ if (ctx.getTxnEntry().isFirstTxnInsert()) {
+ ctx.getTxnEntry().setTxnSchemaVersion(((OlapTable)
targetTable).getBaseSchemaVersion());
+ ctx.getTxnEntry().setFirstTxnInsert(false);
+ } else {
+ if (((OlapTable) targetTable).getBaseSchemaVersion() !=
ctx.getTxnEntry().getTxnSchemaVersion()) {
+ throw new AnalysisException("There are schema changes in
one transaction, "
+ + "you can commit this transaction with formal
data or rollback "
+ + "this whole transaction.");
+ }
+ }
// should set columns of sink since we maybe generate some
invisible columns
List<Column> fullSchema = sink.getTargetTable().getFullSchema();
List<Column> targetSchema = Lists.newArrayList();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalResultSink.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalResultSink.java
index aceb1f13774..8fb6dfb286e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalResultSink.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalResultSink.java
@@ -58,10 +58,6 @@ public class PhysicalResultSink<CHILD_TYPE extends Plan>
extends PhysicalSink<CH
logicalProperties, physicalProperties, statistics, child);
}
- public List<NamedExpression> getOutputExprs() {
- return outputExprs;
- }
-
@Override
public PhysicalResultSink<Plan> withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index c78a701d279..062f83443f9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1978,6 +1978,7 @@ public class StmtExecutor {
.setThriftRpcTimeoutMs(5000).setTxnId(-1).setDb("").setTbl("")
.setMaxFilterRatio(context.getSessionVariable().getEnableInsertStrict() ? 0
:
context.getSessionVariable().getInsertMaxFilterRatio()));
+ context.getTxnEntry().setFirstTxnInsert(true);
StringBuilder sb = new StringBuilder();
sb.append("{'label':'").append(context.getTxnEntry().getLabel()).append("',
'status':'")
.append(TransactionStatus.PREPARE.name());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
index 816740a3202..b88ca66027a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
@@ -67,6 +67,8 @@ public class TransactionEntry {
private List<InternalService.PDataRow> dataToSend = new ArrayList<>();
private long rowsInTransaction = 0;
private Types.PUniqueId pLoadId;
+ private boolean isFirstTxnInsert = false;
+ private volatile int txnSchemaVersion = -1;
// for insert into select for multi tables
private boolean isTransactionBegan = false;
@@ -164,6 +166,22 @@ public class TransactionEntry {
this.pLoadId = pLoadId;
}
+ public boolean isFirstTxnInsert() {
+ return isFirstTxnInsert;
+ }
+
+ public void setFirstTxnInsert(boolean firstTxnInsert) {
+ isFirstTxnInsert = firstTxnInsert;
+ }
+
+ public int getTxnSchemaVersion() {
+ return txnSchemaVersion;
+ }
+
+ public void setTxnSchemaVersion(int txnSchemaVersion) {
+ this.txnSchemaVersion = txnSchemaVersion;
+ }
+
// Used for insert into select
public void beginTransaction(DatabaseIf database, TableIf table)
throws DdlException, BeginTransactionException,
MetaNotFoundException, AnalysisException,
diff --git a/regression-test/data/insert_p0/test_txn.out
b/regression-test/data/insert_p0/transaction/test_txn.out
similarity index 100%
rename from regression-test/data/insert_p0/test_txn.out
rename to regression-test/data/insert_p0/transaction/test_txn.out
diff --git a/regression-test/data/insert_p0/txn_insert.out
b/regression-test/data/insert_p0/transaction/txn_insert.out
similarity index 100%
rename from regression-test/data/insert_p0/txn_insert.out
rename to regression-test/data/insert_p0/transaction/txn_insert.out
diff --git
a/regression-test/data/insert_p0/txn_insert_values_with_schema_change.out
b/regression-test/data/insert_p0/transaction/txn_insert_values_with_schema_change.out
similarity index 100%
rename from
regression-test/data/insert_p0/txn_insert_values_with_schema_change.out
rename to
regression-test/data/insert_p0/transaction/txn_insert_values_with_schema_change.out
diff --git
a/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_key_column.out
b/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_key_column.out
new file mode 100644
index 00000000000..b06ce07b4a8
--- /dev/null
+++
b/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_key_column.out
@@ -0,0 +1,15 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_desc1 --
+c1 int Yes true \N
+c2 int Yes false \N NONE
+c3 int Yes false \N NONE
+
+-- !select_desc2 --
+c1 int Yes true \N
+new_col int Yes true \N
+c2 int Yes false \N NONE
+c3 int Yes false \N NONE
+
+-- !select1 --
+1 \N 2 3
+
diff --git
a/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_value_column.out
b/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_value_column.out
new file mode 100644
index 00000000000..560051c8800
--- /dev/null
+++
b/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_value_column.out
@@ -0,0 +1,15 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_desc1 --
+c1 int Yes true \N
+c2 int Yes false \N NONE
+c3 int Yes false \N NONE
+
+-- !select_desc2 --
+c1 int Yes true \N
+c2 int Yes false \N NONE
+c3 int Yes false \N NONE
+new_col int Yes false \N NONE
+
+-- !select1 --
+1 2 3 \N
+
diff --git
a/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_reorder_column.out
b/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_reorder_column.out
new file mode 100644
index 00000000000..2cac2aa11bc
--- /dev/null
+++
b/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_reorder_column.out
@@ -0,0 +1,19 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_desc1 --
+c1 int Yes true \N
+c2 bigint Yes false \N NONE
+c3 int Yes false \N NONE
+
+-- !select_desc2 --
+c1 int Yes true \N
+c3 int Yes false \N NONE
+c2 bigint Yes false \N NONE
+
+-- !select_desc3 --
+c1 int Yes true \N
+c3 int Yes false \N NONE
+c2 bigint Yes false \N NONE
+
+-- !select1 --
+1 3 2
+
diff --git a/regression-test/suites/insert_p0/test_txn.groovy
b/regression-test/suites/insert_p0/transaction/test_txn.groovy
similarity index 100%
rename from regression-test/suites/insert_p0/test_txn.groovy
rename to regression-test/suites/insert_p0/transaction/test_txn.groovy
diff --git a/regression-test/suites/insert_p0/txn_insert.groovy
b/regression-test/suites/insert_p0/transaction/txn_insert.groovy
similarity index 100%
rename from regression-test/suites/insert_p0/txn_insert.groovy
rename to regression-test/suites/insert_p0/transaction/txn_insert.groovy
diff --git
a/regression-test/suites/insert_p0/txn_insert_values_with_schema_change.groovy
b/regression-test/suites/insert_p0/transaction/txn_insert_values_with_schema_change.groovy
similarity index 98%
rename from
regression-test/suites/insert_p0/txn_insert_values_with_schema_change.groovy
rename to
regression-test/suites/insert_p0/transaction/txn_insert_values_with_schema_change.groovy
index cd428b185c4..477579760aa 100644
---
a/regression-test/suites/insert_p0/txn_insert_values_with_schema_change.groovy
+++
b/regression-test/suites/insert_p0/transaction/txn_insert_values_with_schema_change.groovy
@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit
suite("txn_insert_values_with_schema_change") {
def table = "txn_insert_values_with_schema_change"
- def dbName = "regression_test_insert_p0"
+ def dbName = "regression_test_insert_p0_transaction"
def url = getServerPrepareJdbcUrl(context.config.jdbcUrl,
dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
logger.info("url: ${url}")
List<String> errors = new ArrayList<>()
diff --git
a/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_key_column.groovy
b/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_key_column.groovy
new file mode 100644
index 00000000000..de9162cabc7
--- /dev/null
+++
b/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_key_column.groovy
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.sql.Connection
+import java.sql.DriverManager
+import java.sql.Statement
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
+
+suite("txn_insert_with_specify_columns_schema_change_add_key_column",
"nonConcurrent") {
+ if(!isCloudMode()) {
+ def table =
"txn_insert_with_specify_columns_schema_change_add_key_column"
+
+ def dbName = "regression_test_insert_p0_transaction"
+ def url = getServerPrepareJdbcUrl(context.config.jdbcUrl,
dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
+ logger.info("url: ${url}")
+ List<String> errors = new ArrayList<>()
+ CountDownLatch insertLatch = new CountDownLatch(1)
+ CountDownLatch insertLatch2 = new CountDownLatch(1)
+
+ sql """ DROP TABLE IF EXISTS $table force """
+ sql """
+ create table $table (
+ c1 INT NULL,
+ c2 INT NULL,
+ c3 INT NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(c1)
+ DISTRIBUTED BY HASH(c1) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1");
+ """
+ sql """ insert into ${table} (c3, c2, c1) values (3, 2, 1) """
+
+ def getAlterTableState = { job_state ->
+ def retry = 0
+ sql "use ${dbName};"
+ while (true) {
+ sleep(2000)
+ def state = sql " show alter table column where tablename =
'${table}' order by CreateTime desc limit 1"
+ logger.info("alter table state: ${state}")
+ if (state.size() > 0 && state[0][9] == job_state) {
+ return
+ }
+ retry++
+ if (retry >= 10) {
+ break
+ }
+ }
+ assertTrue(false, "alter table job state is ${last_state}, not
${job_state} after retry ${retry} times")
+ }
+
+ def txnInsert = {
+ try (Connection conn = DriverManager.getConnection(url,
context.config.jdbcUser, context.config.jdbcPassword);
+ Statement statement = conn.createStatement()) {
+ try {
+ qt_select_desc1 """desc $table"""
+
+ insertLatch.await(2, TimeUnit.MINUTES)
+
+ statement.execute("begin")
+ statement.execute("insert into ${table} (c3, c2, c1) values
(33, 22, 11),(333, 222, 111);")
+
+ insertLatch2.await(2, TimeUnit.MINUTES)
+ qt_select_desc2 """desc $table"""
+ statement.execute("insert into ${table} (c3, c2, c1)
values(3333, 2222, 1111);")
+ statement.execute("insert into ${table} (c3, c2, c1)
values(33333, 22222, 11111),(333333, 222222, 111111);")
+ statement.execute("commit")
+ } catch (Exception e) {
+ logger.info("txn insert failed", e)
+ assertTrue(e.getMessage().contains("There are schema
changes in one transaction, you can commit this transaction with formal data or
rollback this whole transaction."))
+ statement.execute("rollback")
+ }
+ }
+ }
+
+ def schemaChange = { sql ->
+ try (Connection conn = DriverManager.getConnection(url,
context.config.jdbcUser, context.config.jdbcPassword);
+ Statement statement = conn.createStatement()) {
+ statement.execute(sql)
+ getAlterTableState("RUNNING")
+ insertLatch.countDown()
+ getAlterTableState("FINISHED")
+ insertLatch2.countDown()
+ } catch (Throwable e) {
+ logger.error("schema change failed", e)
+ errors.add("schema change failed " + e.getMessage())
+ }
+ }
+
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob._do_process_alter_tablet.sleep")
+ Thread schema_change_thread = new Thread(() -> schemaChange("alter
table ${table} add column new_col int key after c1;"))
+ Thread insert_thread = new Thread(() -> txnInsert())
+ schema_change_thread.start()
+ insert_thread.start()
+ schema_change_thread.join()
+ insert_thread.join()
+
+ logger.info("errors: " + errors)
+ assertEquals(0, errors.size())
+ getAlterTableState("FINISHED")
+ order_qt_select1 """select * from ${table} order by c1, c2, c3"""
+ } catch (Exception e) {
+ logger.info("failed: " + e.getMessage())
+ assertTrue(false)
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+ }
+}
diff --git
a/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_value_column.groovy
b/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_value_column.groovy
new file mode 100644
index 00000000000..1d072f572c6
--- /dev/null
+++
b/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_value_column.groovy
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.sql.Connection
+import java.sql.DriverManager
+import java.sql.Statement
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
+
+suite("txn_insert_with_specify_columns_schema_change_add_value_column",
"nonConcurrent") {
+ if(!isCloudMode()) {
+ def table =
"txn_insert_with_specify_columns_schema_change_add_value_column"
+
+ def dbName = "regression_test_insert_p0_transaction"
+ def url = getServerPrepareJdbcUrl(context.config.jdbcUrl,
dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
+ logger.info("url: ${url}")
+ List<String> errors = new ArrayList<>()
+ CountDownLatch insertLatch = new CountDownLatch(1)
+ CountDownLatch schemaChangeLatch = new CountDownLatch(1)
+
+ sql """ DROP TABLE IF EXISTS $table force """
+ sql """
+ create table $table (
+ c1 INT NULL,
+ c2 INT NULL,
+ c3 INT NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(c1)
+ DISTRIBUTED BY HASH(c1) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1");
+ """
+ sql """ insert into ${table} (c3, c2, c1) values (3, 2, 1) """
+
+ def getAlterTableState = { job_state ->
+ def retry = 0
+ sql "use ${dbName};"
+ while (true) {
+ sleep(2000)
+ def state = sql " show alter table column where tablename =
'${table}' order by CreateTime desc limit 1"
+ logger.info("alter table state: ${state}")
+ if (state.size() > 0 && state[0][9] == job_state) {
+ return
+ }
+ retry++
+ if (retry >= 10) {
+ break
+ }
+ }
+ assertTrue(false, "alter table job state is ${last_state}, not
${job_state} after retry ${retry} times")
+ }
+
+ def txnInsert = {
+ try (Connection conn = DriverManager.getConnection(url,
context.config.jdbcUser, context.config.jdbcPassword);
+ Statement statement = conn.createStatement()) {
+ try {
+ qt_select_desc1 """desc $table"""
+
+ statement.execute("begin")
+ statement.execute("insert into ${table} (c3, c2, c1)
values (33, 22, 11),(333, 222, 111);")
+
+ schemaChangeLatch.countDown()
+
+ insertLatch.await(2, TimeUnit.MINUTES)
+
+ qt_select_desc2 """desc $table"""
+ statement.execute("insert into ${table} (c3, c2, c1)
values(3333, 2222, 1111);")
+ statement.execute("insert into ${table} (c3, c2, c1)
values(33333, 22222, 11111),(333333, 222222, 111111);")
+ statement.execute("commit")
+ } catch (Exception e) {
+ logger.info("txn insert failed", e)
+ assertTrue(e.getMessage().contains("There are schema
changes in one transaction, you can commit this transaction with formal data or
rollback this whole transaction."))
+ statement.execute("rollback")
+ }
+ }
+ }
+
+ def schemaChange = { sql ->
+ try (Connection conn = DriverManager.getConnection(url,
context.config.jdbcUser, context.config.jdbcPassword);
+ Statement statement = conn.createStatement()) {
+ schemaChangeLatch.await(2, TimeUnit.MINUTES)
+ statement.execute(sql)
+ getAlterTableState("FINISHED")
+ insertLatch.countDown()
+ } catch (Throwable e) {
+ logger.error("schema change failed", e)
+ errors.add("schema change failed " + e.getMessage())
+ }
+ }
+
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob._do_process_alter_tablet.sleep")
+ Thread schema_change_thread = new Thread(() -> schemaChange("alter
table ${table} add column new_col int after c3;"))
+ Thread insert_thread = new Thread(() -> txnInsert())
+ schema_change_thread.start()
+ insert_thread.start()
+
+ schema_change_thread.join()
+ insert_thread.join()
+
+ logger.info("errors: " + errors)
+ assertEquals(0, errors.size())
+ getAlterTableState("FINISHED")
+ order_qt_select1 """select * from ${table} order by c1, c2, c3"""
+ } catch (Exception e) {
+ logger.info("failed: " + e.getMessage())
+ assertTrue(false)
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+ }
+
+}
diff --git
a/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_reorder_column.groovy
b/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_reorder_column.groovy
new file mode 100644
index 00000000000..e4c16fdf8c3
--- /dev/null
+++
b/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_reorder_column.groovy
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.sql.Connection
+import java.sql.DriverManager
+import java.sql.Statement
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
+
+suite("txn_insert_with_specify_columns_schema_change_reorder_column",
"nonConcurrent") {
+ if(!isCloudMode()){
+ def table =
"txn_insert_with_specify_columns_schema_change_reorder_column"
+
+ def dbName = "regression_test_insert_p0_transaction"
+ def url = getServerPrepareJdbcUrl(context.config.jdbcUrl,
dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
+ logger.info("url: ${url}")
+ List<String> errors = new ArrayList<>()
+ CountDownLatch insertLatch = new CountDownLatch(1)
+ CountDownLatch insertLatch2 = new CountDownLatch(1)
+
+ sql """ DROP TABLE IF EXISTS $table force """
+ sql """
+ create table $table (
+ c1 INT NULL,
+ c2 BIGINT NULL,
+ c3 INT NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(c1)
+ DISTRIBUTED BY HASH(c1) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1");
+ """
+ sql """ insert into ${table} (c3, c2, c1) values (3, 2, 1) """
+
+ def getAlterTableState = { job_state ->
+ def retry = 0
+ sql "use ${dbName};"
+ while (true) {
+ sleep(2000)
+ def state = sql " show alter table column where tablename =
'${table}' order by CreateTime desc limit 1"
+ logger.info("alter table state: ${state}")
+ if (state.size() > 0 && state[0][9] == job_state) {
+ return
+ }
+ retry++
+ if (retry >= 10) {
+ break
+ }
+ }
+ assertTrue(false, "alter table job state is ${last_state}, not
${job_state} after retry ${retry} times")
+ }
+
+ def txnInsert = {
+ try (Connection conn = DriverManager.getConnection(url,
context.config.jdbcUser, context.config.jdbcPassword);
+ Statement statement = conn.createStatement()) {
+ try {
+ qt_select_desc1 """desc $table"""
+
+ insertLatch.await(2, TimeUnit.MINUTES)
+
+ statement.execute("begin")
+ statement.execute("insert into ${table} (c3, c2, c1)
values (33, 22, 11),(333, 222, 111);")
+
+ insertLatch2.await(2, TimeUnit.MINUTES)
+
+ qt_select_desc2 """desc $table"""
+ statement.execute("insert into ${table} (c3, c2, c1)
values(3333, 2222, 1111);")
+ statement.execute("insert into ${table} (c3, c2, c1)
values(33333, 22222, 11111),(333333, 222222, 111111);")
+ statement.execute("commit")
+ } catch (Throwable e) {
+ logger.error("txn insert failed", e)
+ assertTrue(e.getMessage().contains("There are schema
changes in one transaction, you can commit this transaction with formal data or
rollback this whole transaction."))
+ statement.execute("rollback")
+ }
+ }
+ }
+
+ def schemaChange = { sql ->
+ try (Connection conn = DriverManager.getConnection(url,
context.config.jdbcUser, context.config.jdbcPassword);
+ Statement statement = conn.createStatement()) {
+ statement.execute(sql)
+ getAlterTableState("RUNNING")
+ insertLatch.countDown()
+ getAlterTableState("FINISHED")
+ insertLatch2.countDown()
+ } catch (Throwable e) {
+ logger.error("schema change failed", e)
+ errors.add("schema change failed " + e.getMessage())
+ }
+ }
+
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob._do_process_alter_tablet.sleep")
+ Thread schema_change_thread = new Thread(() -> schemaChange("alter
table ${table} order by (c1,c3,c2);"))
+ Thread insert_thread = new Thread(() -> txnInsert())
+ schema_change_thread.start()
+ insert_thread.start()
+ schema_change_thread.join()
+ insert_thread.join()
+
+ logger.info("errors: " + errors)
+ assertEquals(0, errors.size())
+ getAlterTableState("FINISHED")
+ qt_select_desc3 """desc $table"""
+ order_qt_select1 """select * from ${table} order by c1, c2, c3"""
+ } catch (Exception e) {
+ logger.info("failed: " + e.getMessage())
+ assertTrue(false)
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]