This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new f5c058527ce [improvement](balance) partition rebalance chose disk by
rr #36826 (#36901)
f5c058527ce is described below
commit f5c058527cee79ef4518e1e0726b05fcb3fca9f7
Author: yujun <[email protected]>
AuthorDate: Thu Jun 27 13:37:07 2024 +0800
[improvement](balance) partition rebalance chose disk by rr #36826 (#36901)
cherry pick from #36826
---
.../apache/doris/clone/PartitionRebalancer.java | 5 +-
.../org/apache/doris/clone/TabletScheduler.java | 48 +++++++++-------
.../java/org/apache/doris/clone/PathSlotTest.java | 64 ++++++++++++++++++++++
3 files changed, 93 insertions(+), 24 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
index a6b8bf04b12..fd70e5116f2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
@@ -39,7 +39,6 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Random;
-import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -266,9 +265,9 @@ public class PartitionRebalancer extends Rebalancer {
Preconditions.checkNotNull(slot, "unable to get slot of toBe " +
move.toBe);
List<RootPathLoadStatistic> paths = beStat.getPathStatistics();
- Set<Long> availPath = paths.stream().filter(path ->
path.getStorageMedium() == tabletCtx.getStorageMedium()
+ List<Long> availPath = paths.stream().filter(path ->
path.getStorageMedium() == tabletCtx.getStorageMedium()
&& path.isFit(tabletCtx.getTabletSize(), false) ==
BalanceStatus.OK)
-
.map(RootPathLoadStatistic::getPathHash).collect(Collectors.toSet());
+
.map(RootPathLoadStatistic::getPathHash).collect(Collectors.toList());
long pathHash = slot.takeAnAvailBalanceSlotFrom(availPath);
if (pathHash == -1) {
throw new
SchedException(SchedException.Status.SCHEDULE_FAILED,
SchedException.SubCode.WAITING_SLOT,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 8ae51be7a96..3341f5bb305 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -1933,9 +1933,12 @@ public class TabletScheduler extends MasterDaemon {
// path hash -> slot num
private Map<Long, Slot> pathSlots = Maps.newConcurrentMap();
private long beId;
+ // only used in takeAnAvailBalanceSlotFrom, to make path picking round-robin (RR)
+ private long lastPickPathHash;
public PathSlot(Map<Long, TStorageMedium> paths, long beId) {
this.beId = beId;
+ this.lastPickPathHash = -1;
for (Map.Entry<Long, TStorageMedium> entry : paths.entrySet()) {
pathSlots.put(entry.getKey(), new Slot(entry.getValue()));
}
@@ -2046,19 +2049,6 @@ public class TabletScheduler extends MasterDaemon {
return num;
}
- /**
- * get path whose balance slot num is larger than 0
- */
- public synchronized Set<Long> getAvailPathsForBalance() {
- Set<Long> pathHashs = Sets.newHashSet();
- for (Map.Entry<Long, Slot> entry : pathSlots.entrySet()) {
- if (entry.getValue().getAvailableBalance() > 0) {
- pathHashs.add(entry.getKey());
- }
- }
- return pathHashs;
- }
-
public synchronized List<List<String>> getSlotInfo(long beId) {
List<List<String>> results = Lists.newArrayList();
pathSlots.forEach((key, value) -> {
@@ -2091,15 +2081,31 @@ public class TabletScheduler extends MasterDaemon {
return -1;
}
- public synchronized long takeAnAvailBalanceSlotFrom(Set<Long>
pathHashs) {
- for (Long pathHash : pathHashs) {
- Slot slot = pathSlots.get(pathHash);
- if (slot == null) {
- continue;
+ public long takeAnAvailBalanceSlotFrom(List<Long> pathHashs) {
+ if (pathHashs.isEmpty()) {
+ return -1;
+ }
+
+ Collections.sort(pathHashs);
+ synchronized (this) {
+ int preferSlotIndex = pathHashs.indexOf(lastPickPathHash) + 1;
+ if (preferSlotIndex < 0 || preferSlotIndex >=
pathHashs.size()) {
+ preferSlotIndex = 0;
}
- if (slot.balanceUsed < slot.getBalanceTotal()) {
- slot.balanceUsed++;
- return pathHash;
+
+ for (int i = preferSlotIndex; i < pathHashs.size(); i++) {
+ long pathHash = pathHashs.get(i);
+ if (takeBalanceSlot(pathHash) != -1) {
+ lastPickPathHash = pathHash;
+ return pathHash;
+ }
+ }
+ for (int i = 0; i < preferSlotIndex; i++) {
+ long pathHash = pathHashs.get(i);
+ if (takeBalanceSlot(pathHash) != -1) {
+ lastPickPathHash = pathHash;
+ return pathHash;
+ }
}
}
return -1;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/PathSlotTest.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/PathSlotTest.java
new file mode 100644
index 00000000000..e26e3042fb8
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/PathSlotTest.java
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import org.apache.doris.clone.TabletScheduler.PathSlot;
+import org.apache.doris.common.Config;
+import org.apache.doris.thrift.TStorageMedium;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+class PathSlotTest {
+
+ @Test
+ public void test() {
+ Config.balance_slot_num_per_path = 2;
+ Map<Long, TStorageMedium> paths = Maps.newHashMap();
+ List<Long> availPathHashs = Lists.newArrayList();
+ List<Long> expectPathHashs = Lists.newArrayList();
+ List<Long> gotPathHashs = Lists.newArrayList();
+ long startPath = 10001L;
+ long endPath = 10006L;
+ for (long pathHash = startPath; pathHash < endPath; pathHash++) {
+ paths.put(pathHash, TStorageMedium.HDD);
+ availPathHashs.add(pathHash);
+ expectPathHashs.add(pathHash);
+ }
+ for (long pathHash = startPath; pathHash < endPath; pathHash++) {
+ expectPathHashs.add(pathHash);
+ }
+ for (long pathHash = startPath; pathHash < endPath; pathHash++) {
+ expectPathHashs.add(-1L);
+ }
+
+ PathSlot ps = new PathSlot(paths, 1L);
+ for (int i = 0; i < expectPathHashs.size(); i++) {
+ Collections.shuffle(availPathHashs);
+ gotPathHashs.add(ps.takeAnAvailBalanceSlotFrom(availPathHashs));
+ }
+ Assert.assertEquals(expectPathHashs, gotPathHashs);
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]