This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 971aa72dea5 [fix](schema change) follow fe set sc fail replicas as bad #33569 (#34040)
971aa72dea5 is described below
commit 971aa72dea5e93c90572b7d892abad564fd4169d
Author: yujun <[email protected]>
AuthorDate: Sat Apr 27 13:59:22 2024 +0800
[fix](schema change) follow fe set sc fail replicas as bad #33569 (#34040)
---
be/src/olap/schema_change.cpp | 11 ++-
.../java/org/apache/doris/alter/AlterHandler.java | 1 +
.../java/org/apache/doris/alter/AlterJobV2.java | 6 ++
.../java/org/apache/doris/alter/RollupJobV2.java | 27 ++++---
.../org/apache/doris/alter/SchemaChangeJobV2.java | 27 ++++---
.../test_schema_change_fail.groovy | 90 ++++++++++++++++++++++
6 files changed, 139 insertions(+), 23 deletions(-)
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 84ecbb10c60..4ea1ccbc3ff 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -790,11 +790,20 @@ Status
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
do {
RowsetSharedPtr max_rowset;
// get history data to be converted and it will check if there is
hold in base tablet
- if (!_get_versions_to_be_changed(base_tablet,
&versions_to_be_changed, &max_rowset)) {
+ res = _get_versions_to_be_changed(base_tablet,
&versions_to_be_changed, &max_rowset);
+ if (!res) {
LOG(WARNING) << "fail to get version to be changed. res=" <<
res;
break;
}
+ DBUG_EXECUTE_IF("SchemaChangeJob.process_alter_tablet.alter_fail",
{
+ res = Status::InternalError(
+ "inject alter tablet failed. base_tablet={},
new_tablet={}",
+ request.base_tablet_id, request.new_tablet_id);
+ LOG(WARNING) << "inject error. res=" << res;
+ break;
+ });
+
// should check the max_version >= request.alter_version, if not
the convert is useless
if (max_rowset == nullptr || max_rowset->end_version() <
request.alter_version) {
res = Status::InternalError(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java
index 7ab2f8732d0..dbc43059e5b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterHandler.java
@@ -269,6 +269,7 @@ public abstract class AlterHandler extends MasterDaemon {
alterJob.replay(alterJob);
alterJobsV2.put(alterJob.getJobId(), alterJob);
} else {
+ existingJob.failedTabletBackends = alterJob.failedTabletBackends;
existingJob.replay(alterJob);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
index 29bf3d994ed..78f34c2cccd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java
@@ -29,6 +29,7 @@ import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.persist.gson.GsonUtils;
+import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -36,6 +37,7 @@ import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
/*
* Version 2 of AlterJob, for replacing the old version of AlterJob.
@@ -90,6 +92,10 @@ public abstract class AlterJobV2 implements Writable {
@SerializedName(value = "rawSql")
protected String rawSql;
+ // save failed task after retry three times, tablet -> backends
+ @SerializedName(value = "failedTabletBackends")
+ protected Map<Long, List<Long>> failedTabletBackends = Maps.newHashMap();
+
public AlterJobV2(String rawSql, long jobId, JobType jobType, long dbId,
long tableId, String tableName,
long timeoutMs) {
this.rawSql = rawSql;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index 482944acf8b..c127bd4b8d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -140,8 +140,6 @@ public class RollupJobV2 extends AlterJobV2 implements
GsonPostProcessable {
// save all create rollup tasks
private AgentBatchTask rollupBatchTask = new AgentBatchTask();
- // save failed task after retry three times, tabletId -> agentTask
- private Map<Long, List<AgentTask>> failedAgentTasks = Maps.newHashMap();
private Analyzer analyzer;
@@ -529,17 +527,18 @@ public class RollupJobV2 extends AlterJobV2 implements
GsonPostProcessable {
task.setFinished(true);
AgentTaskQueue.removeTask(task.getBackendId(),
TTaskType.ALTER, task.getSignature());
LOG.warn("rollup task failed: " + task.getErrorMsg());
- if (!failedAgentTasks.containsKey(task.getTabletId())) {
- failedAgentTasks.put(task.getTabletId(),
Lists.newArrayList(task));
- } else {
- failedAgentTasks.get(task.getTabletId()).add(task);
+ List<Long> failedBackends =
failedTabletBackends.get(task.getTabletId());
+ if (failedBackends == null) {
+ failedBackends = Lists.newArrayList();
+ failedTabletBackends.put(task.getTabletId(),
failedBackends);
}
+ failedBackends.add(task.getBackendId());
int expectSucceedTaskNum = tbl.getPartitionInfo()
.getReplicaAllocation(task.getPartitionId()).getTotalReplicaNum();
- int failedTaskCount =
failedAgentTasks.get(task.getTabletId()).size();
+ int failedTaskCount = failedBackends.size();
if (expectSucceedTaskNum - failedTaskCount <
expectSucceedTaskNum / 2 + 1) {
throw new AlterCancelException("rollup tasks failed on
same tablet reach threshold "
- + failedAgentTasks.get(task.getTabletId()) +
", reason=" + task.getErrorMsg());
+ + failedTaskCount + ", reason=" +
task.getErrorMsg());
}
}
}
@@ -554,9 +553,11 @@ public class RollupJobV2 extends AlterJobV2 implements
GsonPostProcessable {
try {
Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
- for (List<AgentTask> tasks : failedAgentTasks.values()) {
- for (AgentTask task : tasks) {
- invertedIndex.getReplica(task.getTabletId(),
task.getBackendId()).setBad(true);
+ for (Map.Entry<Long, List<Long>> entry :
failedTabletBackends.entrySet()) {
+ long tabletId = entry.getKey();
+ List<Long> failedBackends = entry.getValue();
+ for (long backendId : failedBackends) {
+ invertedIndex.getReplica(tabletId, backendId).setBad(true);
}
}
for (Map.Entry<Long, MaterializedIndex> entry :
this.partitionIdToRollupIndex.entrySet()) {
@@ -606,8 +607,12 @@ public class RollupJobV2 extends AlterJobV2 implements
GsonPostProcessable {
MaterializedIndex rollupIndex = partition.getIndex(rollupIndexId);
Preconditions.checkNotNull(rollupIndex, rollupIndexId);
for (Tablet tablet : rollupIndex.getTablets()) {
+ List<Long> failedBackends =
failedTabletBackends.get(tablet.getId());
for (Replica replica : tablet.getReplicas()) {
replica.setState(ReplicaState.NORMAL);
+ if (failedBackends != null &&
failedBackends.contains(replica.getBackendId())) {
+ replica.setBad(true);
+ }
}
}
partition.visualiseShadowIndex(rollupIndexId, false);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index c1ecc202d79..d8c12066df6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -134,8 +134,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
// save all schema change tasks
private AgentBatchTask schemaChangeBatchTask = new AgentBatchTask();
- // save failed task after retry three times, tabletId -> agentTask
- private Map<Long, List<AgentTask>> failedAgentTasks = Maps.newHashMap();
private SchemaChangeJobV2() {
super(JobType.SCHEMA_CHANGE);
@@ -520,17 +518,18 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
task.setFinished(true);
AgentTaskQueue.removeTask(task.getBackendId(),
TTaskType.ALTER, task.getSignature());
LOG.warn("schema change task failed: " +
task.getErrorMsg());
- if (!failedAgentTasks.containsKey(task.getTabletId())) {
- failedAgentTasks.put(task.getTabletId(),
Lists.newArrayList(task));
- } else {
- failedAgentTasks.get(task.getTabletId()).add(task);
+ List<Long> failedBackends =
failedTabletBackends.get(task.getTabletId());
+ if (failedBackends == null) {
+ failedBackends = Lists.newArrayList();
+ failedTabletBackends.put(task.getTabletId(),
failedBackends);
}
+ failedBackends.add(task.getBackendId());
int expectSucceedTaskNum = tbl.getPartitionInfo()
.getReplicaAllocation(task.getPartitionId()).getTotalReplicaNum();
- int failedTaskCount =
failedAgentTasks.get(task.getTabletId()).size();
+ int failedTaskCount = failedBackends.size();
if (expectSucceedTaskNum - failedTaskCount <
expectSucceedTaskNum / 2 + 1) {
throw new AlterCancelException("schema change tasks
failed on same tablet reach threshold "
- +
failedAgentTasks.get(task.getTabletId()));
+ + failedTaskCount);
}
}
}
@@ -545,9 +544,11 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
try {
Preconditions.checkState(tbl.getState() ==
OlapTableState.SCHEMA_CHANGE);
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
- for (List<AgentTask> tasks : failedAgentTasks.values()) {
- for (AgentTask task : tasks) {
- invertedIndex.getReplica(task.getTabletId(),
task.getBackendId()).setBad(true);
+ for (Map.Entry<Long, List<Long>> entry :
failedTabletBackends.entrySet()) {
+ long tabletId = entry.getKey();
+ List<Long> failedBackends = entry.getValue();
+ for (long backendId : failedBackends) {
+ invertedIndex.getReplica(tabletId, backendId).setBad(true);
}
}
for (long partitionId : partitionIndexMap.rowKeySet()) {
@@ -622,8 +623,12 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
// set replica state
for (Tablet tablet : shadowIdx.getTablets()) {
+ List<Long> failedBackends =
failedTabletBackends.get(tablet.getId());
for (Replica replica : tablet.getReplicas()) {
replica.setState(ReplicaState.NORMAL);
+ if (failedBackends != null &&
failedBackends.contains(replica.getBackendId())) {
+ replica.setBad(true);
+ }
}
}
diff --git
a/regression-test/suites/schema_change_p2/test_schema_change_fail.groovy
b/regression-test/suites/schema_change_p2/test_schema_change_fail.groovy
new file mode 100644
index 00000000000..8dc65365c39
--- /dev/null
+++ b/regression-test/suites/schema_change_p2/test_schema_change_fail.groovy
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.util.DebugPoint
+import org.apache.doris.regression.util.NodeType
+
+suite('test_schema_change_fail', 'p0,p2,nonConcurrent') {
+ def frontends = sql_return_maparray('show frontends')
+ def backends = sql_return_maparray('show backends')
+ def forceReplicaNum =
getFeConfig('force_olap_table_replication_num').toInteger()
+ if (frontends.size() < 2 || backends.size() < 3 || forceReplicaNum == 1 ||
forceReplicaNum == 2) {
+ return
+ }
+
+ def tbl = 'test_schema_change_fail'
+
+ def beId = backends[0].BackendId.toLong()
+ def beHost = backends[0].Host
+ def beHttpPort = backends[0].HttpPort.toInteger()
+ def injectName = 'SchemaChangeJob.process_alter_tablet.alter_fail'
+
+ def checkReplicaBad = { ->
+ def tabletId = sql_return_maparray("SHOW TABLETS FROM
${tbl}")[0].TabletId.toLong()
+ def replicas = sql_return_maparray(sql_return_maparray("SHOW TABLET
${tabletId}").DetailCmd)
+ assertEquals(backends.size(), replicas.size())
+ for (def replica : replicas) {
+ if (replica.BackendId.toLong() == beId) {
+ assertEquals(true, replica.IsBad.toBoolean())
+ }
+ }
+ }
+
+ def followFe = frontends.stream().filter(fe ->
!fe.IsMaster.toBoolean()).findFirst().orElse(null)
+ def followFeUrl =
"jdbc:mysql://${followFe.Host}:${followFe.QueryPort}/?useLocalSessionState=false&allowLoadLocalInfile=false"
+ followFeUrl = context.config.buildUrlWithDb(followFeUrl, context.dbName)
+
+ sql "DROP TABLE IF EXISTS ${tbl} FORCE"
+ sql """
+ CREATE TABLE ${tbl}
+ (
+ `a` TINYINT NOT NULL,
+ `b` TINYINT NULL
+ )
+ UNIQUE KEY (`a`)
+ DISTRIBUTED BY HASH(`a`) BUCKETS 1
+ PROPERTIES
+ (
+ 'replication_num' = '${backends.size()}',
+ 'light_schema_change' = 'false'
+ )
+ """
+
+ sql "INSERT INTO ${tbl} VALUES (1, 2), (3, 4)"
+
+ try {
+ DebugPoint.enableDebugPoint(beHost, beHttpPort, NodeType.BE,
injectName)
+ setFeConfig('disable_tablet_scheduler', true)
+
+ sleep(1000)
+ sql "ALTER TABLE ${tbl} MODIFY COLUMN b DOUBLE"
+ sleep(5 * 1000)
+
+ def jobs = sql_return_maparray "SHOW ALTER TABLE COLUMN WHERE
TableName = '${tbl}' ORDER BY CreateTime DESC LIMIT 1"
+ assertEquals(1, jobs.size())
+ assertEquals('FINISHED', jobs[0].State)
+
+ checkReplicaBad()
+ connect('root', '', followFeUrl) {
+ checkReplicaBad()
+ }
+ } finally {
+ DebugPoint.disableDebugPoint(beHost, beHttpPort, NodeType.BE,
injectName)
+ setFeConfig('disable_tablet_scheduler', false)
+ sql "DROP TABLE IF EXISTS ${tbl} FORCE"
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org