This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a2d82220e2a [fix](tablet clone) clone add replica prefer choose the same medium (#25640)
a2d82220e2a is described below
commit a2d82220e2aa6003cf1a2e5f53c292e101c7a023
Author: yujun <[email protected]>
AuthorDate: Tue Oct 24 15:18:32 2023 +0800
[fix](tablet clone) clone add replica prefer choose the same medium (#25640)
---
.../apache/doris/clone/BackendLoadStatistic.java | 2 +-
.../clone/ColocateTableCheckerAndBalancer.java | 2 +-
.../apache/doris/clone/RootPathLoadStatistic.java | 10 ++-
.../org/apache/doris/clone/TabletScheduler.java | 42 +++++-----
.../doris/clone/AddReplicaChoseMediumTest.java | 97 ++++++++++++++++++++++
.../org/apache/doris/clone/DecommissionTest.java | 4 +-
.../doris/clone/TabletRepairAndBalanceTest.java | 4 +-
.../apache/doris/utframe/TestWithFeService.java | 4 +-
.../org/apache/doris/utframe/UtFrameUtils.java | 4 +-
9 files changed, 133 insertions(+), 36 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java
index 2d6401b8eb7..c23bcd76329 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java
@@ -414,7 +414,7 @@ public class BackendLoadStatistic {
for (int i = 0; i < pathStatistics.size(); i++) {
RootPathLoadStatistic pathStatistic = pathStatistics.get(i);
// if this is a supplement task, ignore the storage medium
- if (!isSupplement && pathStatistic.getStorageMedium() != medium) {
+ if (!isSupplement && medium != null &&
pathStatistic.getStorageMedium() != medium) {
LOG.debug("backend {} path {}'s storage medium {} is not {}
storage medium, actual: {}",
beId, pathStatistic.getPath(),
pathStatistic.getStorageMedium(), medium);
continue;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
index 358185dd609..89259600530 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
@@ -866,7 +866,7 @@ public class ColocateTableCheckerAndBalancer extends
MasterDaemon {
globalColocateStatistic.getBucketTotalReplicaDataSize(groupId, bucketIndex);
resultPaths.clear();
- BalanceStatus st = beStat.isFit(bucketDataSize, null,
resultPaths, true);
+ BalanceStatus st = beStat.isFit(bucketDataSize, null,
resultPaths, false);
if (!st.ok()) {
LOG.debug("backend {} is unable to fit in group {},
tablet order idx {}, data size {}",
destBeId, groupId, bucketIndex,
bucketDataSize);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java
index 1a51276f7d6..d2f1983a831 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java
@@ -98,9 +98,11 @@ public class RootPathLoadStatistic implements
Comparable<RootPathLoadStatistic>
toString() + " does not fit tablet with size: " +
tabletSize + ", offline");
}
+ double newUsagePerc = (usedCapacityB + tabletSize) / (double)
capacityB;
+ long newLeftCapacity = capacityB - usedCapacityB - tabletSize;
if (isSupplement) {
- if ((usedCapacityB + tabletSize) / (double) capacityB >
(Config.storage_flood_stage_usage_percent / 100.0)
- && capacityB - usedCapacityB - tabletSize <
Config.storage_flood_stage_left_capacity_bytes) {
+ if (newUsagePerc > (Config.storage_flood_stage_usage_percent /
100.0)
+ || newLeftCapacity <
Config.storage_flood_stage_left_capacity_bytes) {
return new BalanceStatus(ErrCode.COMMON_ERROR,
toString() + " does not fit tablet with size: " +
tabletSize + ", limitation reached");
} else {
@@ -108,8 +110,8 @@ public class RootPathLoadStatistic implements
Comparable<RootPathLoadStatistic>
}
}
- if ((usedCapacityB + tabletSize) / (double) capacityB >
(Config.storage_high_watermark_usage_percent / 100.0)
- || capacityB - usedCapacityB - tabletSize <
Config.storage_min_left_capacity_bytes) {
+ if (newUsagePerc > (Config.storage_high_watermark_usage_percent /
100.0)
+ || newLeftCapacity < Config.storage_min_left_capacity_bytes) {
return new BalanceStatus(ErrCode.COMMON_ERROR,
toString() + " does not fit tablet with size: " +
tabletSize);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index b3cbe4546b9..572199f941d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -1318,7 +1318,8 @@ public class TabletScheduler extends MasterDaemon {
// get all available paths which this tablet can fit in.
// beStatistics is sorted by mix load score in ascend order, so select
from first to last.
- List<BePathLoadStatPair> allFitPaths = Lists.newArrayList();
+ List<BePathLoadStatPair> allFitPathsSameMedium = Lists.newArrayList();
+ List<BePathLoadStatPair> allFitPathsDiffMedium = Lists.newArrayList();
for (BackendLoadStatistic bes : beStatistics) {
if (!bes.isAvailable()) {
LOG.debug("backend {} is not available, skip. tablet: {}",
bes.getBeId(), tabletCtx.getTabletId());
@@ -1349,27 +1350,27 @@ public class TabletScheduler extends MasterDaemon {
List<RootPathLoadStatistic> resultPaths = Lists.newArrayList();
BalanceStatus st = bes.isFit(tabletCtx.getTabletSize(),
tabletCtx.getStorageMedium(),
- resultPaths, tabletCtx.getTabletStatus() !=
TabletStatus.REPLICA_RELOCATING
- /* if REPLICA_RELOCATING, then it is not a supplement task
*/);
- if (!st.ok()) {
- LOG.debug("unable to find path for tablet: {}. {}", tabletCtx,
st);
- // This is to solve, when we decommission some BEs with SSD
disks,
- // if there are no SSD disks on the remaining BEs, it will be
impossible to select a
- // suitable destination path.
- // In this case, we need to ignore the storage medium property
- // and try to select the destination path again.
- // Set `isSupplement` to true will ignore the storage medium
property.
- st = bes.isFit(tabletCtx.getTabletSize(),
tabletCtx.getStorageMedium(),
- resultPaths, true);
- if (!st.ok()) {
- LOG.debug("unable to find path for supplementing tablet:
{}. {}", tabletCtx, st);
- continue;
+ resultPaths, false);
+ if (st.ok()) {
+ resultPaths.stream().forEach(path ->
allFitPathsSameMedium.add(new BePathLoadStatPair(bes, path)));
+ } else {
+ LOG.debug("backend {} unable to find path for tablet: {}. {}",
bes.getBeId(), tabletCtx, st);
+ resultPaths.clear();
+ st = bes.isFit(tabletCtx.getTabletSize(),
tabletCtx.getStorageMedium(), resultPaths, true);
+ if (st.ok()) {
+ resultPaths.stream().forEach(path ->
allFitPathsDiffMedium.add(new BePathLoadStatPair(bes, path)));
+ } else {
+ LOG.debug("backend {} unable to find path for
supplementing tablet: {}. {}",
+ bes.getBeId(), tabletCtx, st);
}
}
-
- resultPaths.stream().forEach(path -> allFitPaths.add(new
BePathLoadStatPair(bes, path)));
}
+ // all fit paths has already been sorted by load score in
'allFitPaths' in ascend order.
+ // just get first available path.
+ // we try to find a path with specified media type, if not find,
arbitrarily use one.
+ List<BePathLoadStatPair> allFitPaths =
+ !allFitPathsSameMedium.isEmpty() ? allFitPathsSameMedium :
allFitPathsDiffMedium;
if (allFitPaths.isEmpty()) {
throw new SchedException(Status.UNRECOVERABLE, "unable to find
dest path for new replica");
}
@@ -1377,14 +1378,11 @@ public class TabletScheduler extends MasterDaemon {
BePathLoadStatPairComparator comparator = new
BePathLoadStatPairComparator(allFitPaths);
Collections.sort(allFitPaths, comparator);
- // all fit paths has already been sorted by load score in
'allFitPaths' in ascend order.
- // just get first available path.
- // we try to find a path with specified media type, if not find,
arbitrarily use one.
for (BePathLoadStatPair bePathLoadStat : allFitPaths) {
RootPathLoadStatistic rootPathLoadStatistic =
bePathLoadStat.getPathLoadStatistic();
if (rootPathLoadStatistic.getStorageMedium() !=
tabletCtx.getStorageMedium()) {
LOG.debug("backend {}'s path {}'s storage medium {} "
- + "is not equal to tablet's storage medium {},
skip. tablet: {}",
+ + "is not equal to tablet's storage medium {}, skip.
tablet: {}",
rootPathLoadStatistic.getBeId(),
rootPathLoadStatistic.getPathHash(),
rootPathLoadStatistic.getStorageMedium(),
tabletCtx.getStorageMedium(),
tabletCtx.getTabletId());
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/clone/AddReplicaChoseMediumTest.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/AddReplicaChoseMediumTest.java
new file mode 100644
index 00000000000..dee048223b5
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/clone/AddReplicaChoseMediumTest.java
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.common.Config;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TStorageMedium;
+import org.apache.doris.utframe.TestWithFeService;
+
+import com.google.common.collect.Lists;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class AddReplicaChoseMediumTest extends TestWithFeService {
+
+ @Override
+ protected void beforeCreatingConnectContext() throws Exception {
+ Config.enable_round_robin_create_tablet = true;
+ Config.allow_replica_on_same_host = true;
+ Config.tablet_checker_interval_ms = 100;
+ Config.tablet_schedule_interval_ms = 100;
+ Config.schedule_slot_num_per_hdd_path = 1;
+ }
+
+ @Override
+ protected int backendNum() {
+ return 4;
+ }
+
+ @Test
+ public void testAddReplicaChoseMedium() throws Exception {
+ TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
+ List<Backend> backends = Env.getCurrentSystemInfo().getAllBackends();
+ Assertions.assertEquals(backendNum(), backends.size());
+ for (Backend be : backends) {
+ Assertions.assertEquals(0,
invertedIndex.getTabletNumByBackendId(be.getId()));
+ }
+
+ Backend beWithSsd = backends.get(3);
+ beWithSsd.getDisks().values().forEach(it -> {
+ it.setStorageMedium(TStorageMedium.SSD);
+ it.setTotalCapacityB(it.getTotalCapacityB() * 2);
+ });
+
+ createDatabase("test");
+ createTable("CREATE TABLE test.tbl1 (k INT) DISTRIBUTED BY HASH(k) "
+ + " BUCKETS 12 PROPERTIES ( \"replication_num\" = \"2\","
+ + " \"storage_medium\" = \"HDD\")");
+ RebalancerTestUtil.updateReplicaPathHash();
+
+ Assertions.assertEquals(0,
invertedIndex.getTabletNumByBackendId(beWithSsd.getId()));
+ for (Backend be : backends) {
+ if (be.getId() != beWithSsd.getId()) {
+ Assertions.assertEquals(8,
invertedIndex.getTabletNumByBackendId(be.getId()));
+ }
+ }
+
+ Backend decommissionBe = backends.get(0);
+ String decommissionStmtStr = "alter system decommission backend \"" +
decommissionBe.getHost()
+ + ":" + decommissionBe.getHeartbeatPort() + "\"";
+ Assertions.assertNotNull(getSqlStmtExecutor(decommissionStmtStr));
+ Assertions.assertTrue(decommissionBe.isDecommissioned());
+
+ List<Integer> gotTabletNums = null;
+ List<Integer> expectTabletNums = Lists.newArrayList(0, 12, 12, 0);
+ for (int i = 0; i < 10; i++) {
+ gotTabletNums = backends.stream().map(it ->
invertedIndex.getTabletNumByBackendId(it.getId()))
+ .collect(Collectors.toList());
+ if (expectTabletNums.equals(gotTabletNums)) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+
+ Assertions.assertEquals(expectTabletNums, gotTabletNums);
+ }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java
index 1764573d0fc..83393c47f17 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java
@@ -85,7 +85,7 @@ public class DecommissionTest {
Map<String, TDisk> backendDisks = Maps.newHashMap();
TDisk tDisk1 = new TDisk();
tDisk1.setRootPath("/home/doris1.HDD");
- tDisk1.setDiskTotalCapacity(20000000);
+ tDisk1.setDiskTotalCapacity(10L << 30);
tDisk1.setDataUsedCapacity(1);
tDisk1.setUsed(true);
tDisk1.setDiskAvailableCapacity(tDisk1.disk_total_capacity -
tDisk1.data_used_capacity);
@@ -95,7 +95,7 @@ public class DecommissionTest {
TDisk tDisk2 = new TDisk();
tDisk2.setRootPath("/home/doris2.HHD");
- tDisk2.setDiskTotalCapacity(20000000);
+ tDisk2.setDiskTotalCapacity(10L << 30);
tDisk2.setDataUsedCapacity(1);
tDisk2.setUsed(true);
tDisk2.setDiskAvailableCapacity(tDisk2.disk_total_capacity -
tDisk2.data_used_capacity);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java
index 3e178ef0903..9b43d34281f 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java
@@ -128,7 +128,7 @@ public class TabletRepairAndBalanceTest {
Map<String, TDisk> backendDisks = Maps.newHashMap();
TDisk tDisk1 = new TDisk();
tDisk1.setRootPath("/home/doris.HDD");
- tDisk1.setDiskTotalCapacity(2000000000);
+ tDisk1.setDiskTotalCapacity(10L << 30);
tDisk1.setDataUsedCapacity(1);
tDisk1.setUsed(true);
tDisk1.setDiskAvailableCapacity(tDisk1.disk_total_capacity -
tDisk1.data_used_capacity);
@@ -138,7 +138,7 @@ public class TabletRepairAndBalanceTest {
TDisk tDisk2 = new TDisk();
tDisk2.setRootPath("/home/doris.SSD");
- tDisk2.setDiskTotalCapacity(2000000000);
+ tDisk2.setDiskTotalCapacity(10L << 30);
tDisk2.setDataUsedCapacity(1);
tDisk2.setUsed(true);
tDisk2.setDiskAvailableCapacity(tDisk2.disk_total_capacity -
tDisk2.data_used_capacity);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index 160bdee462e..880063196e5 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -466,8 +466,8 @@ public abstract class TestWithFeService {
Backend be = new Backend(Env.getCurrentEnv().getNextId(),
backend.getHost(), backend.getHeartbeatPort());
DiskInfo diskInfo1 = new DiskInfo("/path" + be.getId());
diskInfo1.setPathHash(be.getId());
- diskInfo1.setTotalCapacityB(1000000);
- diskInfo1.setAvailableCapacityB(500000);
+ diskInfo1.setTotalCapacityB(10L << 30);
+ diskInfo1.setAvailableCapacityB(5L << 30);
diskInfo1.setDataUsedCapacityB(480000);
diskInfo1.setPathHash(be.getId());
Map<String, DiskInfo> disks = Maps.newHashMap();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
index db3bc4dadb8..182450322c9 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
@@ -298,8 +298,8 @@ public class UtFrameUtils {
Backend be = new Backend(Env.getCurrentEnv().getNextId(),
backend.getHost(), backend.getHeartbeatPort());
Map<String, DiskInfo> disks = Maps.newHashMap();
DiskInfo diskInfo1 = new DiskInfo("/path" + be.getId());
- diskInfo1.setTotalCapacityB(1000000);
- diskInfo1.setAvailableCapacityB(500000);
+ diskInfo1.setTotalCapacityB(10L << 30);
+ diskInfo1.setAvailableCapacityB(5L << 30);
diskInfo1.setDataUsedCapacityB(480000);
diskInfo1.setPathHash(be.getId());
disks.put(diskInfo1.getRootPath(), diskInfo1);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]