This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 02b9d7a26d6 [improvement](clone) dead be will abort sched task #36795 (#36895)
02b9d7a26d6 is described below
commit 02b9d7a26d6cae300d09b05e6ad2d80b7d907121
Author: yujun <[email protected]>
AuthorDate: Thu Jun 27 22:51:05 2024 +0800
[improvement](clone) dead be will abort sched task #36795 (#36895)
cherry pick from #36795
---
.../org/apache/doris/clone/TabletScheduler.java | 52 ++++++--
.../apache/doris/common/util/DebugPointUtil.java | 10 +-
.../java/org/apache/doris/system/HeartbeatMgr.java | 10 ++
.../apache/doris/clone/BeDownCancelCloneTest.java | 147 +++++++++++++++++++++
.../apache/doris/utframe/MockedBackendFactory.java | 8 ++
5 files changed, 213 insertions(+), 14 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 3341f5bb305..715739310bd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -1822,36 +1822,64 @@ public class TabletScheduler extends MasterDaemon {
* If task is timeout, remove the tablet.
*/
public void handleRunningTablets() {
+ Set<Long> aliveBeIds =
Sets.newHashSet(Env.getCurrentSystemInfo().getAllBackendIds(true));
// 1. remove the tablet ctx if timeout
- List<TabletSchedCtx> timeoutTablets = Lists.newArrayList();
+ List<TabletSchedCtx> cancelTablets = Lists.newArrayList();
synchronized (this) {
-
runningTablets.values().stream().filter(TabletSchedCtx::isTimeout).forEach(timeoutTablets::add);
+ for (TabletSchedCtx tabletCtx : runningTablets.values()) {
+ long srcBeId = tabletCtx.getSrcBackendId();
+ long destBeId = tabletCtx.getDestBackendId();
+ if (Config.disable_tablet_scheduler) {
+ tabletCtx.setErrMsg("tablet scheduler is disabled");
+ cancelTablets.add(tabletCtx);
+ } else if (Config.disable_balance && tabletCtx.getType() ==
Type.BALANCE) {
+ tabletCtx.setErrMsg("balance is disabled");
+ cancelTablets.add(tabletCtx);
+ } else if (tabletCtx.isTimeout()) {
+ tabletCtx.setErrMsg("timeout");
+ cancelTablets.add(tabletCtx);
+ stat.counterCloneTaskTimeout.incrementAndGet();
+ } else if (destBeId > 0 && !aliveBeIds.contains(destBeId)) {
+ tabletCtx.setErrMsg("dest be " + destBeId + " is dead");
+ cancelTablets.add(tabletCtx);
+ } else if (srcBeId > 0 && !aliveBeIds.contains(srcBeId)) {
+ tabletCtx.setErrMsg("src be " + srcBeId + " is dead");
+ cancelTablets.add(tabletCtx);
+ }
+ }
}
// 2. release ctx
- timeoutTablets.forEach(t -> {
+ cancelTablets.forEach(t -> {
// Set "resetReplicaState" to true because
// the timeout task should also be considered as UNRECOVERABLE,
// so need to reset replica state.
- t.setErrMsg("timeout");
- finalizeTabletCtx(t, TabletSchedCtx.State.CANCELLED,
Status.UNRECOVERABLE, "timeout");
- stat.counterCloneTaskTimeout.incrementAndGet();
+ finalizeTabletCtx(t, TabletSchedCtx.State.CANCELLED,
Status.UNRECOVERABLE, t.getErrMsg());
});
}
public List<List<String>> getPendingTabletsInfo(int limit) {
- List<TabletSchedCtx> tabletCtxs = getCopiedTablets(pendingTablets,
limit);
- return collectTabletCtx(tabletCtxs);
+ return collectTabletCtx(getPendingTablets(limit));
+ }
+
+ public List<TabletSchedCtx> getPendingTablets(int limit) {
+ return getCopiedTablets(pendingTablets, limit);
}
public List<List<String>> getRunningTabletsInfo(int limit) {
- List<TabletSchedCtx> tabletCtxs =
getCopiedTablets(runningTablets.values(), limit);
- return collectTabletCtx(tabletCtxs);
+ return collectTabletCtx(getRunningTablets(limit));
+ }
+
+ public List<TabletSchedCtx> getRunningTablets(int limit) {
+ return getCopiedTablets(runningTablets.values(), limit);
}
public List<List<String>> getHistoryTabletsInfo(int limit) {
- List<TabletSchedCtx> tabletCtxs = getCopiedTablets(schedHistory,
limit);
- return collectTabletCtx(tabletCtxs);
+ return collectTabletCtx(getHistoryTablets(limit));
+ }
+
+ public List<TabletSchedCtx> getHistoryTablets(int limit) {
+ return getCopiedTablets(schedHistory, limit);
}
private List<List<String>> collectTabletCtx(List<TabletSchedCtx>
tabletCtxs) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java
index da06232f0c0..420cee77631 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java
@@ -134,12 +134,18 @@ public class DebugPointUtil {
addDebugPoint(name, new DebugPoint());
}
- public static <E> void addDebugPointWithValue(String name, E value) {
+ public static void addDebugPointWithParams(String name, Map<String,
String> params) {
DebugPoint debugPoint = new DebugPoint();
- debugPoint.params.put("value", String.format("%s", value));
+ debugPoint.params = params;
addDebugPoint(name, debugPoint);
}
+ public static <E> void addDebugPointWithValue(String name, E value) {
+ Map<String, String> params = Maps.newHashMap();
+ params.put("value", String.format("%s", value));
+ addDebugPointWithParams(name, params);
+ }
+
public static void removeDebugPoint(String name) {
DebugPoint debugPoint = debugPoints.remove(name);
LOG.info("remove debug point: name={}, exists={}", name, debugPoint !=
null);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
index dc4c28217b9..7c081c12cd0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
@@ -24,6 +24,7 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.Version;
+import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.persist.HbPackage;
import org.apache.doris.resource.Tag;
@@ -53,6 +54,7 @@ import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
@@ -237,6 +239,14 @@ public class HeartbeatMgr extends MasterDaemon {
result.setBackendInfo(backendInfo);
}
+ String debugDeadBeIds = DebugPointUtil.getDebugParamOrDefault(
+ "HeartbeatMgr.BackendHeartbeatHandler", "deadBeIds",
"");
+ if (!Strings.isNullOrEmpty(debugDeadBeIds)
+ &&
Arrays.stream(debugDeadBeIds.split(",")).anyMatch(id -> Long.parseLong(id) ==
backendId)) {
+ result.getStatus().setStatusCode(TStatusCode.INTERNAL_ERROR);
+ result.getStatus().addToErrorMsgs("debug point HeartbeatMgr.deadBeIds set dead be");
+ }
+
ok = true;
if (result.getStatus().getStatusCode() == TStatusCode.OK) {
TBackendInfo tBackendInfo = result.getBackendInfo();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/BeDownCancelCloneTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/BeDownCancelCloneTest.java
new file mode 100644
index 00000000000..4a413495e98
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/BeDownCancelCloneTest.java
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.util.DebugPointUtil;
+import org.apache.doris.system.Backend;
+import org.apache.doris.utframe.TestWithFeService;
+
+import com.google.common.collect.Maps;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+
+public class BeDownCancelCloneTest extends TestWithFeService {
+
+ @Override
+ protected int backendNum() {
+ return 4;
+ }
+
+ @Override
+ protected void beforeCreatingConnectContext() throws Exception {
+ FeConstants.runningUnitTest = true;
+ FeConstants.default_scheduler_interval_millisecond = 1000;
+ Config.enable_debug_points = true;
+ FeConstants.tablet_checker_interval_ms = 100;
+ Config.tablet_repair_delay_factor_second = 1;
+ Config.allow_replica_on_same_host = true;
+ Config.disable_balance = true;
+ Config.schedule_batch_size = 1000;
+ Config.schedule_slot_num_per_hdd_path = 1000;
+ FeConstants.heartbeat_interval_second = 5;
+ Config.max_backend_heartbeat_failure_tolerance_count = 1;
+ Config.min_clone_task_timeout_sec = 20 * 60 * 1000;
+ }
+
+ @Test
+ public void test() throws Exception {
+ connectContext = createDefaultCtx();
+
+ createDatabase("db1");
+ System.out.println(Env.getCurrentInternalCatalog().getDbNames());
+
+ // 3. create table tbl1
+ createTable("create table db1.tbl1(k1 int) distributed by hash(k1)
buckets 1;");
+ RebalancerTestUtil.updateReplicaPathHash();
+
+ Database db =
Env.getCurrentInternalCatalog().getDbOrMetaException("default_cluster:db1");
+ OlapTable tbl = (OlapTable) db.getTableOrMetaException("tbl1");
+ Assertions.assertNotNull(tbl);
+ Tablet tablet = tbl.getPartitions().iterator().next()
+
.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL).iterator().next()
+ .getTablets().iterator().next();
+
+ Assertions.assertEquals(3, tablet.getReplicas().size());
+ long destBeId =
Env.getCurrentSystemInfo().getAllBackendIds(true).stream()
+ .filter(beId -> tablet.getReplicaByBackendId(beId) == null)
+ .findFirst()
+ .orElse(-1L);
+ Assertions.assertTrue(destBeId != -1L);
+ Backend destBe = Env.getCurrentSystemInfo().getBackend(destBeId);
+ Assertions.assertNotNull(destBe);
+ Assertions.assertTrue(destBe.isAlive());
+
+ // add debug point, make clone wait
+
DebugPointUtil.addDebugPoint("MockedBackendFactory.handleCloneTablet.block");
+
+ // move replica[0] to destBeId
+ Replica srcReplica = tablet.getReplicas().get(0);
+ String moveTabletSql = "ADMIN SET REPLICA STATUS
PROPERTIES(\"tablet_id\" = \"" + tablet.getId() + "\", "
+ + "\"backend_id\" = \"" + srcReplica.getBackendId() + "\",
\"status\" = \"drop\")";
+ Assertions.assertNotNull(getSqlStmtExecutor(moveTabletSql));
+ Assertions.assertFalse(srcReplica.isScheduleAvailable());
+
+ Thread.sleep(3000);
+
+ Assertions.assertEquals(0,
Env.getCurrentEnv().getTabletScheduler().getHistoryTablets(100).size());
+ Assertions.assertEquals(4, tablet.getReplicas().size());
+ Replica destReplica = tablet.getReplicaByBackendId(destBeId);
+ Assertions.assertNotNull(destReplica);
+ Assertions.assertEquals(Replica.ReplicaState.CLONE,
destReplica.getState());
+
+ // clone a replica on destBe
+ List<TabletSchedCtx> runningTablets =
Env.getCurrentEnv().getTabletScheduler().getRunningTablets(100);
+ Assertions.assertEquals(1, runningTablets.size());
+ Assertions.assertEquals(destBeId,
runningTablets.get(0).getDestBackendId());
+
+ Map<String, String> params2 = Maps.newHashMap();
+ params2.put("deadBeIds", String.valueOf(destBeId));
+
DebugPointUtil.addDebugPointWithParams("HeartbeatMgr.BackendHeartbeatHandler",
params2);
+
+ Thread.sleep((FeConstants.heartbeat_interval_second
+ * Config.max_backend_heartbeat_failure_tolerance_count + 4) *
1000L);
+
+ destBe = Env.getCurrentSystemInfo().getBackend(destBeId);
+ Assertions.assertNotNull(destBe);
+ Assertions.assertFalse(destBe.isAlive());
+
+ // delete clone dest task
+
Assertions.assertFalse(Env.getCurrentEnv().getTabletScheduler().getHistoryTablets(100).isEmpty());
+
+ // first drop dest replica (its backend is dead) and src replica (it's
mark as drop)
+ // then re clone a replica to src be, and waiting for cloning.
+ runningTablets =
Env.getCurrentEnv().getTabletScheduler().getRunningTablets(100);
+ Assertions.assertEquals(1, runningTablets.size());
+ Assertions.assertEquals(srcReplica.getBackendId(),
runningTablets.get(0).getDestBackendId());
+
+
DebugPointUtil.removeDebugPoint("MockedBackendFactory.handleCloneTablet.block");
+ Thread.sleep(2000);
+
+ // destBe is dead, cancel clone task
+ runningTablets =
Env.getCurrentEnv().getTabletScheduler().getRunningTablets(100);
+ Assertions.assertEquals(0, runningTablets.size());
+
+ Assertions.assertEquals(3, tablet.getReplicas().size());
+ for (Replica replica : tablet.getReplicas()) {
+ Assertions.assertTrue(replica.getBackendId() != destBeId);
+ Assertions.assertTrue(replica.isScheduleAvailable());
+ Assertions.assertEquals(Replica.ReplicaState.NORMAL,
replica.getState());
+ }
+ }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
index cb228043fdd..665dc8163ae 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.CatalogTestUtil;
import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.DiskInfo.DiskState;
import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.proto.Data;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.PBackendServiceGrpc;
@@ -228,6 +229,13 @@ public class MockedBackendFactory {
}
private void handleCloneTablet(TAgentTaskRequest request,
TFinishTaskRequest finishTaskRequest) {
+ while
(DebugPointUtil.isEnable("MockedBackendFactory.handleCloneTablet.block")) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
TCloneReq req = request.getCloneReq();
long dataSize = Math.max(1,
CatalogTestUtil.getTabletDataSize(req.tablet_id));
long pathHash = req.dest_path_hash;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org