This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-0.15
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 1241992fbe5f620c6e97ff0752c3e773bea5042c
Author: Mingyu Chen <[email protected]>
AuthorDate: Mon Oct 25 10:07:04 2021 +0800

    [Bug] Fix bug of decommission (#6826)
---
 .../main/java/org/apache/doris/catalog/Tablet.java | 15 ++++++
 .../apache/doris/clone/ClusterLoadStatistic.java   |  7 +++
 .../org/apache/doris/clone/TabletSchedCtx.java     | 23 ++++++---
 .../org/apache/doris/clone/TabletScheduler.java    |  7 +--
 .../doris/clone/TabletRepairAndBalanceTest.java    | 58 ++++++++++++++++++++++
 .../doris/utframe/DemoMultiBackendsTest.java       |  3 +-
 6 files changed, 101 insertions(+), 12 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
index cec8d8f..f40216c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
@@ -562,6 +562,8 @@ public class Tablet extends MetaObject implements Writable {
         // in ColocateTableCheckerAndBalancer.
         Short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
         // 1. check if replicas' backends are mismatch
+        //    There is no REPLICA_MISSING status for colocate table's tablet.
+        //    Because if the following check doesn't pass, COLOCATE_MISMATCH will be returned.
         Set<Long> replicaBackendIds = getBackendIds();
         if (!replicaBackendIds.containsAll(backendsSet)) {
             return TabletStatus.COLOCATE_MISMATCH;
@@ -574,6 +576,19 @@ public class Tablet extends MetaObject implements Writable 
{
                 // eg:  replicaBackendIds=(1,2,3,4); backendsSet=(1,2,3), then 
replica 4 should be skipped here and then goto ```COLOCATE_REDUNDANT``` in step 
3
                 continue;
             }
+
+            if (!replica.isAlive()) {
+                if (replica.isBad()) {
+                    // If this replica is bad but located on one of backendsSet,
+                    // we have to drop it first, or we cannot find any other BE for the new replica.
+                    return TabletStatus.COLOCATE_REDUNDANT;
+                } else {
+                    // maybe in replica's DECOMMISSION state
+                    // Here we return VERSION_INCOMPLETE, and the tablet scheduler will finally set its state to NORMAL.
+                    return TabletStatus.VERSION_INCOMPLETE;
+                }
+            }
+
             if (replica.getLastFailedVersion() > 0 || replica.getVersion() < 
visibleVersion) {
                 // this replica is alive but version incomplete
                 return TabletStatus.VERSION_INCOMPLETE;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java
index 6dd18c1..14d7230 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/ClusterLoadStatistic.java
@@ -83,6 +83,13 @@ public class ClusterLoadStatistic {
     public void init() {
         List<Backend> backends = 
infoService.getBackendsByTagInCluster(clusterName, tag);
         for (Backend backend : backends) {
+            if (backend.isDecommissioned()) {
+                // Skip the decommission backend.
+                // Otherwise, before the decommission is done, these BEs will always be treated
+                // as LOW backends. But the balancer does not handle BEs in decommission state.
+                // So balance will be blocked.
+                continue;
+            }
             BackendLoadStatistic beStatistic = new 
BackendLoadStatistic(backend.getId(),
                     backend.getOwnerClusterName(), backend.getTag(), 
infoService, invertedIndex);
             try {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
index b34af4e..d86c2c5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
@@ -487,14 +487,19 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
     }
 
     // database lock should be held.
-    public void chooseSrcReplica(Map<Long, PathSlot> backendsWorkingSlots) 
throws SchedException {
+    // If exceptBeId != -1, should not choose src replica with same BE id as 
exceptBeId
+    public void chooseSrcReplica(Map<Long, PathSlot> backendsWorkingSlots, 
long exceptBeId) throws SchedException {
         /*
          * get all candidate source replicas
          * 1. source replica should be healthy.
-         * 2. slot of this source replica is available. 
+         * 2. slot of this source replica is available.
          */
         List<Replica> candidates = Lists.newArrayList();
         for (Replica replica : tablet.getReplicas()) {
+            if (exceptBeId != -1 && replica.getBackendId() == exceptBeId) {
+                continue;
+            }
+
             if (replica.isBad()) {
                 continue;
             }
@@ -504,7 +509,7 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
                 // backend which is in decommission can still be the source 
backend
                 continue;
             }
-            
+
             if (replica.getLastFailedVersion() > 0) {
                 continue;
             }
@@ -545,10 +550,9 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
      */
     public void chooseSrcReplicaForVersionIncomplete(Map<Long, PathSlot> 
backendsWorkingSlots)
             throws SchedException {
-        chooseSrcReplica(backendsWorkingSlots);
-        if (srcReplica.getBackendId() == destBackendId) {
-            throw new SchedException(Status.SCHEDULE_FAILED, "the chosen 
source replica is in dest backend");
-        }
+        chooseSrcReplica(backendsWorkingSlots, destBackendId);
+        Preconditions.checkState(srcReplica.getBackendId() != destBackendId,
+                "wrong be id: " + destBackendId);
     }
     
     /*
@@ -573,9 +577,12 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
                 continue;
             }
 
+            // check version and replica state.
+            // if the replica's state is DECOMMISSION, it may be chosen as the dest replica,
+            // and its state will be set to NORMAL later.
             if (replica.getLastFailedVersion() <= 0
                     && ((replica.getVersion() == visibleVersion && 
replica.getVersionHash() == visibleVersionHash)
-                            || replica.getVersion() > visibleVersion)) {
+                    || replica.getVersion() > visibleVersion) && 
replica.getState() != ReplicaState.DECOMMISSION) {
                 // skip healthy replica
                 continue;
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 39804ae..7aca236 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -625,7 +625,7 @@ public class TabletScheduler extends MasterDaemon {
         tabletCtx.setDest(destPath.getBeId(), destPath.getPathHash());
 
         // choose a source replica for cloning from
-        tabletCtx.chooseSrcReplica(backendsWorkingSlots);
+        tabletCtx.chooseSrcReplica(backendsWorkingSlots, -1);
 
         // create clone task
         batchTask.addTask(tabletCtx.createCloneReplicaAndTask());
@@ -955,10 +955,11 @@ public class TabletScheduler extends MasterDaemon {
     private boolean handleColocateRedundant(TabletSchedCtx tabletCtx) throws 
SchedException {
         Preconditions.checkNotNull(tabletCtx.getColocateBackendsSet());
         for (Replica replica : tabletCtx.getReplicas()) {
-            if 
(tabletCtx.getColocateBackendsSet().contains(replica.getBackendId())) {
+            if 
(tabletCtx.getColocateBackendsSet().contains(replica.getBackendId()) && 
!replica.isBad()) {
                 continue;
             }
 
+            // If the replica is not in ColocateBackendsSet or is bad, delete 
it.
             deleteReplicaInternal(tabletCtx, replica, "colocate redundant", 
false);
             throw new SchedException(Status.FINISHED, "colocate redundant 
replica is deleted");
         }
@@ -1074,7 +1075,7 @@ public class TabletScheduler extends MasterDaemon {
         tabletCtx.setDest(destPath.getBeId(), destPath.getPathHash());
 
         // choose a source replica for cloning from
-        tabletCtx.chooseSrcReplica(backendsWorkingSlots);
+        tabletCtx.chooseSrcReplica(backendsWorkingSlots, -1);
 
         // create clone task
         batchTask.addTask(tabletCtx.createCloneReplicaAndTask());
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java
index 5099696..65a9a24 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java
@@ -27,11 +27,13 @@ import org.apache.doris.catalog.ColocateGroupSchema;
 import org.apache.doris.catalog.ColocateTableIndex;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DiskInfo;
+import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.PartitionInfo;
 import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.catalog.TabletMeta;
 import org.apache.doris.common.AnalysisException;
@@ -456,6 +458,62 @@ public class TabletRepairAndBalanceTest {
 
         // test colocate table index persist
         ExceptionChecker.expectThrowsNoException(() -> 
testColocateTableIndexSerialization(colocateTableIndex));
+
+        // test colocate tablet repair
+        String createStr6 = "create table test.col_tbl3\n" +
+                "(k1 date, k2 int)\n" +
+                "distributed by hash(k2) buckets 1\n" +
+                "properties\n" +
+                "(\n" +
+                "    \"replication_num\" = \"3\",\n" +
+                "    \"colocate_with\" = \"g3\"\n" +
+                ")";
+        ExceptionChecker.expectThrowsNoException(() -> 
createTable(createStr6));
+
+        OlapTable tbl3 = db.getOlapTableOrDdlException("col_tbl3");
+        updateReplicaPathHash();
+        // Set one replica's state as DECOMMISSION, see if it can be changed 
to NORMAL
+        Tablet oneTablet = null;
+        Replica oneReplica = null;
+        for (Partition partition : tbl3.getPartitions()) {
+            for (MaterializedIndex mIndex : 
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
+                for (Tablet tablet : mIndex.getTablets()) {
+                    oneTablet = tablet;
+                    for (Replica replica : tablet.getReplicas()) {
+                        oneReplica = replica;
+                        oneReplica.setState(Replica.ReplicaState.DECOMMISSION);
+                        break;
+                    }
+                }
+            }
+        }
+        Assert.assertTrue(checkReplicaState(oneReplica));
+
+        // set one replica to bad, see if it can be repaired
+        oneReplica.setBad(true);
+        Assert.assertTrue(checkReplicaBad(oneTablet, oneReplica));
+    }
+
+    private static boolean checkReplicaState(Replica replica) throws Exception 
{
+        for (int i = 0; i < 10; i++) {
+            Thread.sleep(1000);
+            if (replica.getState() != Replica.ReplicaState.NORMAL) {
+                continue;
+            }
+            return true;
+        }
+        return false;
+    }
+
+    private static boolean checkReplicaBad(Tablet tablet, Replica replica) 
throws Exception {
+        for (int i = 0; i < 10; i++) {
+            Thread.sleep(1000);
+            if (tablet.getReplicaById(replica.getId()) != null) {
+                continue;
+            }
+            return true;
+        }
+        return false;
     }
 
     private void testColocateTableIndexSerialization(ColocateTableIndex 
colocateTableIndex) throws IOException {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java
index 460f49b..70578f4 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/utframe/DemoMultiBackendsTest.java
@@ -132,7 +132,8 @@ public class DemoMultiBackendsTest {
         Catalog.getCurrentCatalog().createDb(createDbStmt);
         System.out.println(Catalog.getCurrentCatalog().getDbNames());
         // 3. create table tbl1
-        String createTblStmtStr = "create table db1.tbl1(k1 int) distributed 
by hash(k1) buckets 3 properties('replication_num' = '3');";
+        String createTblStmtStr = "create table db1.tbl1(k1 int) distributed 
by hash(k1) buckets 3 properties('replication_num' = '3'," +
+                "'colocate_with' = 'g1');";
         CreateTableStmt createTableStmt = (CreateTableStmt) 
UtFrameUtils.parseAndAnalyzeStmt(createTblStmtStr, ctx);
         Catalog.getCurrentCatalog().createTable(createTableStmt);
         // must set replicas' path hash, or the tablet scheduler won't work

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to