This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c92e090cd04 [improvement](partition rebalance) improve partition
rebalance choose candidate speed (#36509)
c92e090cd04 is described below
commit c92e090cd04a2530630b310b1a7aecbc50dc1407
Author: yujun <[email protected]>
AuthorDate: Thu Jun 27 22:56:24 2024 +0800
[improvement](partition rebalance) improve partition rebalance choose
candidate speed (#36509)
When the partition rebalancer chooses candidate tablets, it calls
tabletListOfA.removeAll(tabletListOfB), but list.removeAll(list) runs
in O(n^2) time. If each BE contains 100k+ (10w+) tablets, this is rather
slow, and we found an online case where the tablet scheduler thread was
busy doing it.
So this search needs to be improved.
---
.../apache/doris/clone/PartitionRebalancer.java | 79 +++++++++++++++-------
.../java/org/apache/doris/clone/Rebalancer.java | 3 +
.../org/apache/doris/clone/TabletScheduler.java | 13 ++--
.../java/org/apache/doris/clone/PathSlotTest.java | 5 +-
.../doris/cluster/DecommissionBackendTest.java | 1 -
5 files changed, 66 insertions(+), 35 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
index 23e13e9161b..7095ad8dc54 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
@@ -30,8 +30,8 @@ import org.apache.doris.thrift.TStorageMedium;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
import com.google.common.collect.TreeMultimap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -41,7 +41,9 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiPredicate;
import java.util.stream.Collectors;
/*
@@ -121,44 +123,64 @@ public class PartitionRebalancer extends Rebalancer {
= algo.getNextMoves(clusterBalanceInfo,
Config.partition_rebalance_max_moves_num_per_selection);
List<TabletSchedCtx> alternativeTablets = Lists.newArrayList();
- List<Long> inProgressIds = movesInProgressList.stream().map(m ->
m.tabletId).collect(Collectors.toList());
+ Set<Long> inProgressIds = movesInProgressList.stream().map(m ->
m.tabletId).collect(Collectors.toSet());
+ Random rand = new SecureRandom();
for (TwoDimensionalGreedyRebalanceAlgo.PartitionMove move : moves) {
// Find all tablets of the specified partition that would have a
replica at the source be,
// but would not have a replica at the destination be. That is to
satisfy the restriction
// of having no more than one replica of the same tablet per be.
List<Long> tabletIds =
invertedIndex.getTabletIdsByBackendIdAndStorageMedium(move.fromBe, medium);
- List<Long> invalidIds =
invertedIndex.getTabletIdsByBackendIdAndStorageMedium(move.toBe, medium);
- tabletIds.removeAll(invalidIds);
- // In-progress tablets can't be the candidate too.
- tabletIds.removeAll(inProgressIds);
+ if (tabletIds.isEmpty()) {
+ continue;
+ }
+
+ Set<Long> invalidIds = Sets.newHashSet(
+
invertedIndex.getTabletIdsByBackendIdAndStorageMedium(move.toBe, medium));
- Map<Long, TabletMeta> tabletCandidates = Maps.newHashMap();
- for (long tabletId : tabletIds) {
+ BiPredicate<Long, TabletMeta> canMoveTablet = (Long tabletId,
TabletMeta tabletMeta) -> {
+ return tabletMeta != null
+ && tabletMeta.getPartitionId() == move.partitionId
+ && tabletMeta.getIndexId() == move.indexId
+ && !invalidIds.contains(tabletId)
+ && !inProgressIds.contains(tabletId);
+ };
+
+ // Random pick one candidate to create tabletSchedCtx
+ int startIdx = rand.nextInt(tabletIds.size());
+ long pickedTabletId = -1L;
+ TabletMeta pickedTabletMeta = null;
+ for (int i = startIdx; i < tabletIds.size(); i++) {
+ long tabletId = tabletIds.get(i);
TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId);
- if (tabletMeta != null && tabletMeta.getPartitionId() ==
move.partitionId
- && tabletMeta.getIndexId() == move.indexId) {
- tabletCandidates.put(tabletId, tabletMeta);
+ if (canMoveTablet.test(tabletId, tabletMeta)) {
+ pickedTabletId = tabletId;
+ pickedTabletMeta = tabletMeta;
+ break;
}
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("Find {} candidates for move {}",
tabletCandidates.size(), move);
- }
- if (tabletCandidates.isEmpty()) {
- continue;
+
+ if (pickedTabletId == -1L) {
+ for (int i = 0; i < startIdx; i++) {
+ long tabletId = tabletIds.get(i);
+ TabletMeta tabletMeta =
invertedIndex.getTabletMeta(tabletId);
+ if (canMoveTablet.test(tabletId, tabletMeta)) {
+ pickedTabletId = tabletId;
+ pickedTabletMeta = tabletMeta;
+ break;
+ }
+ }
}
- // Random pick one candidate to create tabletSchedCtx
- Random rand = new SecureRandom();
- Object[] keys = tabletCandidates.keySet().toArray();
- long pickedTabletId = (long) keys[rand.nextInt(keys.length)];
- if (LOG.isDebugEnabled()) {
- LOG.debug("Picked tablet id for move {}: {}", move,
pickedTabletId);
+ if (pickedTabletId == -1L) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cann't picked tablet id for move {}", move);
+ }
+ continue;
}
- TabletMeta tabletMeta = tabletCandidates.get(pickedTabletId);
TabletSchedCtx tabletCtx = new
TabletSchedCtx(TabletSchedCtx.Type.BALANCE,
- tabletMeta.getDbId(), tabletMeta.getTableId(),
tabletMeta.getPartitionId(),
- tabletMeta.getIndexId(), pickedTabletId, null /* replica
alloc is not used for balance*/,
+ pickedTabletMeta.getDbId(), pickedTabletMeta.getTableId(),
pickedTabletMeta.getPartitionId(),
+ pickedTabletMeta.getIndexId(), pickedTabletId, null /*
replica alloc is not used for balance*/,
System.currentTimeMillis());
tabletCtx.setTag(clusterStat.getTag());
// Balance task's priority is always LOW
@@ -282,7 +304,7 @@ public class PartitionRebalancer extends Rebalancer {
List<Long> availPath = paths.stream().filter(path ->
path.getStorageMedium() == tabletCtx.getStorageMedium()
&& path.isFit(tabletCtx.getTabletSize(), false) ==
BalanceStatus.OK)
.map(RootPathLoadStatistic::getPathHash).collect(Collectors.toList());
- long pathHash = slot.takeAnAvailBalanceSlotFrom(availPath);
+ long pathHash = slot.takeAnAvailBalanceSlotFrom(availPath,
tabletCtx.getStorageMedium());
if (pathHash == -1) {
throw new
SchedException(SchedException.Status.SCHEDULE_FAILED, SubCode.WAITING_SLOT,
"paths has no available balance slot: " + availPath);
@@ -329,6 +351,11 @@ public class PartitionRebalancer extends Rebalancer {
}
}
+ @Override
+ public void invalidateToDeleteReplicaId(TabletSchedCtx tabletCtx) {
+ movesCacheMap.invalidateTablet(tabletCtx);
+ }
+
@Override
public void updateLoadStatistic(Map<Tag, LoadStatisticForTag>
statisticMap) {
super.updateLoadStatistic(statisticMap);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
index afcce785393..682c2915989 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
@@ -129,6 +129,9 @@ public abstract class Rebalancer {
return -1L;
}
+ public void invalidateToDeleteReplicaId(TabletSchedCtx tabletCtx) {
+ }
+
public void onTabletFailed(TabletSchedCtx tabletCtx) {
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index bf8b2a74f25..df0fb2309b3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -994,7 +994,9 @@ public class TabletScheduler extends MasterDaemon {
if (chosenReplica == null) {
return false;
}
+
deleteReplicaInternal(tabletCtx, chosenReplica, "src replica of
rebalance", force);
+ rebalancer.invalidateToDeleteReplicaId(tabletCtx);
return true;
}
@@ -2006,11 +2008,10 @@ public class TabletScheduler extends MasterDaemon {
private Map<Long, Slot> pathSlots = Maps.newConcurrentMap();
private long beId;
// only use in takeAnAvailBalanceSlotFrom, make pick RR
- private long lastPickPathHash;
+ private Map<TStorageMedium, Long> lastPickPathHashs =
Maps.newHashMap();
public PathSlot(Map<Long, TStorageMedium> paths, long beId) {
this.beId = beId;
- this.lastPickPathHash = -1;
for (Map.Entry<Long, TStorageMedium> entry : paths.entrySet()) {
pathSlots.put(entry.getKey(), new Slot(entry.getValue()));
}
@@ -2157,14 +2158,14 @@ public class TabletScheduler extends MasterDaemon {
return -1;
}
- public long takeAnAvailBalanceSlotFrom(List<Long> pathHashs) {
+ public long takeAnAvailBalanceSlotFrom(List<Long> pathHashs,
TStorageMedium medium) {
if (pathHashs.isEmpty()) {
return -1;
}
Collections.sort(pathHashs);
synchronized (this) {
- int preferSlotIndex = pathHashs.indexOf(lastPickPathHash) + 1;
+ int preferSlotIndex =
pathHashs.indexOf(lastPickPathHashs.getOrDefault(medium, -1L)) + 1;
if (preferSlotIndex < 0 || preferSlotIndex >=
pathHashs.size()) {
preferSlotIndex = 0;
}
@@ -2172,14 +2173,14 @@ public class TabletScheduler extends MasterDaemon {
for (int i = preferSlotIndex; i < pathHashs.size(); i++) {
long pathHash = pathHashs.get(i);
if (takeBalanceSlot(pathHash) != -1) {
- lastPickPathHash = pathHash;
+ lastPickPathHashs.put(medium, pathHash);
return pathHash;
}
}
for (int i = 0; i < preferSlotIndex; i++) {
long pathHash = pathHashs.get(i);
if (takeBalanceSlot(pathHash) != -1) {
- lastPickPathHash = pathHash;
+ lastPickPathHashs.put(medium, pathHash);
return pathHash;
}
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/PathSlotTest.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/PathSlotTest.java
index e26e3042fb8..99d49ceb30c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/PathSlotTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/PathSlotTest.java
@@ -39,10 +39,11 @@ class PathSlotTest {
List<Long> availPathHashs = Lists.newArrayList();
List<Long> expectPathHashs = Lists.newArrayList();
List<Long> gotPathHashs = Lists.newArrayList();
+ TStorageMedium medium = TStorageMedium.HDD;
long startPath = 10001L;
long endPath = 10006L;
for (long pathHash = startPath; pathHash < endPath; pathHash++) {
- paths.put(pathHash, TStorageMedium.HDD);
+ paths.put(pathHash, medium);
availPathHashs.add(pathHash);
expectPathHashs.add(pathHash);
}
@@ -56,7 +57,7 @@ class PathSlotTest {
PathSlot ps = new PathSlot(paths, 1L);
for (int i = 0; i < expectPathHashs.size(); i++) {
Collections.shuffle(availPathHashs);
- gotPathHashs.add(ps.takeAnAvailBalanceSlotFrom(availPathHashs));
+ gotPathHashs.add(ps.takeAnAvailBalanceSlotFrom(availPathHashs,
medium));
}
Assert.assertEquals(expectPathHashs, gotPathHashs);
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
index 5b2f65d771c..d4bbf6e8899 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java
@@ -49,7 +49,6 @@ public class DecommissionBackendTest extends
TestWithFeService {
@Override
protected void beforeCluster() {
FeConstants.runningUnitTest = true;
- needCleanDir = false;
}
@BeforeAll
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]