This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 620a137bd7 [enhancement](test) support tablet repair and balance process in ut (#13940)
620a137bd7 is described below
commit 620a137bd77df22016805e060f9762d5d311ee5d
Author: wxy <[email protected]>
AuthorDate: Sat Nov 5 19:20:23 2022 +0800
[enhancement](test) support tablet repair and balance process in ut (#13940)
---
.../main/java/org/apache/doris/catalog/Tablet.java | 9 ++-
.../org/apache/doris/clone/TabletSchedCtx.java | 9 +--
.../org/apache/doris/clone/TabletScheduler.java | 4 +-
.../main/java/org/apache/doris/common/Config.java | 2 +-
.../doris/cluster/DecommissionBackendTest.java | 91 ++++++++++++++++++++++
.../apache/doris/utframe/TestWithFeService.java | 55 ++++++++++++-
6 files changed, 159 insertions(+), 11 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
index 1af5cb5ad9..fbe60b74c8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.clone.TabletSchedCtx;
import org.apache.doris.clone.TabletSchedCtx.Priority;
import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Writable;
import org.apache.doris.resource.Tag;
@@ -429,8 +430,8 @@ public class Tablet extends MetaObject implements Writable {
ArrayList<Long> versions = new ArrayList<>();
for (Replica replica : replicas) {
Backend backend =
systemInfoService.getBackend(replica.getBackendId());
- if (backend == null || !backend.isAlive() || !replica.isAlive() ||
!hosts.add(backend.getHost())
- || replica.tooSlow() || !backend.isMixNode()) {
+ if (backend == null || !backend.isAlive() || !replica.isAlive()
+ || checkHost(hosts, backend) || replica.tooSlow() ||
!backend.isMixNode()) {
// this replica is not alive,
// or if this replica is on same host with another replica, we
also treat it as 'dead',
// so that Tablet Scheduler will create a new replica on
different host.
@@ -563,6 +564,10 @@ public class Tablet extends MetaObject implements Writable {
return Pair.of(TabletStatus.HEALTHY, TabletSchedCtx.Priority.NORMAL);
}
+ private boolean checkHost(Set<String> hosts, Backend backend) {
+ return !Config.allow_replica_on_same_host &&
!FeConstants.runningUnitTest && !hosts.add(backend.getHost());
+ }
+
/**
* Check colocate table's tablet health
* 1. Mismatch:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
index 108eef9c1d..990153a00a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
@@ -480,10 +480,10 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
}
/*
- * check if existing replicas are on same BE.
+ * check if existing replicas are on same BE or Host.
* database lock should be held.
*/
- public boolean containsBE(long beId) {
+ public boolean filterDestBE(long beId) {
Backend backend = infoService.getBackend(beId);
if (backend == null) {
// containsBE() is currently only used for choosing dest backend
to do clone task.
@@ -497,11 +497,10 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
// BE has been dropped, skip it
continue;
}
- if (host.equals(be.getHost())) {
+ if (!Config.allow_replica_on_same_host &&
!FeConstants.runningUnitTest && host.equals(be.getHost())) {
return true;
}
- // actually there is no need to check BE id anymore, because if
hosts are not same, BE ids are
- // not same either. But for psychological comfort, leave this
check here.
+
if (replica.getBackendId() == beId) {
return true;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 5a73d0230c..294dc3fcea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -1314,8 +1314,8 @@ public class TabletScheduler extends MasterDaemon {
continue;
}
- // exclude host which already has replica of this tablet
- if (tabletCtx.containsBE(bes.getBeId())) {
+ // exclude BE which already has replica of this tablet or another
BE at same host has this replica
+ if (tabletCtx.filterDestBE(bes.getBeId())) {
continue;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index f4141bf36c..de5197ace4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1578,7 +1578,7 @@ public class Config extends ConfigBase {
/*
* If set to true, when creating table, Doris will allow to locate
replicas of a tablet
- * on same host. And also the tablet repair and balance will be disabled.
+ * on same host.
* This is only for local test, so that we can deploy multi BE on same
host and create table
* with multi replicas.
* DO NOT use it for production env.
diff --git a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
new file mode 100644
index 0000000000..07faf89a0e
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cluster;
+
+import org.apache.doris.analysis.AlterSystemStmt;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.system.Backend;
+import org.apache.doris.utframe.TestWithFeService;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class DecommissionBackendTest extends TestWithFeService {
+
+ @Override
+ protected int backendNum() {
+ return 3;
+ }
+
+ @Override
+ protected void beforeCreatingConnectContext() throws Exception {
+ FeConstants.default_scheduler_interval_millisecond = 1000;
+ FeConstants.tablet_checker_interval_ms = 1000;
+ Config.tablet_repair_delay_factor_second = 1;
+ Config.allow_replica_on_same_host = true;
+ }
+
+ @Test
+ public void testDecommissionBackend() throws Exception {
+ // 1. create connect context
+ connectContext = createDefaultCtx();
+
+ ImmutableMap<Long, Backend> idToBackendRef =
Env.getCurrentSystemInfo().getIdToBackend();
+ Assertions.assertEquals(backendNum(), idToBackendRef.size());
+
+ // 2. create database db1
+ createDatabase("db1");
+ System.out.println(Env.getCurrentInternalCatalog().getDbNames());
+
+ // 3. create table tbl1
+ createTable("create table db1.tbl1(k1 int) distributed by hash(k1)
buckets 3 properties('replication_num' = '1');");
+
+ // 4. query tablet num
+ int tabletNum =
Env.getCurrentInvertedIndex().getTabletMetaMap().size();
+
+ // 5. execute decommission
+ Backend srcBackend = null;
+ for (Backend backend : idToBackendRef.values()) {
+ if
(Env.getCurrentInvertedIndex().getTabletIdsByBackendId(backend.getId()).size()
> 0) {
+ srcBackend = backend;
+ break;
+ }
+ }
+
+ Assertions.assertTrue(srcBackend != null);
+ String decommissionStmtStr = "alter system decommission backend
\"127.0.0.1:" + srcBackend.getHeartbeatPort() + "\"";
+ AlterSystemStmt decommissionStmt = (AlterSystemStmt)
parseAndAnalyzeStmt(decommissionStmtStr);
+
Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt);
+
+ Assertions.assertEquals(true, srcBackend.isDecommissioned());
+ long startTimestamp = System.currentTimeMillis();
+ while (System.currentTimeMillis() - startTimestamp < 90000
+ &&
Env.getCurrentSystemInfo().getIdToBackend().containsKey(srcBackend.getId())) {
+ Thread.sleep(1000);
+ }
+
+ Assertions.assertEquals(backendNum() - 1,
Env.getCurrentSystemInfo().getIdToBackend().size());
+ Assertions.assertEquals(tabletNum,
Env.getCurrentInvertedIndex().getTabletMetaMap().size());
+
+ }
+
+}
+
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index c8f8f867bf..d89e029b7e 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -41,7 +41,9 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
@@ -149,6 +151,11 @@ public abstract class TestWithFeService {
protected void runBeforeEach() throws Exception {
}
+ // Override this method if you want to start multi BE
+ protected int backendNum() {
+ return 1;
+ }
+
// Help to create a mocked ConnectContext.
protected ConnectContext createDefaultCtx() throws IOException {
return createCtx(UserIdentity.ROOT, "127.0.0.1");
@@ -248,6 +255,18 @@ public abstract class TestWithFeService {
protected int startFEServer(String runningDir)
throws EnvVarNotSetException, IOException, FeStartException,
NotInitException, DdlException,
InterruptedException {
+ IOException exception = null;
+ try {
+ return startFEServerWithoutRetry(runningDir);
+ } catch (IOException ignore) {
+ exception = ignore;
+ }
+ throw exception;
+ }
+
+ protected int startFEServerWithoutRetry(String runningDir)
+ throws EnvVarNotSetException, IOException, FeStartException,
NotInitException, DdlException,
+ InterruptedException {
// get DORIS_HOME
dorisHome = System.getenv("DORIS_HOME");
if (Strings.isNullOrEmpty(dorisHome)) {
@@ -284,7 +303,7 @@ public abstract class TestWithFeService {
protected void createDorisCluster()
throws InterruptedException, NotInitException, IOException,
DdlException, EnvVarNotSetException,
FeStartException {
- createDorisCluster(runningDir, 1);
+ createDorisCluster(runningDir, backendNum());
}
protected void createDorisCluster(String runningDir, int backendNum)
@@ -335,6 +354,18 @@ public abstract class TestWithFeService {
}
protected Backend createBackend(String beHost, int feRpcPort) throws
IOException, InterruptedException {
+ IOException exception = null;
+ for (int i = 0; i <= 3; i++) {
+ try {
+ return createBackendWithoutRetry(beHost, feRpcPort);
+ } catch (IOException ignore) {
+ exception = ignore;
+ }
+ }
+ throw exception;
+ }
+
+ private Backend createBackendWithoutRetry(String beHost, int feRpcPort)
throws IOException, InterruptedException {
int beHeartbeatPort = findValidPort();
int beThriftPort = findValidPort();
int beBrpcPort = findValidPort();
@@ -350,6 +381,7 @@ public abstract class TestWithFeService {
// add be
Backend be = new Backend(Env.getCurrentEnv().getNextId(),
backend.getHost(), backend.getHeartbeatPort());
DiskInfo diskInfo1 = new DiskInfo("/path" + be.getId());
+ diskInfo1.setPathHash(be.getId());
diskInfo1.setTotalCapacityB(1000000);
diskInfo1.setAvailableCapacityB(500000);
diskInfo1.setDataUsedCapacityB(480000);
@@ -476,6 +508,7 @@ public abstract class TestWithFeService {
CreateTableStmt stmt = (CreateTableStmt) parseAndAnalyzeStmt(sql);
Env.getCurrentEnv().createTable(stmt);
}
+ updateReplicaPathHash();
}
public void createView(String sql) throws Exception {
@@ -545,6 +578,26 @@ public abstract class TestWithFeService {
Thread.sleep(100);
}
+ private void updateReplicaPathHash() {
+ com.google.common.collect.Table<Long, Long, Replica> replicaMetaTable
= Env.getCurrentInvertedIndex().getReplicaMetaTable();
+ for (com.google.common.collect.Table.Cell<Long, Long, Replica> cell :
replicaMetaTable.cellSet()) {
+ long beId = cell.getColumnKey();
+ Backend be = Env.getCurrentSystemInfo().getBackend(beId);
+ if (be == null) {
+ continue;
+ }
+ Replica replica = cell.getValue();
+ TabletMeta tabletMeta =
Env.getCurrentInvertedIndex().getTabletMeta(cell.getRowKey());
+ ImmutableMap<String, DiskInfo> diskMap = be.getDisks();
+ for (DiskInfo diskInfo : diskMap.values()) {
+ if (diskInfo.getStorageMedium() ==
tabletMeta.getStorageMedium()) {
+ replica.setPathHash(diskInfo.getPathHash());
+ break;
+ }
+ }
+ }
+ }
+
private void checkAlterJob() throws InterruptedException {
// check alter job
Map<Long, AlterJobV2> alterJobs =
Env.getCurrentEnv().getMaterializedViewHandler().getAlterJobsV2();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]