This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-0.15 in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 1241992fbe5f620c6e97ff0752c3e773bea5042c Author: Mingyu Chen <[email protected]> AuthorDate: Mon Oct 25 10:07:04 2021 +0800 [Bug] Fix bug of decommission (#6826) --- .../main/java/org/apache/doris/catalog/Tablet.java | 15 ++++++ .../apache/doris/clone/ClusterLoadStatistic.java | 7 +++ .../org/apache/doris/clone/TabletSchedCtx.java | 23 ++++++--- .../org/apache/doris/clone/TabletScheduler.java | 7 +-- .../doris/clone/TabletRepairAndBalanceTest.java | 58 ++++++++++++++++++++++ .../doris/utframe/DemoMultiBackendsTest.java | 3 +- 6 files changed, 101 insertions(+), 12 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java index cec8d8f..f40216c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java @@ -562,6 +562,8 @@ public class Tablet extends MetaObject implements Writable { // in ColocateTableCheckerAndBalancer. Short totalReplicaNum = replicaAlloc.getTotalReplicaNum(); // 1. check if replicas' backends are mismatch + // There is no REPLICA_MISSING status for colocate table's tablet. + // Because if the following check doesn't pass, COLOCATE_MISMATCH will be returned. Set<Long> replicaBackendIds = getBackendIds(); if (!replicaBackendIds.containsAll(backendsSet)) { return TabletStatus.COLOCATE_MISMATCH; @@ -574,6 +576,19 @@ public class Tablet extends MetaObject implements Writable { // eg: replicaBackendIds=(1,2,3,4); backendsSet=(1,2,3), then replica 4 should be skipped here and then goto ```COLOCATE_REDUNDANT``` in step 3 continue; } + + if (!replica.isAlive()) { + if (replica.isBad()) { + // If this replica is bad but located on one of backendsSet, + // we have to drop it first, or we cannot find any other BE for the new replica. 
+ return TabletStatus.COLOCATE_REDUNDANT; + } else { + // maybe in replica's DECOMMISSION state + // Here we return VERSION_INCOMPLETE, and the tablet scheduler will finally set its state to NORMAL. + return TabletStatus.VERSION_INCOMPLETE; + } + } + if (replica.getLastFailedVersion() > 0 || replica.getVersion() < visibleVersion) { // this replica is alive but version incomplete return TabletStatus.VERSION_INCOMPLETE; diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java index 6dd18c1..14d7230 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java @@ -83,6 +83,13 @@ public class ClusterLoadStatistic { public void init() { List<Backend> backends = infoService.getBackendsByTagInCluster(clusterName, tag); for (Backend backend : backends) { + if (backend.isDecommissioned()) { + // Skip the decommission backend. + // Otherwise, before the decommission is done, these BE will always be treated + // as LOW Backends. But the balancer will not handle BE in decommission state. + // So balance will be blocked. + continue; + } BackendLoadStatistic beStatistic = new BackendLoadStatistic(backend.getId(), backend.getOwnerClusterName(), backend.getTag(), infoService, invertedIndex); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java index b34af4e..d86c2c5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java @@ -487,14 +487,19 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> { } // database lock should be held. 
- public void chooseSrcReplica(Map<Long, PathSlot> backendsWorkingSlots) throws SchedException { + // If exceptBeId != -1, should not choose src replica with same BE id as exceptBeId + public void chooseSrcReplica(Map<Long, PathSlot> backendsWorkingSlots, long exceptBeId) throws SchedException { /* * get all candidate source replicas * 1. source replica should be healthy. - * 2. slot of this source replica is available. + * 2. slot of this source replica is available. */ List<Replica> candidates = Lists.newArrayList(); for (Replica replica : tablet.getReplicas()) { + if (exceptBeId != -1 && replica.getBackendId() == exceptBeId) { + continue; + } + if (replica.isBad()) { continue; } @@ -504,7 +509,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> { // backend which is in decommission can still be the source backend continue; } - + if (replica.getLastFailedVersion() > 0) { continue; } @@ -545,10 +550,9 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> { */ public void chooseSrcReplicaForVersionIncomplete(Map<Long, PathSlot> backendsWorkingSlots) throws SchedException { - chooseSrcReplica(backendsWorkingSlots); - if (srcReplica.getBackendId() == destBackendId) { - throw new SchedException(Status.SCHEDULE_FAILED, "the chosen source replica is in dest backend"); - } + chooseSrcReplica(backendsWorkingSlots, destBackendId); + Preconditions.checkState(srcReplica.getBackendId() != destBackendId, + "wrong be id: " + destBackendId); } /* @@ -573,9 +577,12 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> { continue; } + // check version and replica state. + // if the replica's state is DECOMMISSION, it may be chosen as dest replica, + // and its state will be set to NORMAL later. 
if (replica.getLastFailedVersion() <= 0 && ((replica.getVersion() == visibleVersion && replica.getVersionHash() == visibleVersionHash) - || replica.getVersion() > visibleVersion)) { + || replica.getVersion() > visibleVersion) && replica.getState() != ReplicaState.DECOMMISSION) { // skip healthy replica continue; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index 39804ae..7aca236 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -625,7 +625,7 @@ public class TabletScheduler extends MasterDaemon { tabletCtx.setDest(destPath.getBeId(), destPath.getPathHash()); // choose a source replica for cloning from - tabletCtx.chooseSrcReplica(backendsWorkingSlots); + tabletCtx.chooseSrcReplica(backendsWorkingSlots, -1); // create clone task batchTask.addTask(tabletCtx.createCloneReplicaAndTask()); @@ -955,10 +955,11 @@ public class TabletScheduler extends MasterDaemon { private boolean handleColocateRedundant(TabletSchedCtx tabletCtx) throws SchedException { Preconditions.checkNotNull(tabletCtx.getColocateBackendsSet()); for (Replica replica : tabletCtx.getReplicas()) { - if (tabletCtx.getColocateBackendsSet().contains(replica.getBackendId())) { + if (tabletCtx.getColocateBackendsSet().contains(replica.getBackendId()) && !replica.isBad()) { continue; } + // If the replica is not in ColocateBackendsSet or is bad, delete it. 
deleteReplicaInternal(tabletCtx, replica, "colocate redundant", false); throw new SchedException(Status.FINISHED, "colocate redundant replica is deleted"); } @@ -1074,7 +1075,7 @@ public class TabletScheduler extends MasterDaemon { tabletCtx.setDest(destPath.getBeId(), destPath.getPathHash()); // choose a source replica for cloning from - tabletCtx.chooseSrcReplica(backendsWorkingSlots); + tabletCtx.chooseSrcReplica(backendsWorkingSlots, -1); // create clone task batchTask.addTask(tabletCtx.createCloneReplicaAndTask()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java index 5099696..65a9a24 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java @@ -27,11 +27,13 @@ import org.apache.doris.catalog.ColocateGroupSchema; import org.apache.doris.catalog.ColocateTableIndex; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.DiskInfo; +import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.PartitionInfo; import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.ReplicaAllocation; +import org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.TabletMeta; import org.apache.doris.common.AnalysisException; @@ -456,6 +458,62 @@ public class TabletRepairAndBalanceTest { // test colocate table index persist ExceptionChecker.expectThrowsNoException(() -> testColocateTableIndexSerialization(colocateTableIndex)); + + // test colocate tablet repair + String createStr6 = "create table test.col_tbl3\n" + + "(k1 date, k2 int)\n" + + "distributed by hash(k2) buckets 1\n" + + "properties\n" + + "(\n" + + " \"replication_num\" = 
\"3\",\n" + + " \"colocate_with\" = \"g3\"\n" + + ")"; + ExceptionChecker.expectThrowsNoException(() -> createTable(createStr6)); + + OlapTable tbl3 = db.getOlapTableOrDdlException("col_tbl3"); + updateReplicaPathHash(); + // Set one replica's state as DECOMMISSION, see if it can be changed to NORMAL + Tablet oneTablet = null; + Replica oneReplica = null; + for (Partition partition : tbl3.getPartitions()) { + for (MaterializedIndex mIndex : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) { + for (Tablet tablet : mIndex.getTablets()) { + oneTablet = tablet; + for (Replica replica : tablet.getReplicas()) { + oneReplica = replica; + oneReplica.setState(Replica.ReplicaState.DECOMMISSION); + break; + } + } + } + } + Assert.assertTrue(checkReplicaState(oneReplica)); + + // set one replica to bad, see if it can be repaired + oneReplica.setBad(true); + Assert.assertTrue(checkReplicaBad(oneTablet, oneReplica)); + } + + private static boolean checkReplicaState(Replica replica) throws Exception { + for (int i = 0; i < 10; i++) { + Thread.sleep(1000); + if (replica.getState() != Replica.ReplicaState.NORMAL) { + continue; + } + return true; + } + return false; + } + + private static boolean checkReplicaBad(Tablet tablet, Replica replica) throws Exception { + for (int i = 0; i < 10; i++) { + Thread.sleep(1000); + if (tablet.getReplicaById(replica.getId()) != null) { + continue; + } + return true; + } + return false; } private void testColocateTableIndexSerialization(ColocateTableIndex colocateTableIndex) throws IOException { diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java index 460f49b..70578f4 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java @@ -132,7 +132,8 @@ public class DemoMultiBackendsTest { 
Catalog.getCurrentCatalog().createDb(createDbStmt); System.out.println(Catalog.getCurrentCatalog().getDbNames()); // 3. create table tbl1 - String createTblStmtStr = "create table db1.tbl1(k1 int) distributed by hash(k1) buckets 3 properties('replication_num' = '3');"; + String createTblStmtStr = "create table db1.tbl1(k1 int) distributed by hash(k1) buckets 3 properties('replication_num' = '3'," + + "'colocate_with' = 'g1');"; CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createTblStmtStr, ctx); Catalog.getCurrentCatalog().createTable(createTableStmt); // must set replicas' path hash, or the tablet scheduler won't work --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
