This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ebc15fc6ccf [fix](transaction) Fix concurrent schema change and txn 
cause dead lock (#26428)
ebc15fc6ccf is described below

commit ebc15fc6ccf012d3bd7765fce33518b85adfcf30
Author: yujun <[email protected]>
AuthorDate: Mon Nov 13 21:39:28 2023 +0800

    [fix](transaction) Fix concurrent schema change and txn cause dead lock 
(#26428)
    
    Concurrent schema changes and transactions may cause a deadlock. An example:
    
    Txn T is committed but not yet published;
    A schema change or rollup runs on T's partition and adds an alter replica R;
    The sc/rollup job records a scheduling txn watermark M;
    The FE restarts;
    After the FE restarts, T's loadedTblIndexes is cleared, because it is not persisted to disk;
    T then publishes its version to all tablets, including the sc/rollup job's new alter replica R;
    Since R does not contain T's data, publishing T fails, and T keeps waiting for R's data;
    The sc/rollup job waits for all txns before M to finish; only after that does it let R copy the historical data;
    Since T never finishes, the sc/rollup job waits forever, so R never copies the historical data;
    Txn T and the sc/rollup job wait for each other forever, which is a deadlock.
    Fix: because sc/rollup guarantees double writes for txns after the sched watermark M, when finishing a transaction and checking an alter replica:
    
    if the txn id is greater than M, check the replica just like a normal replica;
    otherwise, skip checking this replica; the BE will fill in its historical data later.
---
 be/src/olap/tablet_meta.cpp                        |  22 ++++
 be/src/olap/tablet_meta.h                          |   2 +
 be/src/olap/task/engine_publish_version_task.cpp   |  15 ++-
 .../main/java/org/apache/doris/common/Config.java  |   7 ++
 .../java/org/apache/doris/alter/AlterJobV2.java    |   9 ++
 .../java/org/apache/doris/alter/RollupJobV2.java   |   4 -
 .../org/apache/doris/alter/SchemaChangeJobV2.java  |   3 -
 .../doris/transaction/DatabaseTransactionMgr.java  |  37 +++++-
 .../test_schema_change_concurrent_with_txn.out     |  61 ++++++++++
 .../org/apache/doris/regression/suite/Suite.groovy |   7 +-
 .../doris/regression/suite/SuiteContext.groovy     |  26 ++++
 .../test_schema_change_concurrent_with_txn.groovy  | 135 +++++++++++++++++++++
 12 files changed, 306 insertions(+), 22 deletions(-)

diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 84160c411e2..76cf14a75ba 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -1074,4 +1074,26 @@ std::shared_ptr<roaring::Roaring> 
DeleteBitmap::get_agg(const BitmapKey& bmk) co
 
 std::atomic<ShardedLRUCache*> DeleteBitmap::AggCache::s_repr {nullptr};
 
+std::string tablet_state_name(TabletState state) {
+    switch (state) {
+    case TABLET_NOTREADY:
+        return "TABLET_NOTREADY";
+
+    case TABLET_RUNNING:
+        return "TABLET_RUNNING";
+
+    case TABLET_TOMBSTONED:
+        return "TABLET_TOMBSTONED";
+
+    case TABLET_STOPPED:
+        return "TABLET_STOPPED";
+
+    case TABLET_SHUTDOWN:
+        return "TABLET_SHUTDOWN";
+
+    default:
+        return "TabletState(" + std::to_string(state) + ")";
+    }
+}
+
 } // namespace doris
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 622d5c44f98..60cc485882a 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -632,6 +632,8 @@ inline bool TabletMeta::all_beta() const {
     return true;
 }
 
+std::string tablet_state_name(TabletState state);
+
 // Only for unit test now.
 bool operator==(const TabletMeta& a, const TabletMeta& b);
 bool operator!=(const TabletMeta& a, const TabletMeta& b);
diff --git a/be/src/olap/task/engine_publish_version_task.cpp 
b/be/src/olap/task/engine_publish_version_task.cpp
index d914816e0ad..040b5a7edea 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -244,14 +244,17 @@ Status EnginePublishVersionTask::finish() {
                         add_error_tablet_id(tablet_id);
                         if (res.ok()) {
                             res = Status::Error<VERSION_NOT_EXIST>(
-                                    "tablet {} not exists version {}", 
tablet_id,
+                                    "tablet {} with state {} not exists 
version {}", tablet_id,
+                                    tablet_state_name(tablet->tablet_state()),
                                     par_ver_info.version);
                         }
-                        LOG(WARNING) << "publish version failed on 
transaction, tablet version not "
-                                        "exists. "
-                                     << "transaction_id=" << transaction_id
-                                     << ", tablet_id=" << tablet_id
-                                     << ", version=" << par_ver_info.version;
+                        LOG(WARNING)
+                                << "publish version failed on transaction, 
tablet version not "
+                                   "exists. "
+                                << "transaction_id=" << transaction_id
+                                << ", tablet_id=" << tablet_id
+                                << ", tablet_state=" << 
tablet_state_name(tablet->tablet_state())
+                                << ", version=" << par_ver_info.version;
                     }
                 }
             }
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 50c31755a06..2b8e3414038 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -439,6 +439,13 @@ public class Config extends ConfigBase {
             + "then the load task will be successful." })
     public static int publish_wait_time_second = 300;
 
+    @ConfField(mutable = true, masterOnly = true, description = {"导入 Publish 
阶段是否检查正在做 Schema 变更的副本。"
+            + "正常情况下,不要关闭此检查。除非在极端情况下出现导入和 Schema 变更出现互相等待死锁时才临时打开。",
+            "Check the replicas which are doing schema change when publish 
transaction. Do not turn off this check "
+            + " under normal circumstances. It's only temporarily skip check 
if publish version and schema change have"
+            + " dead lock" })
+    public static boolean publish_version_check_alter_replica = true;
+
     @ConfField(mutable = true, masterOnly = true, description = 
{"提交事务的最大超时时间,单位是秒。"
             + "该参数仅用于事务型 insert 操作中。",
             "Maximal waiting time for all data inserted before one transaction 
to be committed, in seconds. "
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
index fb616fe429e..c1984d31d47 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
@@ -89,6 +89,11 @@ public abstract class AlterJobV2 implements Writable {
     @SerializedName(value = "rawSql")
     protected String rawSql;
 
+    // The job will wait all transactions before this txn id finished, then 
send the schema_change/rollup tasks.
+    @SerializedName(value = "watershedTxnId")
+    protected long watershedTxnId = -1;
+
+
     public AlterJobV2(String rawSql, long jobId, JobType jobType, long dbId, 
long tableId, String tableName,
                       long timeoutMs) {
         this.rawSql = rawSql;
@@ -135,6 +140,10 @@ public abstract class AlterJobV2 implements Writable {
         return tableName;
     }
 
+    public long getWatershedTxnId() {
+        return watershedTxnId;
+    }
+
     public boolean isTimeout() {
         return System.currentTimeMillis() - createTimeMs > timeoutMs;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index 6736bd3aa5b..dfc825dbbc1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -134,10 +134,6 @@ public class RollupJobV2 extends AlterJobV2 implements 
GsonPostProcessable {
     @SerializedName(value = "storageFormat")
     private TStorageFormat storageFormat = TStorageFormat.DEFAULT;
 
-    // The rollup job will wait all transactions before this txn id finished, 
then send the rollup tasks.
-    @SerializedName(value = "watershedTxnId")
-    protected long watershedTxnId = -1;
-
     // save all create rollup tasks
     private AgentBatchTask rollupBatchTask = new AgentBatchTask();
     // save failed task after retry three times, tabletId -> agentTask
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 60b19e7f368..a65612989ab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -126,9 +126,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
     @SerializedName(value = "indexes")
     private List<Index> indexes = null;
 
-    // The schema change job will wait all transactions before this txn id 
finished, then send the schema change tasks.
-    @SerializedName(value = "watershedTxnId")
-    protected long watershedTxnId = -1;
     @SerializedName(value = "storageFormat")
     private TStorageFormat storageFormat = TStorageFormat.DEFAULT;
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 42cac0c9690..cac271af4aa 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.transaction;
 
+import org.apache.doris.alter.AlterJobV2;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
@@ -535,6 +536,7 @@ public class DatabaseTransactionMgr {
                     transactionState.prolongPublishTimeout();
                 }
 
+                // (TODO): ignore the alter index if txn id is less than sc 
sched watermark
                 int loadRequiredReplicaNum = 
table.getLoadRequiredReplicaNum(partition.getId());
                 for (MaterializedIndex index : allIndices) {
                     for (Tablet tablet : index.getTablets()) {
@@ -553,6 +555,7 @@ public class DatabaseTransactionMgr {
                                 throw new 
TransactionCommitFailedException("could not find replica for tablet ["
                                         + tabletId + "], backend [" + 
tabletBackend + "]");
                             }
+
                             // if the tablet have no replica's to commit or 
the tablet is a rolling up tablet,
                             // the commit backends maybe null
                             // if the commit backends is null, set all 
replicas as error replicas
@@ -985,6 +988,7 @@ public class DatabaseTransactionMgr {
                     continue;
                 }
 
+                boolean alterReplicaLoadedTxn = 
isAlterReplicaLoadedTxn(transactionId, table);
                 Iterator<PartitionCommitInfo> partitionCommitInfoIterator
                         = 
tableCommitInfo.getIdToPartitionCommitInfo().values().iterator();
                 while (partitionCommitInfoIterator.hasNext()) {
@@ -1037,7 +1041,7 @@ public class DatabaseTransactionMgr {
                             tabletWriteFailedReplicas.clear();
                             tabletVersionFailedReplicas.clear();
                             for (Replica replica : tablet.getReplicas()) {
-                                
checkReplicaContinuousVersionSucc(tablet.getId(), replica,
+                                
checkReplicaContinuousVersionSucc(tablet.getId(), replica, 
alterReplicaLoadedTxn,
                                         partitionCommitInfo.getVersion(), 
publishTasks.get(replica.getBackendId()),
                                         errorReplicaIds, tabletSuccReplicas, 
tabletWriteFailedReplicas,
                                         tabletVersionFailedReplicas);
@@ -1132,8 +1136,24 @@ public class DatabaseTransactionMgr {
         LOG.info("finish transaction {} successfully, publish result: {}", 
transactionState, publishResult.name());
     }
 
-    private void checkReplicaContinuousVersionSucc(long tabletId, Replica 
replica, long version,
-            PublishVersionTask backendPublishTask, Set<Long> errorReplicaIds, 
List<Replica> tabletSuccReplicas,
+    private boolean isAlterReplicaLoadedTxn(long transactionId, OlapTable 
table) {
+        List<AlterJobV2> unfinishedAlterJobs = null;
+        if (table.getState() == OlapTable.OlapTableState.SCHEMA_CHANGE) {
+            unfinishedAlterJobs = 
Env.getCurrentEnv().getAlterInstance().getSchemaChangeHandler()
+                    .getUnfinishedAlterJobV2ByTableId(table.getId());
+        } else if (table.getState() == OlapTable.OlapTableState.ROLLUP) {
+            unfinishedAlterJobs = 
Env.getCurrentEnv().getAlterInstance().getMaterializedViewHandler()
+                    .getUnfinishedAlterJobV2ByTableId(table.getId());
+        } else {
+            return true;
+        }
+
+        return unfinishedAlterJobs.stream().allMatch(job -> transactionId > 
job.getWatershedTxnId());
+    }
+
+    private void checkReplicaContinuousVersionSucc(long tabletId, Replica 
replica, boolean alterReplicaLoadedTxn,
+            long version, PublishVersionTask backendPublishTask,
+            Set<Long> errorReplicaIds, List<Replica> tabletSuccReplicas,
             List<Replica> tabletWriteFailedReplicas, List<Replica> 
tabletVersionFailedReplicas) {
         if (backendPublishTask == null || !backendPublishTask.isFinished()) {
             errorReplicaIds.add(replica.getId());
@@ -1155,6 +1175,17 @@ public class DatabaseTransactionMgr {
             }
         }
 
+        // Schema change and rollup has a sched watermark,
+        // it's ensure that alter replicas will load those txns whose txn id > 
sched watermark.
+        // But for txns before the sched watermark, the alter replicas maynot 
load the txns,
+        // publish will ignore checking them and treat them as success in 
advance.
+        // Later be will fill the alter replicas's history data which before 
sched watermark.
+        // If failed to fill, fe will set the alter replica bad.
+        if (replica.getState() == Replica.ReplicaState.ALTER
+                && (!alterReplicaLoadedTxn || 
!Config.publish_version_check_alter_replica)) {
+            errorReplicaIds.remove(replica.getId());
+        }
+
         if (!errorReplicaIds.contains(replica.getId())) {
             if (replica.checkVersionCatchUp(version - 1, true)) {
                 tabletSuccReplicas.add(replica);
diff --git 
a/regression-test/data/schema_change/test_schema_change_concurrent_with_txn.out 
b/regression-test/data/schema_change/test_schema_change_concurrent_with_txn.out
new file mode 100644
index 00000000000..1f531cb7703
--- /dev/null
+++ 
b/regression-test/data/schema_change/test_schema_change_concurrent_with_txn.out
@@ -0,0 +1,61 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_1_1 --
+1      10
+2      20
+
+-- !select_2_1 --
+1      10
+2      20
+
+-- !select_3_1 --
+1      11      111
+2      22      222
+
+-- !select_4_1 --
+1      10
+2      20
+
+-- !select_1_2 --
+1      10
+2      20
+
+-- !select_2_2 --
+1      10
+2      20
+
+-- !select_3_2 --
+1      11      111
+2      22      222
+
+-- !select_4_2 --
+1      10
+2      20
+
+-- !select_1_3 --
+1      10      -1
+2      20      -1
+3      30      -1
+4      40      -1
+5      50      -1
+
+-- !select_2_3 --
+1      10
+2      20
+3      30
+4      40
+5      50
+
+-- !select_3_3 --
+1      11      111
+2      22      222
+3      33      333
+4      44      444
+5      55      555
+
+-- !select_4_3 --
+10     1
+20     2
+30     3
+40     4
+50     5
+
diff --git 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 0c02028c3e7..c9e8fe4215e 100644
--- 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -168,12 +168,7 @@ class Suite implements GroovyInterceptable {
             try {
                 Thread.currentThread().setName(threadName == null ? 
originThreadName : threadName)
                 if (connInfo != null) {
-                    def newConnInfo = new ConnectionInfo()
-                    newConnInfo.conn = 
DriverManager.getConnection(connInfo.conn.getMetaData().getURL(),
-                            connInfo.username, connInfo.password)
-                    newConnInfo.username = connInfo.username
-                    newConnInfo.password = connInfo.password
-                    context.threadLocalConn.set(newConnInfo)
+                    context.connectTo(connInfo.conn.getMetaData().getURL(), 
connInfo.username, connInfo.password);
                 }
                 context.scriptContext.eventListeners.each { 
it.onThreadStarted(context) }
 
diff --git 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteContext.groovy
 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteContext.groovy
index d31364eb0ef..31eed478399 100644
--- 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteContext.groovy
+++ 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteContext.groovy
@@ -250,6 +250,32 @@ class SuiteContext implements Closeable {
         }
     }
 
+    public void reconnectFe() {
+        ConnectionInfo connInfo = threadLocalConn.get()
+        if (connInfo == null) {
+            return
+        }
+        connectTo(connInfo.conn.getMetaData().getURL(), connInfo.username, 
connInfo.password);
+    }
+
+    public void connectTo(String url, String username, String password) {
+        ConnectionInfo oldConn = threadLocalConn.get()
+        if (oldConn != null) {
+            threadLocalConn.remove()
+            try {
+                oldConn.conn.close()
+            } catch (Throwable t) {
+                log.warn("Close connection failed", t)
+            }
+        }
+
+        def newConnInfo = new ConnectionInfo()
+        newConnInfo.conn = DriverManager.getConnection(url, username, password)
+        newConnInfo.username = username
+        newConnInfo.password = password
+        threadLocalConn.set(newConnInfo)
+    }
+
     OutputUtils.OutputBlocksIterator getOutputIterator() {
         def outputIt = threadLocalOutputIterator.get()
         if (outputIt == null) {
diff --git 
a/regression-test/suites/schema_change/test_schema_change_concurrent_with_txn.groovy
 
b/regression-test/suites/schema_change/test_schema_change_concurrent_with_txn.groovy
new file mode 100644
index 00000000000..4e2f717ed46
--- /dev/null
+++ 
b/regression-test/suites/schema_change/test_schema_change_concurrent_with_txn.groovy
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.NodeType
+
+suite('test_schema_change_concurrent_with_txn') {
+    def options = new ClusterOptions()
+    options.enableDebugPoints()
+    options.feConfigs.add('publish_wait_time_second=-1')
+    docker(options) {
+        sql 'SET GLOBAL insert_visible_timeout_ms = 2000'
+
+        def result = sql 'SELECT DATABASE()'
+        def dbName = result[0][0]
+
+        sql 'CREATE TABLE tbl_1 (k1 INT, k2 INT) PROPERTIES ( 
"light_schema_change" = "false")'
+        sql 'INSERT INTO tbl_1 VALUES (1, 10)'
+        sql 'INSERT INTO tbl_1 VALUES (2, 20)'
+        order_qt_select_1_1 'SELECT * FROM tbl_1'
+
+        sql 'CREATE TABLE tbl_2 AS SELECT * FROM tbl_1'
+        order_qt_select_2_1 'SELECT * FROM tbl_2'
+
+        sql 'CREATE TABLE tbl_3 (k1 INT, k2 INT, v INT SUM) AGGREGATE KEY (k1, 
k2)'
+        sql 'INSERT INTO tbl_3 VALUES (1, 11, 111)'
+        sql 'INSERT INTO tbl_3 VALUES (2, 22, 222)'
+        order_qt_select_3_1 'SELECT * FROM tbl_3'
+
+        sql 'CREATE TABLE tbl_4 (k1 INT, k2 INT)'
+        sql 'INSERT INTO tbl_4 VALUES (1, 10)'
+        sql 'INSERT INTO tbl_4 VALUES (2, 20)'
+        order_qt_select_4_1 'SELECT * FROM tbl_4'
+
+        // stop publish, insert succ, txn is commit but not visible
+        cluster.injectDebugPoints(NodeType.FE, 
['PublishVersionDaemon.stop_publish':null])
+
+        sql 'INSERT INTO tbl_1 VALUES (3, 30)'
+        sql 'INSERT INTO tbl_1 VALUES (4, 40)'
+        order_qt_select_1_2 'SELECT * FROM tbl_1'
+
+        sql 'INSERT INTO tbl_2 VALUES (3, 30)'
+        sql 'INSERT INTO tbl_2 VALUES (4, 40)'
+        order_qt_select_2_2 'SELECT * FROM tbl_2'
+
+        sql 'INSERT INTO tbl_3 VALUES (3, 33, 333)'
+        sql 'INSERT INTO tbl_3 VALUES (4, 44, 444)'
+        order_qt_select_3_2 'SELECT * FROM tbl_3'
+
+        sql 'INSERT INTO tbl_4 VALUES (3, 30)'
+        sql 'INSERT INTO tbl_4 VALUES (4, 40)'
+        order_qt_select_4_2 'SELECT * FROM tbl_4'
+
+        result = sql 'SHOW PROC "/transactions"'
+        def runningTxn = result.find({ it[1].indexOf(dbName) > 0 })[2]
+        assertEquals(8, runningTxn as int)
+
+        sql "ALTER TABLE tbl_1 ADD COLUMN k3 INT DEFAULT '-1'"
+        sql 'CREATE MATERIALIZED VIEW tbl_2_mv AS SELECT k1, k1 + k2 FROM 
tbl_2'
+        sql 'ALTER TABLE tbl_3 ADD ROLLUP tbl_3_r1(k1, v)'
+        sql 'ALTER TABLE tbl_4 ORDER BY (k2, k1)'
+
+        sleep(5000)
+
+        def jobs = null
+        def scJobState = job -> job[9]
+        def rollupJobState = job -> job[8]
+
+        jobs = sql "SHOW ALTER TABLE COLUMN WHERE TableName = 'tbl_1'"
+        assertEquals(1, jobs.size())
+        assertEquals('WAITING_TXN', scJobState(jobs[0]))
+
+        jobs = sql "SHOW ALTER TABLE MATERIALIZED VIEW WHERE TableName = 
'tbl_2'"
+        assertEquals(1, jobs.size())
+        assertEquals('WAITING_TXN', rollupJobState(jobs[0]))
+
+        jobs = sql "SHOW ALTER TABLE ROLLUP WHERE TableName = 'tbl_3'"
+        assertEquals(1, jobs.size())
+        assertEquals('WAITING_TXN', rollupJobState(jobs[0]))
+
+        jobs = sql "SHOW ALTER TABLE COLUMN WHERE TableName = 'tbl_4'"
+        assertEquals(1, jobs.size())
+        assertEquals('WAITING_TXN', scJobState(jobs[0]))
+
+        sql 'INSERT INTO tbl_1(k1, k2) VALUES (5, 50)'
+        sql 'INSERT INTO tbl_2 VALUES (5, 50)'
+        sql 'INSERT INTO tbl_3 VALUES (5, 55, 555)'
+        sql 'INSERT INTO tbl_4(k1, k2) VALUES (5, 50)'
+
+        // After fe restart, transaction's loadedTblIndexes will clear,
+        // then fe will send publish task to all indexes.
+        // But the alter index  may add after commit txn, then publish will 
failed.
+        cluster.restartFrontends()
+        sleep(30000)
+        context.reconnectFe()
+
+        //cluster.clearFrontendDebugPoints()
+
+        // should publish visible
+        order_qt_select_1_3 'SELECT * FROM tbl_1'
+        order_qt_select_2_3 'SELECT * FROM tbl_2'
+        order_qt_select_3_3 'SELECT * FROM tbl_3'
+        order_qt_select_4_3 'SELECT * FROM tbl_4'
+
+        jobs = sql "SHOW ALTER TABLE COLUMN WHERE TableName = 'tbl_1'"
+        assertEquals(1, jobs.size())
+        assertEquals('FINISHED', scJobState(jobs[0]))
+
+        jobs = sql "SHOW ALTER TABLE MATERIALIZED VIEW WHERE TableName = 
'tbl_2'"
+        assertEquals(1, jobs.size())
+        assertEquals('FINISHED', rollupJobState(jobs[0]))
+
+        jobs = sql "SHOW ALTER TABLE ROLLUP WHERE TableName = 'tbl_3'"
+        assertEquals(1, jobs.size())
+        assertEquals('FINISHED', rollupJobState(jobs[0]))
+
+        jobs = sql "SHOW ALTER TABLE COLUMN WHERE TableName = 'tbl_4'"
+        assertEquals(1, jobs.size())
+        assertEquals('FINISHED', scJobState(jobs[0]))
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to