This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 2af313eecc3 [improvement](build index)Make build index and clone mutually exclusive and add timeout for index change job (#35724) (#36296)
2af313eecc3 is described below

commit 2af313eecc3eb52365530b6418c4658bdd431e17
Author: qiye <[email protected]>
AuthorDate: Sun Jun 16 09:35:27 2024 +0800

    [improvement](build index)Make build index and clone mutually exclusive and add timeout for index change job (#35724) (#36296)
---
 .../org/apache/doris/alter/IndexChangeJob.java     | 75 ++++++++++++++++-
 .../apache/doris/alter/SchemaChangeHandler.java    |  3 +-
 .../test_build_index_with_clone_fault.groovy       | 93 +++++++++++++++++++++
 .../test_build_index_with_clone_by_docker.groovy   | 94 ++++++++++++++++++++++
 4 files changed, 262 insertions(+), 3 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java b/fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java
index e6f4c4e0a0e..70ce1147516 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java
@@ -37,6 +37,7 @@ import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTask;
 import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.task.AlterInvertedIndexTask;
@@ -46,6 +47,7 @@ import org.apache.doris.thrift.TTaskType;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.gson.annotations.SerializedName;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -54,6 +56,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 
 public class IndexChangeJob implements Writable {
@@ -106,6 +109,11 @@ public class IndexChangeJob implements Writable {
     private long originIndexId;
     @SerializedName(value = "invertedIndexBatchTask")
     AgentBatchTask invertedIndexBatchTask = new AgentBatchTask();
+    // save failed task after retry three times, tablet -> backends
+    @SerializedName(value = "failedTabletBackends")
+    protected Map<Long, List<Long>> failedTabletBackends = Maps.newHashMap();
+    @SerializedName(value = "timeoutMs")
+    protected long timeoutMs = -1;
 
     public IndexChangeJob() {
         this.jobId = -1;
@@ -117,7 +125,7 @@ public class IndexChangeJob implements Writable {
         this.jobState = JobState.WAITING_TXN;
     }
 
-    public IndexChangeJob(long jobId, long dbId, long tableId, String 
tableName) {
+    public IndexChangeJob(long jobId, long dbId, long tableId, String 
tableName, long timeoutMs) {
         this.jobId = jobId;
         this.dbId = dbId;
         this.tableId = tableId;
@@ -127,6 +135,7 @@ public class IndexChangeJob implements Writable {
         this.jobState = JobState.WAITING_TXN;
         this.watershedTxnId = Env.getCurrentGlobalTransactionMgr()
                         .getTransactionIDGenerator().getNextTransactionId();
+        this.timeoutMs = timeoutMs;
     }
 
     public long getJobId() {
@@ -207,6 +216,10 @@ public class IndexChangeJob implements Writable {
         this.finishedTimeMs = finishedTimeMs;
     }
 
+    /**
+     * Returns true once more than timeoutMs milliseconds have passed since createTimeMs.
+     * A non-positive timeoutMs means "no timeout". The field defaults to -1 and only the
+     * timeout-taking constructor sets it. Presumably jobs restored from metadata written
+     * before this field existed keep -1 (TODO confirm Gson keeps the initializer value);
+     * without this guard such jobs would be cancelled on their first run().
+     */
+    public boolean isTimeout() {
+        return timeoutMs > 0 && System.currentTimeMillis() - createTimeMs > timeoutMs;
+    }
+
     /**
      * The keyword 'synchronized' only protects 2 methods:
      * run() and cancel()
@@ -218,6 +231,10 @@ public class IndexChangeJob implements Writable {
      *      db lock
      */
     public synchronized void run() {
+        if (isTimeout()) {
+            cancelImpl("Timeout");
+            return;
+        }
         try {
             switch (jobState) {
                 case WAITING_TXN:
@@ -238,6 +255,31 @@ public class IndexChangeJob implements Writable {
         return cancelImpl(errMsg);
     }
 
+    /**
+     * Checks, under the table write lock, whether the table is stable according to
+     * OlapTable#isStable. Should be called before executing the job.
+     *
+     * @return true when stable (errMsg is cleared); false otherwise (errMsg is set to
+     *         "table is unstable" so the job waits and retries)
+     * @throws AlterCancelException propagated from writeLockOrAlterCancelException()
+     */
+    protected boolean checkTableStable(OlapTable tbl, String clusterName) throws AlterCancelException {
+        tbl.writeLockOrAlterCancelException();
+        try {
+            if (tbl.isStable(Env.getCurrentSystemInfo(), Env.getCurrentEnv().getTabletScheduler(), clusterName)) {
+                LOG.info("table {} is stable, start index change job {}", tableId, jobId);
+                errMsg = "";
+                return true;
+            }
+            errMsg = "table is unstable";
+            LOG.warn("wait table {} to be stable before doing index change job", tableId);
+            return false;
+        } finally {
+            tbl.writeUnlock();
+        }
+    }
+
     // Check whether transactions of the given database which txnId is less 
than 'watershedTxnId' are finished.
     protected boolean isPreviousLoadFinished() throws AnalysisException {
         return 
Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(
@@ -265,6 +307,9 @@ public class IndexChangeJob implements Writable {
         } catch (MetaNotFoundException e) {
             throw new AlterCancelException(e.getMessage());
         }
+        if (!checkTableStable(olapTable, db.getClusterName())) {
+            return;
+        }
 
         olapTable.readLock();
         try {
@@ -308,10 +353,36 @@ public class IndexChangeJob implements Writable {
 
     protected void runRunningJob() throws AlterCancelException {
         Preconditions.checkState(jobState == JobState.RUNNING, jobState);
+        // must check if db or table still exist first.
+        // or if table is dropped, the tasks will never be finished,
+        // and the job will be in RUNNING state forever.
+        Database db = Env.getCurrentInternalCatalog()
+                .getDbOrException(dbId, s -> new 
AlterCancelException("Database " + s + " does not exist"));
+        OlapTable tbl;
+        try {
+            tbl = (OlapTable) db.getTableOrMetaException(tableId, 
TableType.OLAP);
+        } catch (MetaNotFoundException e) {
+            throw new AlterCancelException(e.getMessage());
+        }
 
         if (!invertedIndexBatchTask.isFinished()) {
             LOG.info("inverted index tasks not finished. job: {}, partitionId: 
{}", jobId, partitionId);
-            // TODO: task failed limit
+            List<AgentTask> tasks = 
invertedIndexBatchTask.getUnfinishedTasks(2000);
+            for (AgentTask task : tasks) {
+                if (task.getFailedTimes() > 3) {
+                    LOG.warn("alter inverted index task failed: " + 
task.getErrorMsg());
+                    List<Long> failedBackends = 
failedTabletBackends.computeIfAbsent(task.getTabletId(),
+                            k -> Lists.newArrayList());
+                    failedBackends.add(task.getBackendId());
+                    int expectSucceedTaskNum = tbl.getPartitionInfo()
+                            
.getReplicaAllocation(task.getPartitionId()).getTotalReplicaNum();
+                    int failedTaskCount = failedBackends.size();
+                    if (expectSucceedTaskNum - failedTaskCount < 
expectSucceedTaskNum / 2 + 1) {
+                        throw new AlterCancelException("inverted index tasks 
failed on same tablet reach threshold "
+                            + failedTaskCount);
+                    }
+                }
+            }
             return;
         }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index 0928927885b..976f837a311 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -2889,13 +2889,14 @@ public class SchemaChangeHandler extends AlterHandler {
             throw new DdlException("Nothing is changed. please check your 
alter stmt.");
         }
 
+        long timeoutSecond = Config.alter_table_timeout_second;
         for (Map.Entry<Long, List<Column>> entry : 
changedIndexIdToSchema.entrySet()) {
             long originIndexId = entry.getKey();
             for (Partition partition : olapTable.getPartitions()) {
                 // create job
                 long jobId = Env.getCurrentEnv().getNextId();
                 IndexChangeJob indexChangeJob = new IndexChangeJob(
-                        jobId, db.getId(), olapTable.getId(), 
olapTable.getName());
+                        jobId, db.getId(), olapTable.getId(), 
olapTable.getName(), timeoutSecond * 1000);
                 indexChangeJob.setOriginIndexId(originIndexId);
                 indexChangeJob.setAlterInvertedIndexInfo(isDropOp, 
alterIndexes);
                 long partitionId = partition.getId();
diff --git a/regression-test/suites/fault_injection_p0/test_build_index_with_clone_fault.groovy b/regression-test/suites/fault_injection_p0/test_build_index_with_clone_fault.groovy
new file mode 100644
index 00000000000..bfbf5a4896e
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_build_index_with_clone_fault.groovy
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_build_index_with_clone_fault_injection", "nonConcurrent"){
+    def backends = sql_return_maparray('show backends')
+    // this case needs at least 2 backends; skip it otherwise
+    if (backends.size() < 2) {
+        return
+    }
+    def timeout = 300000
+    def delta_time = 1000
+    def alter_res = "null"
+
+    // Polls SHOW BUILD INDEX until the newest job on table_name is FINISHED or CANCELLED.
+    // Returns "SKIPPED" when the table has no build index job, otherwise the terminal state.
+    // Fails the case if OpTimeout (ms) elapses before a terminal state is observed.
+    def wait_for_last_build_index_on_table_finish = { table_name, OpTimeout ->
+        for(int t = delta_time; t <= OpTimeout; t += delta_time){
+            alter_res = sql """SHOW BUILD INDEX WHERE TableName = "${table_name}" ORDER BY JobId """
+
+            if (alter_res.size() == 0) {
+                logger.info(table_name + " last index job finished")
+                return "SKIPPED"
+            }
+            // index 7 is assumed to be the State column of SHOW BUILD INDEX
+            def last_job_state = alter_res[alter_res.size()-1][7];
+            if (last_job_state == "FINISHED" || last_job_state == "CANCELLED") {
+                sleep(10000) // wait change table state to normal
+                logger.info(table_name + " last index job finished, state: " + last_job_state + ", detail: " + alter_res)
+                return last_job_state;
+            }
+            sleep(delta_time)
+        }
+        logger.info("wait_for_last_build_index_on_table_finish debug: " + alter_res)
+        // the loop only falls through once OpTimeout is exhausted, so this is a real timeout
+        assertTrue(false, "wait_for_last_build_index_on_table_finish timeout")
+        return "wait_timeout"
+    }
+
+    def tbl = 'test_build_index_with_clone'
+    try {
+        GetDebugPoint().enableDebugPointForAllBEs("EngineCloneTask.wait_clone")
+        logger.info("add debug point EngineCloneTask.wait_clone")
+        sql """ DROP TABLE IF EXISTS ${tbl} """
+        sql """
+            CREATE TABLE ${tbl} (
+            `k1` int(11) NULL,
+            `k2` int(11) NULL
+            )
+            DUPLICATE KEY(`k1`, `k2`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+            PROPERTIES ("replication_num" = "1")
+            """
+        for (def i = 1; i <= 5; i++) {
+            sql "INSERT INTO ${tbl} VALUES (${i}, ${10 * i})"
+        }
+
+        sql """ sync """
+
+        // get the tablet and set its replica status to DROP (replication_num is 1, so this is its only replica)
+        def tablet = sql_return_maparray("show tablets from ${tbl}")[0]
+        sql """
+            ADMIN SET REPLICA STATUS PROPERTIES("tablet_id" = "${tablet.TabletId}", "backend_id" = "${tablet.BackendId}", "status" = "drop");
+            """
+        // create index on table
+        sql """ create index idx_k2 on ${tbl}(k2) using inverted """
+        sql """ build index idx_k2 on ${tbl} """
+        // sleep 5s to give the build index job time to report that the table is unstable
+        sleep(5000)
+        def show_build_index = sql_return_maparray("show build index where TableName = \"${tbl}\" ORDER BY JobId DESC LIMIT 1")
+        assertEquals('WAITING_TXN', show_build_index[0].State)
+        assertEquals('table is unstable', show_build_index[0].Msg)
+
+        def state = wait_for_last_build_index_on_table_finish(tbl, timeout)
+        assertEquals("FINISHED", state)
+    } finally {
+        GetDebugPoint().disableDebugPointForAllBEs("EngineCloneTask.wait_clone")
+    }
+}
diff --git a/regression-test/suites/inverted_index_p0/test_build_index_with_clone_by_docker.groovy b/regression-test/suites/inverted_index_p0/test_build_index_with_clone_by_docker.groovy
new file mode 100644
index 00000000000..9d30ca30c0c
--- /dev/null
+++ b/regression-test/suites/inverted_index_p0/test_build_index_with_clone_by_docker.groovy
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.NodeType
+import org.apache.doris.regression.suite.SuiteCluster
+
+suite("test_build_index_with_clone_by_docker"){
+    def timeout = 300000
+    def delta_time = 1000
+    def alter_res = "null"
+
+    // Polls SHOW BUILD INDEX until the newest job on table_name is FINISHED or CANCELLED.
+    // Returns "SKIPPED" when the table has no build index job, otherwise the terminal state.
+    // Fails the case if OpTimeout (ms) elapses before a terminal state is observed.
+    def wait_for_last_build_index_on_table_finish = { table_name, OpTimeout ->
+        for(int t = delta_time; t <= OpTimeout; t += delta_time){
+            alter_res = sql """SHOW BUILD INDEX WHERE TableName = "${table_name}" ORDER BY JobId """
+
+            if (alter_res.size() == 0) {
+                logger.info(table_name + " last index job finished")
+                return "SKIPPED"
+            }
+            // index 7 is assumed to be the State column of SHOW BUILD INDEX
+            def last_job_state = alter_res[alter_res.size()-1][7];
+            if (last_job_state == "FINISHED" || last_job_state == "CANCELLED") {
+                sleep(10000) // wait change table state to normal
+                logger.info(table_name + " last index job finished, state: " + last_job_state + ", detail: " + alter_res)
+                return last_job_state;
+            }
+            sleep(delta_time)
+        }
+        logger.info("wait_for_last_build_index_on_table_finish debug: " + alter_res)
+        // the loop only falls through once OpTimeout is exhausted, so this is a real timeout
+        assertTrue(false, "wait_for_last_build_index_on_table_finish timeout")
+        return "wait_timeout"
+    }
+
+    // one FE and three BEs, with debug points enabled for EngineCloneTask.wait_clone
+    def options = new ClusterOptions()
+    options.enableDebugPoints()
+    options.setFeNum(1)
+    options.setBeNum(3)
+    options.cloudMode = false
+    def tbl = 'test_build_index_with_clone_by_docker'
+    docker(options) {
+        cluster.injectDebugPoints(NodeType.BE, ['EngineCloneTask.wait_clone' : null])
+        sql """ DROP TABLE IF EXISTS ${tbl} """
+        sql """
+            CREATE TABLE ${tbl} (
+            `k1` int(11) NULL,
+            `k2` int(11) NULL
+            )
+            DUPLICATE KEY(`k1`, `k2`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+            PROPERTIES ("replication_num" = "1")
+            """
+        for (def i = 1; i <= 5; i++) {
+            sql "INSERT INTO ${tbl} VALUES (${i}, ${10 * i})"
+        }
+
+        sql """ sync """
+
+        // get the tablet and set its replica status to DROP (replication_num is 1, so this is its only replica)
+        def tablet = sql_return_maparray("show tablets from ${tbl}")[0]
+        sql """
+            ADMIN SET REPLICA STATUS PROPERTIES("tablet_id" = "${tablet.TabletId}", "backend_id" = "${tablet.BackendId}", "status" = "drop");
+            """
+        // create index on table
+        sql """ create index idx_k2 on ${tbl}(k2) using inverted """
+        sql """ build index idx_k2 on ${tbl} """
+        // sleep 5s to give the build index job time to report that the table is unstable
+        sleep(5000)
+        def show_build_index = sql_return_maparray("show build index where TableName = \"${tbl}\" ORDER BY JobId DESC LIMIT 1")
+        assertEquals('WAITING_TXN', show_build_index[0].State)
+        assertEquals('table is unstable', show_build_index[0].Msg)
+
+        def state = wait_for_last_build_index_on_table_finish(tbl, timeout)
+        assertEquals("FINISHED", state)
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to