This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ebc15fc6ccf [fix](transaction) Fix concurrent schema change and txn
cause dead lock (#26428)
ebc15fc6ccf is described below
commit ebc15fc6ccf012d3bd7765fce33518b85adfcf30
Author: yujun <[email protected]>
AuthorDate: Mon Nov 13 21:39:28 2023 +0800
[fix](transaction) Fix concurrent schema change and txn cause dead lock
(#26428)
Concurrent schema change and transactions may cause a deadlock. An example:
Txn T is committed but not yet published;
A schema change or rollup runs on T's related partition and adds alter
replica R;
The sc/rollup job records a scheduling txn watermark M;
The FE restarts;
After the FE restarts, T's loadedTblIndexes is cleared because it is not
persisted to disk;
T publishes its version to all tablets, including the sc/rollup's new
alter replica R;
Since R does not contain T's data, publishing T fails, and T keeps
waiting for R's data;
The sc/rollup job waits for all txns before M to finish; only after that
does it let R copy the history data;
Since T never finishes, the sc/rollup job waits forever, so R never
copies the history data;
Txn T and the sc/rollup job wait for each other forever, causing a deadlock.
Fix: because sc/rollup guarantees double writes for txns after the
scheduling watermark M, when finishing a transaction and checking an alter
replica:
if the txn id is greater than M, check it just like a normal replica;
otherwise skip checking this replica, since the BE will fill in the history
data later.
---
be/src/olap/tablet_meta.cpp | 22 ++++
be/src/olap/tablet_meta.h | 2 +
be/src/olap/task/engine_publish_version_task.cpp | 15 ++-
.../main/java/org/apache/doris/common/Config.java | 7 ++
.../java/org/apache/doris/alter/AlterJobV2.java | 9 ++
.../java/org/apache/doris/alter/RollupJobV2.java | 4 -
.../org/apache/doris/alter/SchemaChangeJobV2.java | 3 -
.../doris/transaction/DatabaseTransactionMgr.java | 37 +++++-
.../test_schema_change_concurrent_with_txn.out | 61 ++++++++++
.../org/apache/doris/regression/suite/Suite.groovy | 7 +-
.../doris/regression/suite/SuiteContext.groovy | 26 ++++
.../test_schema_change_concurrent_with_txn.groovy | 135 +++++++++++++++++++++
12 files changed, 306 insertions(+), 22 deletions(-)
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 84160c411e2..76cf14a75ba 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -1074,4 +1074,26 @@ std::shared_ptr<roaring::Roaring>
DeleteBitmap::get_agg(const BitmapKey& bmk) co
std::atomic<ShardedLRUCache*> DeleteBitmap::AggCache::s_repr {nullptr};
+std::string tablet_state_name(TabletState state) {
+ switch (state) {
+ case TABLET_NOTREADY:
+ return "TABLET_NOTREADY";
+
+ case TABLET_RUNNING:
+ return "TABLET_RUNNING";
+
+ case TABLET_TOMBSTONED:
+ return "TABLET_TOMBSTONED";
+
+ case TABLET_STOPPED:
+ return "TABLET_STOPPED";
+
+ case TABLET_SHUTDOWN:
+ return "TABLET_SHUTDOWN";
+
+ default:
+ return "TabletState(" + std::to_string(state) + ")";
+ }
+}
+
} // namespace doris
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 622d5c44f98..60cc485882a 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -632,6 +632,8 @@ inline bool TabletMeta::all_beta() const {
return true;
}
+std::string tablet_state_name(TabletState state);
+
// Only for unit test now.
bool operator==(const TabletMeta& a, const TabletMeta& b);
bool operator!=(const TabletMeta& a, const TabletMeta& b);
diff --git a/be/src/olap/task/engine_publish_version_task.cpp
b/be/src/olap/task/engine_publish_version_task.cpp
index d914816e0ad..040b5a7edea 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -244,14 +244,17 @@ Status EnginePublishVersionTask::finish() {
add_error_tablet_id(tablet_id);
if (res.ok()) {
res = Status::Error<VERSION_NOT_EXIST>(
- "tablet {} not exists version {}",
tablet_id,
+ "tablet {} with state {} not exists
version {}", tablet_id,
+ tablet_state_name(tablet->tablet_state()),
par_ver_info.version);
}
- LOG(WARNING) << "publish version failed on
transaction, tablet version not "
- "exists. "
- << "transaction_id=" << transaction_id
- << ", tablet_id=" << tablet_id
- << ", version=" << par_ver_info.version;
+ LOG(WARNING)
+ << "publish version failed on transaction,
tablet version not "
+ "exists. "
+ << "transaction_id=" << transaction_id
+ << ", tablet_id=" << tablet_id
+ << ", tablet_state=" <<
tablet_state_name(tablet->tablet_state())
+ << ", version=" << par_ver_info.version;
}
}
}
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 50c31755a06..2b8e3414038 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -439,6 +439,13 @@ public class Config extends ConfigBase {
+ "then the load task will be successful." })
public static int publish_wait_time_second = 300;
+ @ConfField(mutable = true, masterOnly = true, description = {"导入 Publish
阶段是否检查正在做 Schema 变更的副本。"
+ + "正常情况下,不要关闭此检查。除非在极端情况下出现导入和 Schema 变更出现互相等待死锁时才临时打开。",
+ "Check the replicas which are doing schema change when publish
transaction. Do not turn off this check "
+ + " under normal circumstances. It's only temporarily skip check
if publish version and schema change have"
+ + " dead lock" })
+ public static boolean publish_version_check_alter_replica = true;
+
@ConfField(mutable = true, masterOnly = true, description =
{"提交事务的最大超时时间,单位是秒。"
+ "该参数仅用于事务型 insert 操作中。",
"Maximal waiting time for all data inserted before one transaction
to be committed, in seconds. "
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
index fb616fe429e..c1984d31d47 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
@@ -89,6 +89,11 @@ public abstract class AlterJobV2 implements Writable {
@SerializedName(value = "rawSql")
protected String rawSql;
+ // The job will wait all transactions before this txn id finished, then
send the schema_change/rollup tasks.
+ @SerializedName(value = "watershedTxnId")
+ protected long watershedTxnId = -1;
+
+
public AlterJobV2(String rawSql, long jobId, JobType jobType, long dbId,
long tableId, String tableName,
long timeoutMs) {
this.rawSql = rawSql;
@@ -135,6 +140,10 @@ public abstract class AlterJobV2 implements Writable {
return tableName;
}
+ public long getWatershedTxnId() {
+ return watershedTxnId;
+ }
+
public boolean isTimeout() {
return System.currentTimeMillis() - createTimeMs > timeoutMs;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index 6736bd3aa5b..dfc825dbbc1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -134,10 +134,6 @@ public class RollupJobV2 extends AlterJobV2 implements
GsonPostProcessable {
@SerializedName(value = "storageFormat")
private TStorageFormat storageFormat = TStorageFormat.DEFAULT;
- // The rollup job will wait all transactions before this txn id finished,
then send the rollup tasks.
- @SerializedName(value = "watershedTxnId")
- protected long watershedTxnId = -1;
-
// save all create rollup tasks
private AgentBatchTask rollupBatchTask = new AgentBatchTask();
// save failed task after retry three times, tabletId -> agentTask
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 60b19e7f368..a65612989ab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -126,9 +126,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
@SerializedName(value = "indexes")
private List<Index> indexes = null;
- // The schema change job will wait all transactions before this txn id
finished, then send the schema change tasks.
- @SerializedName(value = "watershedTxnId")
- protected long watershedTxnId = -1;
@SerializedName(value = "storageFormat")
private TStorageFormat storageFormat = TStorageFormat.DEFAULT;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 42cac0c9690..cac271af4aa 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -17,6 +17,7 @@
package org.apache.doris.transaction;
+import org.apache.doris.alter.AlterJobV2;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
@@ -535,6 +536,7 @@ public class DatabaseTransactionMgr {
transactionState.prolongPublishTimeout();
}
+ // (TODO): ignore the alter index if txn id is less than sc
sched watermark
int loadRequiredReplicaNum =
table.getLoadRequiredReplicaNum(partition.getId());
for (MaterializedIndex index : allIndices) {
for (Tablet tablet : index.getTablets()) {
@@ -553,6 +555,7 @@ public class DatabaseTransactionMgr {
throw new
TransactionCommitFailedException("could not find replica for tablet ["
+ tabletId + "], backend [" +
tabletBackend + "]");
}
+
// if the tablet have no replica's to commit or
the tablet is a rolling up tablet,
// the commit backends maybe null
// if the commit backends is null, set all
replicas as error replicas
@@ -985,6 +988,7 @@ public class DatabaseTransactionMgr {
continue;
}
+ boolean alterReplicaLoadedTxn =
isAlterReplicaLoadedTxn(transactionId, table);
Iterator<PartitionCommitInfo> partitionCommitInfoIterator
=
tableCommitInfo.getIdToPartitionCommitInfo().values().iterator();
while (partitionCommitInfoIterator.hasNext()) {
@@ -1037,7 +1041,7 @@ public class DatabaseTransactionMgr {
tabletWriteFailedReplicas.clear();
tabletVersionFailedReplicas.clear();
for (Replica replica : tablet.getReplicas()) {
-
checkReplicaContinuousVersionSucc(tablet.getId(), replica,
+
checkReplicaContinuousVersionSucc(tablet.getId(), replica,
alterReplicaLoadedTxn,
partitionCommitInfo.getVersion(),
publishTasks.get(replica.getBackendId()),
errorReplicaIds, tabletSuccReplicas,
tabletWriteFailedReplicas,
tabletVersionFailedReplicas);
@@ -1132,8 +1136,24 @@ public class DatabaseTransactionMgr {
LOG.info("finish transaction {} successfully, publish result: {}",
transactionState, publishResult.name());
}
- private void checkReplicaContinuousVersionSucc(long tabletId, Replica
replica, long version,
- PublishVersionTask backendPublishTask, Set<Long> errorReplicaIds,
List<Replica> tabletSuccReplicas,
+ private boolean isAlterReplicaLoadedTxn(long transactionId, OlapTable
table) {
+ List<AlterJobV2> unfinishedAlterJobs = null;
+ if (table.getState() == OlapTable.OlapTableState.SCHEMA_CHANGE) {
+ unfinishedAlterJobs =
Env.getCurrentEnv().getAlterInstance().getSchemaChangeHandler()
+ .getUnfinishedAlterJobV2ByTableId(table.getId());
+ } else if (table.getState() == OlapTable.OlapTableState.ROLLUP) {
+ unfinishedAlterJobs =
Env.getCurrentEnv().getAlterInstance().getMaterializedViewHandler()
+ .getUnfinishedAlterJobV2ByTableId(table.getId());
+ } else {
+ return true;
+ }
+
+ return unfinishedAlterJobs.stream().allMatch(job -> transactionId >
job.getWatershedTxnId());
+ }
+
+ private void checkReplicaContinuousVersionSucc(long tabletId, Replica
replica, boolean alterReplicaLoadedTxn,
+ long version, PublishVersionTask backendPublishTask,
+ Set<Long> errorReplicaIds, List<Replica> tabletSuccReplicas,
List<Replica> tabletWriteFailedReplicas, List<Replica>
tabletVersionFailedReplicas) {
if (backendPublishTask == null || !backendPublishTask.isFinished()) {
errorReplicaIds.add(replica.getId());
@@ -1155,6 +1175,17 @@ public class DatabaseTransactionMgr {
}
}
+ // Schema change and rollup has a sched watermark,
+ // it's ensure that alter replicas will load those txns whose txn id >
sched watermark.
+ // But for txns before the sched watermark, the alter replicas maynot
load the txns,
+ // publish will ignore checking them and treat them as success in
advance.
+ // Later be will fill the alter replicas's history data which before
sched watermark.
+ // If failed to fill, fe will set the alter replica bad.
+ if (replica.getState() == Replica.ReplicaState.ALTER
+ && (!alterReplicaLoadedTxn ||
!Config.publish_version_check_alter_replica)) {
+ errorReplicaIds.remove(replica.getId());
+ }
+
if (!errorReplicaIds.contains(replica.getId())) {
if (replica.checkVersionCatchUp(version - 1, true)) {
tabletSuccReplicas.add(replica);
diff --git
a/regression-test/data/schema_change/test_schema_change_concurrent_with_txn.out
b/regression-test/data/schema_change/test_schema_change_concurrent_with_txn.out
new file mode 100644
index 00000000000..1f531cb7703
--- /dev/null
+++
b/regression-test/data/schema_change/test_schema_change_concurrent_with_txn.out
@@ -0,0 +1,61 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_1_1 --
+1 10
+2 20
+
+-- !select_2_1 --
+1 10
+2 20
+
+-- !select_3_1 --
+1 11 111
+2 22 222
+
+-- !select_4_1 --
+1 10
+2 20
+
+-- !select_1_2 --
+1 10
+2 20
+
+-- !select_2_2 --
+1 10
+2 20
+
+-- !select_3_2 --
+1 11 111
+2 22 222
+
+-- !select_4_2 --
+1 10
+2 20
+
+-- !select_1_3 --
+1 10 -1
+2 20 -1
+3 30 -1
+4 40 -1
+5 50 -1
+
+-- !select_2_3 --
+1 10
+2 20
+3 30
+4 40
+5 50
+
+-- !select_3_3 --
+1 11 111
+2 22 222
+3 33 333
+4 44 444
+5 55 555
+
+-- !select_4_3 --
+10 1
+20 2
+30 3
+40 4
+50 5
+
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 0c02028c3e7..c9e8fe4215e 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -168,12 +168,7 @@ class Suite implements GroovyInterceptable {
try {
Thread.currentThread().setName(threadName == null ?
originThreadName : threadName)
if (connInfo != null) {
- def newConnInfo = new ConnectionInfo()
- newConnInfo.conn =
DriverManager.getConnection(connInfo.conn.getMetaData().getURL(),
- connInfo.username, connInfo.password)
- newConnInfo.username = connInfo.username
- newConnInfo.password = connInfo.password
- context.threadLocalConn.set(newConnInfo)
+ context.connectTo(connInfo.conn.getMetaData().getURL(),
connInfo.username, connInfo.password);
}
context.scriptContext.eventListeners.each {
it.onThreadStarted(context) }
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteContext.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteContext.groovy
index d31364eb0ef..31eed478399 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteContext.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteContext.groovy
@@ -250,6 +250,32 @@ class SuiteContext implements Closeable {
}
}
+ public void reconnectFe() {
+ ConnectionInfo connInfo = threadLocalConn.get()
+ if (connInfo == null) {
+ return
+ }
+ connectTo(connInfo.conn.getMetaData().getURL(), connInfo.username,
connInfo.password);
+ }
+
+ public void connectTo(String url, String username, String password) {
+ ConnectionInfo oldConn = threadLocalConn.get()
+ if (oldConn != null) {
+ threadLocalConn.remove()
+ try {
+ oldConn.conn.close()
+ } catch (Throwable t) {
+ log.warn("Close connection failed", t)
+ }
+ }
+
+ def newConnInfo = new ConnectionInfo()
+ newConnInfo.conn = DriverManager.getConnection(url, username, password)
+ newConnInfo.username = username
+ newConnInfo.password = password
+ threadLocalConn.set(newConnInfo)
+ }
+
OutputUtils.OutputBlocksIterator getOutputIterator() {
def outputIt = threadLocalOutputIterator.get()
if (outputIt == null) {
diff --git
a/regression-test/suites/schema_change/test_schema_change_concurrent_with_txn.groovy
b/regression-test/suites/schema_change/test_schema_change_concurrent_with_txn.groovy
new file mode 100644
index 00000000000..4e2f717ed46
--- /dev/null
+++
b/regression-test/suites/schema_change/test_schema_change_concurrent_with_txn.groovy
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.NodeType
+
+suite('test_schema_change_concurrent_with_txn') {
+ def options = new ClusterOptions()
+ options.enableDebugPoints()
+ options.feConfigs.add('publish_wait_time_second=-1')
+ docker(options) {
+ sql 'SET GLOBAL insert_visible_timeout_ms = 2000'
+
+ def result = sql 'SELECT DATABASE()'
+ def dbName = result[0][0]
+
+ sql 'CREATE TABLE tbl_1 (k1 INT, k2 INT) PROPERTIES (
"light_schema_change" = "false")'
+ sql 'INSERT INTO tbl_1 VALUES (1, 10)'
+ sql 'INSERT INTO tbl_1 VALUES (2, 20)'
+ order_qt_select_1_1 'SELECT * FROM tbl_1'
+
+ sql 'CREATE TABLE tbl_2 AS SELECT * FROM tbl_1'
+ order_qt_select_2_1 'SELECT * FROM tbl_2'
+
+ sql 'CREATE TABLE tbl_3 (k1 INT, k2 INT, v INT SUM) AGGREGATE KEY (k1,
k2)'
+ sql 'INSERT INTO tbl_3 VALUES (1, 11, 111)'
+ sql 'INSERT INTO tbl_3 VALUES (2, 22, 222)'
+ order_qt_select_3_1 'SELECT * FROM tbl_3'
+
+ sql 'CREATE TABLE tbl_4 (k1 INT, k2 INT)'
+ sql 'INSERT INTO tbl_4 VALUES (1, 10)'
+ sql 'INSERT INTO tbl_4 VALUES (2, 20)'
+ order_qt_select_4_1 'SELECT * FROM tbl_4'
+
+ // stop publish, insert succ, txn is commit but not visible
+ cluster.injectDebugPoints(NodeType.FE,
['PublishVersionDaemon.stop_publish':null])
+
+ sql 'INSERT INTO tbl_1 VALUES (3, 30)'
+ sql 'INSERT INTO tbl_1 VALUES (4, 40)'
+ order_qt_select_1_2 'SELECT * FROM tbl_1'
+
+ sql 'INSERT INTO tbl_2 VALUES (3, 30)'
+ sql 'INSERT INTO tbl_2 VALUES (4, 40)'
+ order_qt_select_2_2 'SELECT * FROM tbl_2'
+
+ sql 'INSERT INTO tbl_3 VALUES (3, 33, 333)'
+ sql 'INSERT INTO tbl_3 VALUES (4, 44, 444)'
+ order_qt_select_3_2 'SELECT * FROM tbl_3'
+
+ sql 'INSERT INTO tbl_4 VALUES (3, 30)'
+ sql 'INSERT INTO tbl_4 VALUES (4, 40)'
+ order_qt_select_4_2 'SELECT * FROM tbl_4'
+
+ result = sql 'SHOW PROC "/transactions"'
+ def runningTxn = result.find({ it[1].indexOf(dbName) > 0 })[2]
+ assertEquals(8, runningTxn as int)
+
+ sql "ALTER TABLE tbl_1 ADD COLUMN k3 INT DEFAULT '-1'"
+ sql 'CREATE MATERIALIZED VIEW tbl_2_mv AS SELECT k1, k1 + k2 FROM
tbl_2'
+ sql 'ALTER TABLE tbl_3 ADD ROLLUP tbl_3_r1(k1, v)'
+ sql 'ALTER TABLE tbl_4 ORDER BY (k2, k1)'
+
+ sleep(5000)
+
+ def jobs = null
+ def scJobState = job -> job[9]
+ def rollupJobState = job -> job[8]
+
+ jobs = sql "SHOW ALTER TABLE COLUMN WHERE TableName = 'tbl_1'"
+ assertEquals(1, jobs.size())
+ assertEquals('WAITING_TXN', scJobState(jobs[0]))
+
+ jobs = sql "SHOW ALTER TABLE MATERIALIZED VIEW WHERE TableName =
'tbl_2'"
+ assertEquals(1, jobs.size())
+ assertEquals('WAITING_TXN', rollupJobState(jobs[0]))
+
+ jobs = sql "SHOW ALTER TABLE ROLLUP WHERE TableName = 'tbl_3'"
+ assertEquals(1, jobs.size())
+ assertEquals('WAITING_TXN', rollupJobState(jobs[0]))
+
+ jobs = sql "SHOW ALTER TABLE COLUMN WHERE TableName = 'tbl_4'"
+ assertEquals(1, jobs.size())
+ assertEquals('WAITING_TXN', scJobState(jobs[0]))
+
+ sql 'INSERT INTO tbl_1(k1, k2) VALUES (5, 50)'
+ sql 'INSERT INTO tbl_2 VALUES (5, 50)'
+ sql 'INSERT INTO tbl_3 VALUES (5, 55, 555)'
+ sql 'INSERT INTO tbl_4(k1, k2) VALUES (5, 50)'
+
+ // After fe restart, transaction's loadedTblIndexes will clear,
+ // then fe will send publish task to all indexes.
+ // But the alter index may add after commit txn, then publish will
failed.
+ cluster.restartFrontends()
+ sleep(30000)
+ context.reconnectFe()
+
+ //cluster.clearFrontendDebugPoints()
+
+ // should publish visible
+ order_qt_select_1_3 'SELECT * FROM tbl_1'
+ order_qt_select_2_3 'SELECT * FROM tbl_2'
+ order_qt_select_3_3 'SELECT * FROM tbl_3'
+ order_qt_select_4_3 'SELECT * FROM tbl_4'
+
+ jobs = sql "SHOW ALTER TABLE COLUMN WHERE TableName = 'tbl_1'"
+ assertEquals(1, jobs.size())
+ assertEquals('FINISHED', scJobState(jobs[0]))
+
+ jobs = sql "SHOW ALTER TABLE MATERIALIZED VIEW WHERE TableName =
'tbl_2'"
+ assertEquals(1, jobs.size())
+ assertEquals('FINISHED', rollupJobState(jobs[0]))
+
+ jobs = sql "SHOW ALTER TABLE ROLLUP WHERE TableName = 'tbl_3'"
+ assertEquals(1, jobs.size())
+ assertEquals('FINISHED', rollupJobState(jobs[0]))
+
+ jobs = sql "SHOW ALTER TABLE COLUMN WHERE TableName = 'tbl_4'"
+ assertEquals(1, jobs.size())
+ assertEquals('FINISHED', scJobState(jobs[0]))
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]