This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 2af313eecc3 [improvement](build index)Make build index and clone
mutually exclusive and add timeout for index change job (#35724) (#36296)
2af313eecc3 is described below
commit 2af313eecc3eb52365530b6418c4658bdd431e17
Author: qiye <[email protected]>
AuthorDate: Sun Jun 16 09:35:27 2024 +0800
[improvement](build index)Make build index and clone mutually exclusive and
add timeout for index change job (#35724) (#36296)
---
.../org/apache/doris/alter/IndexChangeJob.java | 75 ++++++++++++++++-
.../apache/doris/alter/SchemaChangeHandler.java | 3 +-
.../test_build_index_with_clone_fault.groovy | 93 +++++++++++++++++++++
.../test_build_index_with_clone_by_docker.groovy | 94 ++++++++++++++++++++++
4 files changed, 262 insertions(+), 3 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java
index e6f4c4e0a0e..70ce1147516 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java
@@ -37,6 +37,7 @@ import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.AlterInvertedIndexTask;
@@ -46,6 +47,7 @@ import org.apache.doris.thrift.TTaskType;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -54,6 +56,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
public class IndexChangeJob implements Writable {
@@ -106,6 +109,11 @@ public class IndexChangeJob implements Writable {
private long originIndexId;
@SerializedName(value = "invertedIndexBatchTask")
AgentBatchTask invertedIndexBatchTask = new AgentBatchTask();
+ // records tasks that have failed more than three times: tablet id -> ids of the backends where the task failed
+ @SerializedName(value = "failedTabletBackends")
+ protected Map<Long, List<Long>> failedTabletBackends = Maps.newHashMap();
+ @SerializedName(value = "timeoutMs")
+ protected long timeoutMs = -1;
public IndexChangeJob() {
this.jobId = -1;
@@ -117,7 +125,7 @@ public class IndexChangeJob implements Writable {
this.jobState = JobState.WAITING_TXN;
}
- public IndexChangeJob(long jobId, long dbId, long tableId, String
tableName) {
+ public IndexChangeJob(long jobId, long dbId, long tableId, String
tableName, long timeoutMs) {
this.jobId = jobId;
this.dbId = dbId;
this.tableId = tableId;
@@ -127,6 +135,7 @@ public class IndexChangeJob implements Writable {
this.jobState = JobState.WAITING_TXN;
this.watershedTxnId = Env.getCurrentGlobalTransactionMgr()
.getTransactionIDGenerator().getNextTransactionId();
+ this.timeoutMs = timeoutMs;
}
public long getJobId() {
@@ -207,6 +216,10 @@ public class IndexChangeJob implements Writable {
this.finishedTimeMs = finishedTimeMs;
}
+ public boolean isTimeout() {
+ return System.currentTimeMillis() - createTimeMs > timeoutMs;
+ }
+
/**
* The keyword 'synchronized' only protects 2 methods:
* run() and cancel()
@@ -218,6 +231,10 @@ public class IndexChangeJob implements Writable {
* db lock
*/
public synchronized void run() {
+ if (isTimeout()) {
+ cancelImpl("Timeout");
+ return;
+ }
try {
switch (jobState) {
case WAITING_TXN:
@@ -238,6 +255,31 @@ public class IndexChangeJob implements Writable {
return cancelImpl(errMsg);
}
+ /**
+ * Should be called before executing the job.
+ * Returns false if the table is not stable.
+ */
+ protected boolean checkTableStable(OlapTable tbl, String clusterName)
throws AlterCancelException {
+ tbl.writeLockOrAlterCancelException();
+ try {
+ boolean isStable = tbl.isStable(Env.getCurrentSystemInfo(),
+ Env.getCurrentEnv().getTabletScheduler(), clusterName);
+
+ if (!isStable) {
+ errMsg = "table is unstable";
+ LOG.warn("wait table {} to be stable before doing index change
job", tableId);
+ return false;
+ } else {
+ // table is stable
+ LOG.info("table {} is stable, start index change job {}",
tableId, jobId);
+ errMsg = "";
+ return true;
+ }
+ } finally {
+ tbl.writeUnlock();
+ }
+ }
+
// Check whether transactions of the given database which txnId is less
than 'watershedTxnId' are finished.
protected boolean isPreviousLoadFinished() throws AnalysisException {
return
Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(
@@ -265,6 +307,9 @@ public class IndexChangeJob implements Writable {
} catch (MetaNotFoundException e) {
throw new AlterCancelException(e.getMessage());
}
+ if (!checkTableStable(olapTable, db.getClusterName())) {
+ return;
+ }
olapTable.readLock();
try {
@@ -308,10 +353,36 @@ public class IndexChangeJob implements Writable {
protected void runRunningJob() throws AlterCancelException {
Preconditions.checkState(jobState == JobState.RUNNING, jobState);
+ // Must first check that the db and table still exist.
+ // Otherwise, if the table has been dropped, the tasks will never finish
+ // and the job will stay in the RUNNING state forever.
+ Database db = Env.getCurrentInternalCatalog()
+ .getDbOrException(dbId, s -> new
AlterCancelException("Database " + s + " does not exist"));
+ OlapTable tbl;
+ try {
+ tbl = (OlapTable) db.getTableOrMetaException(tableId,
TableType.OLAP);
+ } catch (MetaNotFoundException e) {
+ throw new AlterCancelException(e.getMessage());
+ }
if (!invertedIndexBatchTask.isFinished()) {
LOG.info("inverted index tasks not finished. job: {}, partitionId:
{}", jobId, partitionId);
- // TODO: task failed limit
+ List<AgentTask> tasks =
invertedIndexBatchTask.getUnfinishedTasks(2000);
+ for (AgentTask task : tasks) {
+ if (task.getFailedTimes() > 3) {
+ LOG.warn("alter inverted index task failed: " +
task.getErrorMsg());
+ List<Long> failedBackends =
failedTabletBackends.computeIfAbsent(task.getTabletId(),
+ k -> Lists.newArrayList());
+ failedBackends.add(task.getBackendId());
+ int expectSucceedTaskNum = tbl.getPartitionInfo()
+
.getReplicaAllocation(task.getPartitionId()).getTotalReplicaNum();
+ int failedTaskCount = failedBackends.size();
+ if (expectSucceedTaskNum - failedTaskCount <
expectSucceedTaskNum / 2 + 1) {
+ throw new AlterCancelException("inverted index tasks
failed on same tablet reach threshold "
+ + failedTaskCount);
+ }
+ }
+ }
return;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index 0928927885b..976f837a311 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -2889,13 +2889,14 @@ public class SchemaChangeHandler extends AlterHandler {
throw new DdlException("Nothing is changed. please check your
alter stmt.");
}
+ long timeoutSecond = Config.alter_table_timeout_second;
for (Map.Entry<Long, List<Column>> entry :
changedIndexIdToSchema.entrySet()) {
long originIndexId = entry.getKey();
for (Partition partition : olapTable.getPartitions()) {
// create job
long jobId = Env.getCurrentEnv().getNextId();
IndexChangeJob indexChangeJob = new IndexChangeJob(
- jobId, db.getId(), olapTable.getId(),
olapTable.getName());
+ jobId, db.getId(), olapTable.getId(),
olapTable.getName(), timeoutSecond * 1000);
indexChangeJob.setOriginIndexId(originIndexId);
indexChangeJob.setAlterInvertedIndexInfo(isDropOp,
alterIndexes);
long partitionId = partition.getId();
diff --git
a/regression-test/suites/fault_injection_p0/test_build_index_with_clone_fault.groovy
b/regression-test/suites/fault_injection_p0/test_build_index_with_clone_fault.groovy
new file mode 100644
index 00000000000..bfbf5a4896e
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/test_build_index_with_clone_fault.groovy
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_build_index_with_clone_fault_injection", "nonConcurrent"){
+ def backends = sql_return_maparray('show backends')
+ // if there are fewer than 2 backends, skip this case
+ if (backends.size() < 2) {
+ return
+ }
+ def timeout = 300000
+ def delta_time = 1000
+ def alter_res = "null"
+ def useTime = 0
+
+ def wait_for_last_build_index_on_table_finish = { table_name, OpTimeout ->
+ for(int t = delta_time; t <= OpTimeout; t += delta_time){
+ alter_res = sql """SHOW BUILD INDEX WHERE TableName =
"${table_name}" ORDER BY JobId """
+
+ if (alter_res.size() == 0) {
+ logger.info(table_name + " last index job finished")
+ return "SKIPPED"
+ }
+ if (alter_res.size() > 0) {
+ def last_job_state = alter_res[alter_res.size()-1][7];
+ if (last_job_state == "FINISHED" || last_job_state ==
"CANCELLED") {
+ sleep(10000) // wait change table state to normal
+ logger.info(table_name + " last index job finished, state:
" + last_job_state + ", detail: " + alter_res)
+ return last_job_state;
+ }
+ }
+ useTime = t
+ sleep(delta_time)
+ }
+ logger.info("wait_for_last_build_index_on_table_finish debug: " +
alter_res)
+ assertTrue(useTime <= OpTimeout,
"wait_for_last_build_index_on_table_finish timeout")
+ return "wait_timeout"
+ }
+
+ def tbl = 'test_build_index_with_clone'
+ try {
+ GetDebugPoint().enableDebugPointForAllBEs("EngineCloneTask.wait_clone")
+ logger.info("add debug point EngineCloneTask.wait_clone")
+ sql """ DROP TABLE IF EXISTS ${tbl} """
+ sql """
+ CREATE TABLE ${tbl} (
+ `k1` int(11) NULL,
+ `k2` int(11) NULL
+ )
+ DUPLICATE KEY(`k1`, `k2`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+ PROPERTIES ("replication_num" = "1")
+ """
+ for (def i = 1; i <= 5; i++) {
+ sql "INSERT INTO ${tbl} VALUES (${i}, ${10 * i})"
+ }
+
+ sql """ sync """
+
+ // get tablets and set replica status to DROP
+ def tablet = sql_return_maparray("show tablets from ${tbl}")[0]
+ sql """
+ ADMIN SET REPLICA STATUS PROPERTIES("tablet_id" =
"${tablet.TabletId}", "backend_id" = "${tablet.BackendId}", "status" = "drop");
+ """
+ // create index on table
+ sql """ create index idx_k2 on ${tbl}(k2) using inverted """
+ sql """ build index idx_k2 on ${tbl} """
+ // sleep 5s to wait for the build index job to report that the table is unstable
+ sleep(5000)
+ def show_build_index = sql_return_maparray("show build index where
TableName = \"${tbl}\" ORDER BY JobId DESC LIMIT 1")
+ assertEquals('WAITING_TXN', show_build_index[0].State)
+ assertEquals('table is unstable', show_build_index[0].Msg)
+
+ def state = wait_for_last_build_index_on_table_finish(tbl, timeout)
+ assertEquals(state, "FINISHED")
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("EngineCloneTask.wait_clone")
+ }
+}
diff --git
a/regression-test/suites/inverted_index_p0/test_build_index_with_clone_by_docker.groovy
b/regression-test/suites/inverted_index_p0/test_build_index_with_clone_by_docker.groovy
new file mode 100644
index 00000000000..9d30ca30c0c
--- /dev/null
+++
b/regression-test/suites/inverted_index_p0/test_build_index_with_clone_by_docker.groovy
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.NodeType
+import org.apache.doris.regression.suite.SuiteCluster
+
+suite("test_build_index_with_clone_by_docker"){
+ def timeout = 300000
+ def delta_time = 1000
+ def alter_res = "null"
+ def useTime = 0
+
+ def wait_for_last_build_index_on_table_finish = { table_name, OpTimeout ->
+ for(int t = delta_time; t <= OpTimeout; t += delta_time){
+ alter_res = sql """SHOW BUILD INDEX WHERE TableName =
"${table_name}" ORDER BY JobId """
+
+ if (alter_res.size() == 0) {
+ logger.info(table_name + " last index job finished")
+ return "SKIPPED"
+ }
+ if (alter_res.size() > 0) {
+ def last_job_state = alter_res[alter_res.size()-1][7];
+ if (last_job_state == "FINISHED" || last_job_state ==
"CANCELLED") {
+ sleep(10000) // wait change table state to normal
+ logger.info(table_name + " last index job finished, state:
" + last_job_state + ", detail: " + alter_res)
+ return last_job_state;
+ }
+ }
+ useTime = t
+ sleep(delta_time)
+ }
+ logger.info("wait_for_last_build_index_on_table_finish debug: " +
alter_res)
+ assertTrue(useTime <= OpTimeout,
"wait_for_last_build_index_on_table_finish timeout")
+ return "wait_timeout"
+ }
+
+ def options = new ClusterOptions()
+ options.enableDebugPoints()
+ options.setFeNum(1)
+ options.setBeNum(3)
+ options.cloudMode = false
+ def tbl = 'test_build_index_with_clone_by_docker'
+ docker(options) {
+ cluster.injectDebugPoints(NodeType.BE, ['EngineCloneTask.wait_clone' :
null])
+ sql """ DROP TABLE IF EXISTS ${tbl} """
+ sql """
+ CREATE TABLE ${tbl} (
+ `k1` int(11) NULL,
+ `k2` int(11) NULL
+ )
+ DUPLICATE KEY(`k1`, `k2`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+ PROPERTIES ("replication_num" = "1")
+ """
+ for (def i = 1; i <= 5; i++) {
+ sql "INSERT INTO ${tbl} VALUES (${i}, ${10 * i})"
+ }
+
+ sql """ sync """
+
+ // get tablets and set replica status to DROP
+ def tablet = sql_return_maparray("show tablets from ${tbl}")[0]
+ sql """
+ ADMIN SET REPLICA STATUS PROPERTIES("tablet_id" =
"${tablet.TabletId}", "backend_id" = "${tablet.BackendId}", "status" = "drop");
+ """
+ // create index on table
+ sql """ create index idx_k2 on ${tbl}(k2) using inverted """
+ sql """ build index idx_k2 on ${tbl} """
+ // sleep 5s to wait for the build index job to report that the table is unstable
+ sleep(5000)
+ def show_build_index = sql_return_maparray("show build index where
TableName = \"${tbl}\" ORDER BY JobId DESC LIMIT 1")
+ assertEquals('WAITING_TXN', show_build_index[0].State)
+ assertEquals('table is unstable', show_build_index[0].Msg)
+
+ def state = wait_for_last_build_index_on_table_finish(tbl, timeout)
+ assertEquals(state, "FINISHED")
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]