This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 620a137bd7 [enhancement](test) support tablet repair and balance process in ut (#13940)
620a137bd7 is described below

commit 620a137bd77df22016805e060f9762d5d311ee5d
Author: wxy <[email protected]>
AuthorDate: Sat Nov 5 19:20:23 2022 +0800

    [enhancement](test) support tablet repair and balance process in ut (#13940)
---
 .../main/java/org/apache/doris/catalog/Tablet.java |  9 ++-
 .../org/apache/doris/clone/TabletSchedCtx.java     |  9 +--
 .../org/apache/doris/clone/TabletScheduler.java    |  4 +-
 .../main/java/org/apache/doris/common/Config.java  |  2 +-
 .../doris/cluster/DecommissionBackendTest.java     | 91 ++++++++++++++++++++++
 .../apache/doris/utframe/TestWithFeService.java    | 55 ++++++++++++-
 6 files changed, 159 insertions(+), 11 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
index 1af5cb5ad9..fbe60b74c8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.Replica.ReplicaState;
 import org.apache.doris.clone.TabletSchedCtx;
 import org.apache.doris.clone.TabletSchedCtx.Priority;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.resource.Tag;
@@ -429,8 +430,8 @@ public class Tablet extends MetaObject implements Writable {
         ArrayList<Long> versions = new ArrayList<>();
         for (Replica replica : replicas) {
             Backend backend = 
systemInfoService.getBackend(replica.getBackendId());
-            if (backend == null || !backend.isAlive() || !replica.isAlive() || 
!hosts.add(backend.getHost())
-                    || replica.tooSlow() || !backend.isMixNode()) {
+            if (backend == null || !backend.isAlive() || !replica.isAlive()
+                    || checkHost(hosts, backend) || replica.tooSlow() || 
!backend.isMixNode()) {
                 // this replica is not alive,
                 // or if this replica is on same host with another replica, we 
also treat it as 'dead',
                 // so that Tablet Scheduler will create a new replica on 
different host.
@@ -563,6 +564,10 @@ public class Tablet extends MetaObject implements Writable 
{
         return Pair.of(TabletStatus.HEALTHY, TabletSchedCtx.Priority.NORMAL);
     }
 
+    private boolean checkHost(Set<String> hosts, Backend backend) {
+        return !Config.allow_replica_on_same_host && 
!FeConstants.runningUnitTest && !hosts.add(backend.getHost());
+    }
+
     /**
      * Check colocate table's tablet health
      * 1. Mismatch:
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
index 108eef9c1d..990153a00a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
@@ -480,10 +480,10 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
     }
 
     /*
-     * check if existing replicas are on same BE.
+     * check if existing replicas are on same BE or Host.
      * database lock should be held.
      */
-    public boolean containsBE(long beId) {
+    public boolean filterDestBE(long beId) {
         Backend backend = infoService.getBackend(beId);
         if (backend == null) {
             // containsBE() is currently only used for choosing dest backend 
to do clone task.
@@ -497,11 +497,10 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
                 // BE has been dropped, skip it
                 continue;
             }
-            if (host.equals(be.getHost())) {
+            if (!Config.allow_replica_on_same_host && 
!FeConstants.runningUnitTest && host.equals(be.getHost())) {
                 return true;
             }
-            // actually there is no need to check BE id anymore, because if 
hosts are not same, BE ids are
-            // not same either. But for psychological comfort, leave this 
check here.
+
             if (replica.getBackendId() == beId) {
                 return true;
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 5a73d0230c..294dc3fcea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -1314,8 +1314,8 @@ public class TabletScheduler extends MasterDaemon {
                 continue;
             }
 
-            // exclude host which already has replica of this tablet
-            if (tabletCtx.containsBE(bes.getBeId())) {
+            // exclude BE which already has replica of this tablet or another 
BE at same host has this replica
+            if (tabletCtx.filterDestBE(bes.getBeId())) {
                 continue;
             }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index f4141bf36c..de5197ace4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1578,7 +1578,7 @@ public class Config extends ConfigBase {
 
     /*
      * If set to true, when creating table, Doris will allow to locate 
replicas of a tablet
-     * on same host. And also the tablet repair and balance will be disabled.
+     * on same host.
      * This is only for local test, so that we can deploy multi BE on same 
host and create table
      * with multi replicas.
      * DO NOT use it for production env.
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
new file mode 100644
index 0000000000..07faf89a0e
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cluster;
+
+import org.apache.doris.analysis.AlterSystemStmt;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.system.Backend;
+import org.apache.doris.utframe.TestWithFeService;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class DecommissionBackendTest extends TestWithFeService {
+
+    @Override
+    protected int backendNum() {
+        return 3;
+    }
+
+    @Override
+    protected void beforeCreatingConnectContext() throws Exception {
+        FeConstants.default_scheduler_interval_millisecond = 1000;
+        FeConstants.tablet_checker_interval_ms = 1000;
+        Config.tablet_repair_delay_factor_second = 1;
+        Config.allow_replica_on_same_host = true;
+    }
+
+    @Test
+    public void testDecommissionBackend() throws Exception {
+        // 1. create connect context
+        connectContext = createDefaultCtx();
+
+        ImmutableMap<Long, Backend> idToBackendRef = 
Env.getCurrentSystemInfo().getIdToBackend();
+        Assertions.assertEquals(backendNum(), idToBackendRef.size());
+
+        // 2. create database db1
+        createDatabase("db1");
+        System.out.println(Env.getCurrentInternalCatalog().getDbNames());
+
+        // 3. create table tbl1
+        createTable("create table db1.tbl1(k1 int) distributed by hash(k1) 
buckets 3 properties('replication_num' = '1');");
+
+        // 4. query tablet num
+        int tabletNum = 
Env.getCurrentInvertedIndex().getTabletMetaMap().size();
+
+        // 5. execute decommission
+        Backend srcBackend = null;
+        for (Backend backend : idToBackendRef.values()) {
+            if 
(Env.getCurrentInvertedIndex().getTabletIdsByBackendId(backend.getId()).size() 
> 0) {
+                srcBackend = backend;
+                break;
+            }
+        }
+
+        Assertions.assertTrue(srcBackend != null);
+        String decommissionStmtStr = "alter system decommission backend 
\"127.0.0.1:" + srcBackend.getHeartbeatPort() + "\"";
+        AlterSystemStmt decommissionStmt = (AlterSystemStmt) 
parseAndAnalyzeStmt(decommissionStmtStr);
+        
Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt);
+
+        Assertions.assertEquals(true, srcBackend.isDecommissioned());
+        long startTimestamp = System.currentTimeMillis();
+        while (System.currentTimeMillis() - startTimestamp < 90000
+            && 
Env.getCurrentSystemInfo().getIdToBackend().containsKey(srcBackend.getId())) {
+            Thread.sleep(1000);
+        }
+
+        Assertions.assertEquals(backendNum() - 1, 
Env.getCurrentSystemInfo().getIdToBackend().size());
+        Assertions.assertEquals(tabletNum, 
Env.getCurrentInvertedIndex().getTabletMetaMap().size());
+
+    }
+
+}
+
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java 
b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index c8f8f867bf..d89e029b7e 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -41,7 +41,9 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DiskInfo;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TabletMeta;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
@@ -149,6 +151,11 @@ public abstract class TestWithFeService {
     protected void runBeforeEach() throws Exception {
     }
 
+    // Override this method if you want to start multi BE
+    protected int backendNum() {
+        return 1;
+    }
+
     // Help to create a mocked ConnectContext.
     protected ConnectContext createDefaultCtx() throws IOException {
         return createCtx(UserIdentity.ROOT, "127.0.0.1");
@@ -248,6 +255,18 @@ public abstract class TestWithFeService {
     protected int startFEServer(String runningDir)
             throws EnvVarNotSetException, IOException, FeStartException, 
NotInitException, DdlException,
             InterruptedException {
+        IOException exception = null;
+        try {
+            return startFEServerWithoutRetry(runningDir);
+        } catch (IOException ignore) {
+            exception = ignore;
+        }
+        throw exception;
+    }
+
+    protected int startFEServerWithoutRetry(String runningDir)
+            throws EnvVarNotSetException, IOException, FeStartException, 
NotInitException, DdlException,
+            InterruptedException {
         // get DORIS_HOME
         dorisHome = System.getenv("DORIS_HOME");
         if (Strings.isNullOrEmpty(dorisHome)) {
@@ -284,7 +303,7 @@ public abstract class TestWithFeService {
     protected void createDorisCluster()
             throws InterruptedException, NotInitException, IOException, 
DdlException, EnvVarNotSetException,
             FeStartException {
-        createDorisCluster(runningDir, 1);
+        createDorisCluster(runningDir, backendNum());
     }
 
     protected void createDorisCluster(String runningDir, int backendNum)
@@ -335,6 +354,18 @@ public abstract class TestWithFeService {
     }
 
     protected Backend createBackend(String beHost, int feRpcPort) throws 
IOException, InterruptedException {
+        IOException exception = null;
+        for (int i = 0; i <= 3; i++) {
+            try {
+                return createBackendWithoutRetry(beHost, feRpcPort);
+            } catch (IOException ignore) {
+                exception = ignore;
+            }
+        }
+        throw exception;
+    }
+
+    private Backend createBackendWithoutRetry(String beHost, int feRpcPort) 
throws IOException, InterruptedException {
         int beHeartbeatPort = findValidPort();
         int beThriftPort = findValidPort();
         int beBrpcPort = findValidPort();
@@ -350,6 +381,7 @@ public abstract class TestWithFeService {
         // add be
         Backend be = new Backend(Env.getCurrentEnv().getNextId(), 
backend.getHost(), backend.getHeartbeatPort());
         DiskInfo diskInfo1 = new DiskInfo("/path" + be.getId());
+        diskInfo1.setPathHash(be.getId());
         diskInfo1.setTotalCapacityB(1000000);
         diskInfo1.setAvailableCapacityB(500000);
         diskInfo1.setDataUsedCapacityB(480000);
@@ -476,6 +508,7 @@ public abstract class TestWithFeService {
             CreateTableStmt stmt = (CreateTableStmt) parseAndAnalyzeStmt(sql);
             Env.getCurrentEnv().createTable(stmt);
         }
+        updateReplicaPathHash();
     }
 
     public void createView(String sql) throws Exception {
@@ -545,6 +578,26 @@ public abstract class TestWithFeService {
         Thread.sleep(100);
     }
 
+    private void updateReplicaPathHash() {
+        com.google.common.collect.Table<Long, Long, Replica> replicaMetaTable 
= Env.getCurrentInvertedIndex().getReplicaMetaTable();
+        for (com.google.common.collect.Table.Cell<Long, Long, Replica> cell : 
replicaMetaTable.cellSet()) {
+            long beId = cell.getColumnKey();
+            Backend be = Env.getCurrentSystemInfo().getBackend(beId);
+            if (be == null) {
+                continue;
+            }
+            Replica replica = cell.getValue();
+            TabletMeta tabletMeta = 
Env.getCurrentInvertedIndex().getTabletMeta(cell.getRowKey());
+            ImmutableMap<String, DiskInfo> diskMap = be.getDisks();
+            for (DiskInfo diskInfo : diskMap.values()) {
+                if (diskInfo.getStorageMedium() == 
tabletMeta.getStorageMedium()) {
+                    replica.setPathHash(diskInfo.getPathHash());
+                    break;
+                }
+            }
+        }
+    }
+
     private void checkAlterJob() throws InterruptedException {
         // check alter job
         Map<Long, AlterJobV2> alterJobs = 
Env.getCurrentEnv().getMaterializedViewHandler().getAlterJobsV2();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to