This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1f19d0db3e [improvement](tablet clone) improve tablet balance, scaling
speed etc (#22317)
1f19d0db3e is described below
commit 1f19d0db3eea27eeec7fca77ab4fd0e9558b9147
Author: yujun <[email protected]>
AuthorDate: Thu Aug 17 22:30:49 2023 +0800
[improvement](tablet clone) improve tablet balance, scaling speed etc
(#22317)
---
.../main/java/org/apache/doris/common/Config.java | 10 +-
.../apache/doris/clone/BackendLoadStatistic.java | 112 +++++++++-
.../org/apache/doris/clone/BeLoadRebalancer.java | 56 +++--
.../org/apache/doris/clone/DiskRebalancer.java | 20 +-
.../apache/doris/clone/LoadStatisticForTag.java | 4 +
.../apache/doris/clone/PartitionRebalancer.java | 12 +-
.../java/org/apache/doris/clone/Rebalancer.java | 11 +-
.../apache/doris/clone/RootPathLoadStatistic.java | 8 +-
.../org/apache/doris/clone/TabletSchedCtx.java | 70 +++++--
.../org/apache/doris/clone/TabletScheduler.java | 228 ++++++++++++++-------
.../java/org/apache/doris/common/FeConstants.java | 2 +
.../common/proc/TabletSchedulerDetailProcDir.java | 2 +-
.../main/java/org/apache/doris/task/CloneTask.java | 19 +-
.../org/apache/doris/clone/DecommissionTest.java | 174 ++++++++++++++++
.../org/apache/doris/clone/DiskRebalanceTest.java | 36 ++--
.../java/org/apache/doris/clone/RebalanceTest.java | 6 +-
.../org/apache/doris/clone/RebalancerTestUtil.java | 23 +++
.../doris/clone/RootPathLoadStatisticTest.java | 14 +-
.../doris/clone/TabletRepairAndBalanceTest.java | 27 +--
.../java/org/apache/doris/task/AgentTaskTest.java | 3 +-
20 files changed, 638 insertions(+), 199 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 7ed352eff5..2819439c15 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -932,17 +932,19 @@ public class Config extends ConfigBase {
public static long tablet_repair_delay_factor_second = 60;
/**
- * the default slot number per path in tablet scheduler
+ * the default slot number per path for hdd in tablet scheduler
* TODO(cmy): remove this config and dynamically adjust it by clone task
statistic
*/
@ConfField(mutable = true, masterOnly = true)
- public static int schedule_slot_num_per_path = 4;
+ public static int schedule_slot_num_per_hdd_path = 4;
+
/**
- * the default slot number per path in tablet scheduler for decommission
backend
+ * the default slot number per path for ssd in tablet scheduler
+ * TODO(cmy): remove this config and dynamically adjust it by clone task
statistic
*/
@ConfField(mutable = true, masterOnly = true)
- public static int schedule_decommission_slot_num_per_path = 8;
+ public static int schedule_slot_num_per_ssd_path = 8;
/**
* the default batch size in tablet scheduler for a single schedule.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java
index fdf45afe81..6263f53ebe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java
@@ -68,6 +68,70 @@ public class BackendLoadStatistic {
}
}
+ public static class BePathLoadStatPair {
+ private BackendLoadStatistic beLoadStatistic;
+ private RootPathLoadStatistic pathLoadStatistic;
+
+ BePathLoadStatPair(BackendLoadStatistic beLoadStatistic,
RootPathLoadStatistic pathLoadStatistic) {
+ this.beLoadStatistic = beLoadStatistic;
+ this.pathLoadStatistic = pathLoadStatistic;
+ }
+
+ BackendLoadStatistic getBackendLoadStatistic() {
+ return beLoadStatistic;
+ }
+
+ RootPathLoadStatistic getPathLoadStatistic() {
+ return pathLoadStatistic;
+ }
+
+ @Override
+ public String toString() {
+ return "{ beId: " + beLoadStatistic.getBeId() + ", be score: "
+ +
beLoadStatistic.getLoadScore(pathLoadStatistic.getStorageMedium())
+ + ", path: " + pathLoadStatistic.getPath()
+ + ", path used percent: " +
pathLoadStatistic.getUsedPercent()
+ + " }";
+ }
+ }
+
+ public static class BePathLoadStatPairComparator implements
Comparator<BePathLoadStatPair> {
+ private double avgBackendLoadScore;
+ private double avgPathUsedPercent;
+
+ BePathLoadStatPairComparator(List<BePathLoadStatPair> loadStats) {
+ avgBackendLoadScore = 0.0;
+ avgPathUsedPercent = 0.0;
+ for (BePathLoadStatPair loadStat : loadStats) {
+ RootPathLoadStatistic pathStat =
loadStat.getPathLoadStatistic();
+ avgBackendLoadScore +=
loadStat.getBackendLoadStatistic().getLoadScore(pathStat.getStorageMedium());
+ avgPathUsedPercent += pathStat.getUsedPercent();
+ }
+ if (!loadStats.isEmpty()) {
+ avgPathUsedPercent /= loadStats.size();
+ avgBackendLoadScore /= loadStats.size();
+ }
+ if (avgBackendLoadScore == 0.0) {
+ avgBackendLoadScore = 1.0;
+ }
+ if (avgPathUsedPercent == 0.0) {
+ avgPathUsedPercent = 1.0;
+ }
+ }
+
+ @Override
+ public int compare(BePathLoadStatPair o1, BePathLoadStatPair o2) {
+ return Double.compare(getCompareValue(o1), getCompareValue(o2));
+ }
+
+ private double getCompareValue(BePathLoadStatPair loadStat) {
+ BackendLoadStatistic beStat = loadStat.getBackendLoadStatistic();
+ RootPathLoadStatistic pathStat = loadStat.getPathLoadStatistic();
+ return 0.5 * beStat.getLoadScore(pathStat.getStorageMedium()) /
avgBackendLoadScore
+ + 0.5 * pathStat.getUsedPercent() / avgPathUsedPercent;
+ }
+ }
+
public static final BeStatComparator HDD_COMPARATOR = new
BeStatComparator(TStorageMedium.HDD);
public static final BeStatComparator SSD_COMPARATOR = new
BeStatComparator(TStorageMedium.SSD);
public static final BeStatMixComparator MIX_COMPARATOR = new
BeStatMixComparator();
@@ -362,9 +426,9 @@ public class BackendLoadStatistic {
}
result.add(pathStatistic);
- return BalanceStatus.OK;
}
- return status;
+
+ return result.isEmpty() ? status : BalanceStatus.OK;
}
/**
@@ -456,6 +520,50 @@ public class BackendLoadStatistic {
beId, low.size(), mid.size(), high.size());
}
+ public void getPathStatisticByClass(List<RootPathLoadStatistic> low,
+ List<RootPathLoadStatistic> mid, List<RootPathLoadStatistic> high,
TStorageMedium storageMedium) {
+ for (RootPathLoadStatistic pathStat : pathStatistics) {
+ if (pathStat.getDiskState() == DiskState.OFFLINE
+ || (storageMedium != null && pathStat.getStorageMedium()
!= storageMedium)) {
+ continue;
+ }
+
+ if (pathStat.getClazz() == Classification.LOW) {
+ low.add(pathStat);
+ } else if (pathStat.getClazz() == Classification.HIGH) {
+ high.add(pathStat);
+ } else {
+ mid.add(pathStat);
+ }
+ }
+
+ LOG.debug("after adjust, backend {} path classification low/mid/high:
{}/{}/{}",
+ beId, low.size(), mid.size(), high.size());
+ }
+
+ public void incrPathsCopingSize(Map<Long, Long> pathsCopingSize) {
+ boolean updated = false;
+ for (RootPathLoadStatistic pathStat : pathStatistics) {
+ Long copingSize = pathsCopingSize.get(pathStat.getPathHash());
+ if (copingSize != null && copingSize > 0) {
+ pathStat.incrCopingSizeB(copingSize);
+ updated = true;
+ }
+ }
+ if (updated) {
+ Collections.sort(pathStatistics);
+ }
+ }
+
+ public void incrPathCopingSize(long pathHash, long copingSize) {
+ RootPathLoadStatistic pathStat = pathStatistics.stream().filter(
+ p -> p.getPathHash() == pathHash).findFirst().orElse(null);
+ if (pathStat != null) {
+ pathStat.incrCopingSizeB(copingSize);
+ Collections.sort(pathStatistics);
+ }
+ }
+
public List<RootPathLoadStatistic> getPathStatistics() {
return pathStatistics;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
index 5317725881..d388e5fd79 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
@@ -22,6 +22,8 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.clone.BackendLoadStatistic.BePathLoadStatPair;
+import
org.apache.doris.clone.BackendLoadStatistic.BePathLoadStatPairComparator;
import org.apache.doris.clone.SchedException.Status;
import org.apache.doris.clone.SchedException.SubCode;
import org.apache.doris.clone.TabletSchedCtx.Priority;
@@ -51,8 +53,9 @@ import java.util.Set;
public class BeLoadRebalancer extends Rebalancer {
private static final Logger LOG =
LogManager.getLogger(BeLoadRebalancer.class);
- public BeLoadRebalancer(SystemInfoService infoService, TabletInvertedIndex
invertedIndex) {
- super(infoService, invertedIndex);
+ public BeLoadRebalancer(SystemInfoService infoService, TabletInvertedIndex
invertedIndex,
+ Map<Long, PathSlot> backendsWorkingSlots) {
+ super(infoService, invertedIndex, backendsWorkingSlots);
}
/*
@@ -100,9 +103,16 @@ public class BeLoadRebalancer extends Rebalancer {
return alternativeTablets;
}
- // get the number of low load paths. and we should at most select this
number of tablets
- long numOfLowPaths = lowBEs.stream().filter(b -> b.isAvailable() &&
b.hasAvailDisk()).mapToLong(
- b -> b.getAvailPathNum(medium)).sum();
+ long numOfLowPaths = 0;
+ for (BackendLoadStatistic backendLoadStatistic : lowBEs) {
+ if (!backendLoadStatistic.isAvailable()) {
+ continue;
+ }
+ PathSlot pathSlot =
backendsWorkingSlots.get(backendLoadStatistic.getBeId());
+ if (pathSlot != null) {
+ numOfLowPaths += pathSlot.getTotalAvailBalanceSlotNum();
+ }
+ }
LOG.info("get number of low load paths: {}, with medium: {}",
numOfLowPaths, medium);
int clusterAvailableBEnum = infoService.getAllBackendIds(true).size();
@@ -113,6 +123,10 @@ public class BeLoadRebalancer extends Rebalancer {
OUTER:
for (int i = highBEs.size() - 1; i >= 0; i--) {
BackendLoadStatistic beStat = highBEs.get(i);
+ PathSlot pathSlot = backendsWorkingSlots.get(beStat.getBeId());
+ if (pathSlot == null) {
+ continue;
+ }
// classify the paths.
Set<Long> pathLow = Sets.newHashSet();
@@ -129,7 +143,10 @@ public class BeLoadRebalancer extends Rebalancer {
// for each path, we try to select at most
BALANCE_SLOT_NUM_FOR_PATH tablets
Map<Long, Integer> remainingPaths = Maps.newHashMap();
for (Long pathHash : pathHigh) {
- remainingPaths.put(pathHash, Config.balance_slot_num_per_path);
+ int availBalanceNum =
pathSlot.getAvailableBalanceNum(pathHash);
+ if (availBalanceNum > 0) {
+ remainingPaths.put(pathHash, availBalanceNum);
+ }
}
if (remainingPaths.isEmpty()) {
@@ -201,8 +218,7 @@ public class BeLoadRebalancer extends Rebalancer {
* 2. Select a low load backend as destination. And tablet should not has
replica on this backend.
*/
@Override
- public void completeSchedCtx(TabletSchedCtx tabletCtx,
- Map<Long, PathSlot> backendsWorkingSlots) throws SchedException {
+ public void completeSchedCtx(TabletSchedCtx tabletCtx) throws
SchedException {
LoadStatisticForTag clusterStat = statisticMap.get(tabletCtx.getTag());
if (clusterStat == null) {
throw new SchedException(Status.UNRECOVERABLE,
@@ -305,6 +321,7 @@ public class BeLoadRebalancer extends Rebalancer {
throw new SchedException(Status.UNRECOVERABLE, "unable to find low
backend");
}
+ List<BePathLoadStatPair> candFitPaths = Lists.newArrayList();
for (BackendLoadStatistic beStat : candidates) {
PathSlot slot = backendsWorkingSlots.get(beStat.getBeId());
if (slot == null) {
@@ -313,15 +330,26 @@ public class BeLoadRebalancer extends Rebalancer {
// classify the paths.
// And we only select path from 'low' and 'mid' paths
- Set<Long> pathLow = Sets.newHashSet();
- Set<Long> pathMid = Sets.newHashSet();
- Set<Long> pathHigh = Sets.newHashSet();
+ List<RootPathLoadStatistic> pathLow = Lists.newArrayList();
+ List<RootPathLoadStatistic> pathMid = Lists.newArrayList();
+ List<RootPathLoadStatistic> pathHigh = Lists.newArrayList();
beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh,
tabletCtx.getStorageMedium());
+
pathLow.addAll(pathMid);
+ pathLow.stream().forEach(path -> candFitPaths.add(new
BePathLoadStatPair(beStat, path)));
+ }
- long pathHash = slot.takeAnAvailBalanceSlotFrom(pathLow);
- if (pathHash != -1) {
- tabletCtx.setDest(beStat.getBeId(), pathHash);
+ BePathLoadStatPairComparator comparator = new
BePathLoadStatPairComparator(candFitPaths);
+ Collections.sort(candFitPaths, comparator);
+ for (BePathLoadStatPair bePathLoadStat : candFitPaths) {
+ BackendLoadStatistic beStat =
bePathLoadStat.getBackendLoadStatistic();
+ RootPathLoadStatistic pathStat =
bePathLoadStat.getPathLoadStatistic();
+ PathSlot slot = backendsWorkingSlots.get(beStat.getBeId());
+ if (slot == null) {
+ continue;
+ }
+ if (slot.takeBalanceSlot(pathStat.getPathHash()) != -1) {
+ tabletCtx.setDest(beStat.getBeId(), pathStat.getPathHash());
return;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
index abac0c2d1a..fe835b94af 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
@@ -24,7 +24,6 @@ import org.apache.doris.clone.SchedException.Status;
import org.apache.doris.clone.TabletSchedCtx.BalanceType;
import org.apache.doris.clone.TabletSchedCtx.Priority;
import org.apache.doris.clone.TabletScheduler.PathSlot;
-import org.apache.doris.common.Config;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TStorageMedium;
@@ -52,8 +51,9 @@ import java.util.Set;
public class DiskRebalancer extends Rebalancer {
private static final Logger LOG =
LogManager.getLogger(DiskRebalancer.class);
- public DiskRebalancer(SystemInfoService infoService, TabletInvertedIndex
invertedIndex) {
- super(infoService, invertedIndex);
+ public DiskRebalancer(SystemInfoService infoService, TabletInvertedIndex
invertedIndex,
+ Map<Long, PathSlot> backendsWorkingSlots) {
+ super(infoService, invertedIndex, backendsWorkingSlots);
}
public List<BackendLoadStatistic>
filterByPrioBackends(List<BackendLoadStatistic> bes) {
@@ -152,6 +152,10 @@ public class DiskRebalancer extends Rebalancer {
Collections.shuffle(midBEs);
for (int i = midBEs.size() - 1; i >= 0; i--) {
BackendLoadStatistic beStat = midBEs.get(i);
+ PathSlot pathSlot = backendsWorkingSlots.get(beStat.getBeId());
+ if (pathSlot == null) {
+ continue;
+ }
// classify the paths.
Set<Long> pathLow = Sets.newHashSet();
@@ -171,7 +175,10 @@ public class DiskRebalancer extends Rebalancer {
// for each path, we try to select at most
BALANCE_SLOT_NUM_FOR_PATH tablets
Map<Long, Integer> remainingPaths = Maps.newHashMap();
for (Long pathHash : pathHigh) {
- remainingPaths.put(pathHash, Config.balance_slot_num_per_path);
+ int availBalanceNum =
pathSlot.getAvailableBalanceNum(pathHash);
+ if (availBalanceNum > 0) {
+ remainingPaths.put(pathHash, availBalanceNum);
+ }
}
if (remainingPaths.isEmpty()) {
@@ -246,8 +253,7 @@ public class DiskRebalancer extends Rebalancer {
* 3. Select a low load path from this backend as destination.
*/
@Override
- public void completeSchedCtx(TabletSchedCtx tabletCtx,
- Map<Long, PathSlot> backendsWorkingSlots) throws SchedException {
+ public void completeSchedCtx(TabletSchedCtx tabletCtx) throws
SchedException {
LoadStatisticForTag clusterStat = statisticMap.get(tabletCtx.getTag());
if (clusterStat == null) {
throw new SchedException(Status.UNRECOVERABLE,
@@ -323,7 +329,7 @@ public class DiskRebalancer extends Rebalancer {
}
long destPathHash = slot.takeBalanceSlot(stat.getPathHash());
if (destPathHash == -1) {
- throw new SchedException(Status.UNRECOVERABLE, "unable to take
dest slot");
+ continue;
}
tabletCtx.setDest(beStat.getBeId(), destPathHash, stat.getPath());
setDest = true;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/LoadStatisticForTag.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/LoadStatisticForTag.java
index 224399f480..413a3b129f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/LoadStatisticForTag.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/LoadStatisticForTag.java
@@ -342,6 +342,10 @@ public class LoadStatisticForTag {
return null;
}
+ public List<BackendLoadStatistic> getBackendLoadStatistics() {
+ return beLoadStatistics;
+ }
+
/*
* If cluster is balance, all Backends will be in 'mid', and 'high' and
'low' is empty
* If both 'high' and 'low' has Backends, just return
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
index d9d3f27cc7..6c83944462 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/PartitionRebalancer.java
@@ -20,6 +20,7 @@ package org.apache.doris.clone;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.clone.TabletScheduler.PathSlot;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.resource.Tag;
@@ -63,8 +64,9 @@ public class PartitionRebalancer extends Rebalancer {
private final AtomicLong counterBalanceMoveCreated = new AtomicLong(0);
private final AtomicLong counterBalanceMoveSucceeded = new AtomicLong(0);
- public PartitionRebalancer(SystemInfoService infoService,
TabletInvertedIndex invertedIndex) {
- super(infoService, invertedIndex);
+ public PartitionRebalancer(SystemInfoService infoService,
TabletInvertedIndex invertedIndex,
+ Map<Long, PathSlot> backendsWorkingSlots) {
+ super(infoService, invertedIndex, backendsWorkingSlots);
}
@Override
@@ -229,7 +231,7 @@ public class PartitionRebalancer extends Rebalancer {
}
@Override
- protected void completeSchedCtx(TabletSchedCtx tabletCtx, Map<Long,
TabletScheduler.PathSlot> backendsWorkingSlots)
+ protected void completeSchedCtx(TabletSchedCtx tabletCtx)
throws SchedException {
MovesCacheMap.MovesCache movesInProgress =
movesCacheMap.getCache(tabletCtx.getTag(),
tabletCtx.getStorageMedium());
@@ -271,10 +273,10 @@ public class PartitionRebalancer extends Rebalancer {
if (pathHash == -1) {
throw new
SchedException(SchedException.Status.SCHEDULE_FAILED,
SchedException.SubCode.WAITING_SLOT,
"paths has no available balance slot: " + availPath);
- } else {
- tabletCtx.setDest(beStat.getBeId(), pathHash);
}
+ tabletCtx.setDest(beStat.getBeId(), pathHash);
+
// ToDeleteReplica is the source replica
pair.second = srcReplica.getId();
} catch (IllegalStateException | NullPointerException e) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
index 4c3ef57546..bee23747bc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
@@ -49,14 +49,17 @@ public abstract class Rebalancer {
// When Rebalancer init, the statisticMap is usually empty. So it's no
need to be an arg.
// Only use updateLoadStatistic() to load stats.
protected Map<Tag, LoadStatisticForTag> statisticMap = Maps.newHashMap();
+ protected Map<Long, PathSlot> backendsWorkingSlots;
protected TabletInvertedIndex invertedIndex;
protected SystemInfoService infoService;
// be id -> end time of prio
protected Map<Long, Long> prioBackends = Maps.newConcurrentMap();
- public Rebalancer(SystemInfoService infoService, TabletInvertedIndex
invertedIndex) {
+ public Rebalancer(SystemInfoService infoService, TabletInvertedIndex
invertedIndex,
+ Map<Long, PathSlot> backendsWorkingSlots) {
this.infoService = infoService;
this.invertedIndex = invertedIndex;
+ this.backendsWorkingSlots = backendsWorkingSlots;
}
public List<TabletSchedCtx> selectAlternativeTablets() {
@@ -74,9 +77,9 @@ public abstract class Rebalancer {
protected abstract List<TabletSchedCtx> selectAlternativeTabletsForCluster(
LoadStatisticForTag clusterStat, TStorageMedium medium);
- public AgentTask createBalanceTask(TabletSchedCtx tabletCtx, Map<Long,
PathSlot> backendsWorkingSlots)
+ public AgentTask createBalanceTask(TabletSchedCtx tabletCtx)
throws SchedException {
- completeSchedCtx(tabletCtx, backendsWorkingSlots);
+ completeSchedCtx(tabletCtx);
if (tabletCtx.getBalanceType() ==
TabletSchedCtx.BalanceType.BE_BALANCE) {
return tabletCtx.createCloneReplicaAndTask();
} else {
@@ -90,7 +93,7 @@ public abstract class Rebalancer {
// You should check the moves' validation.
// 2. If you want to generate {srcReplica, destBe} here, just do it.
// 3. You should check the path slots of src & dest.
- protected abstract void completeSchedCtx(TabletSchedCtx tabletCtx,
Map<Long, PathSlot> backendsWorkingSlots)
+ protected abstract void completeSchedCtx(TabletSchedCtx tabletCtx)
throws SchedException;
public Long getToDeleteReplicaId(TabletSchedCtx tabletCtx) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java
index 3aeb4069ab..1a51276f7d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/RootPathLoadStatistic.java
@@ -31,6 +31,7 @@ public class RootPathLoadStatistic implements
Comparable<RootPathLoadStatistic>
private TStorageMedium storageMedium;
private long capacityB;
private long usedCapacityB;
+ private long copingSizeB;
private DiskState diskState;
private Classification clazz = Classification.INIT;
@@ -43,6 +44,7 @@ public class RootPathLoadStatistic implements
Comparable<RootPathLoadStatistic>
this.storageMedium = storageMedium;
this.capacityB = capacityB <= 0 ? 1 : capacityB;
this.usedCapacityB = usedCapacityB;
+ this.copingSizeB = 0;
this.diskState = diskState;
}
@@ -71,7 +73,11 @@ public class RootPathLoadStatistic implements
Comparable<RootPathLoadStatistic>
}
public double getUsedPercent() {
- return capacityB <= 0 ? 0.0 : usedCapacityB / (double) capacityB;
+ return capacityB <= 0 ? 0.0 : (usedCapacityB + copingSizeB) / (double)
capacityB;
+ }
+
+ public void incrCopingSizeB(long size) {
+ copingSizeB += size;
}
public void setClazz(Classification clazz) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
index 3c2d2b148c..4a3a1a6eb2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
@@ -34,6 +34,7 @@ import org.apache.doris.clone.TabletScheduler.PathSlot;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.ReplicaPersistInfo;
import org.apache.doris.resource.Tag;
@@ -179,6 +180,8 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
private long visibleVersion = -1;
private long committedVersion = -1;
+ private long tabletSize = 0;
+
private Replica srcReplica = null;
private long srcPathHash = -1;
// for disk balance to keep src path, and avoid take slot on
selectAlternativeTablets
@@ -281,6 +284,10 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
return failedSchedCounter;
}
+ public void resetFailedSchedCounter() {
+ failedSchedCounter = 0;
+ }
+
public void increaseFailedRunningCounter() {
++failedRunningCounter;
}
@@ -301,7 +308,7 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
} else {
decommissionTime = -1;
if (code == SubCode.WAITING_SLOT && type != Type.BALANCE) {
- return failedSchedCounter > 30 * 1000 /
TabletScheduler.SCHEDULE_INTERVAL_MS;
+ return failedSchedCounter > 30 * 1000 /
FeConstants.tablet_schedule_interval_ms;
} else {
return failedSchedCounter > 10;
}
@@ -477,13 +484,13 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
// database lock should be held.
public long getTabletSize() {
- long max = Long.MIN_VALUE;
- for (Replica replica : tablet.getReplicas()) {
- if (replica.getDataSize() > max) {
- max = replica.getDataSize();
- }
- }
- return max;
+ return tabletSize;
+ }
+
+ public void updateTabletSize() {
+ tabletSize = 0;
+ tablet.getReplicas().stream().forEach(
+ replica -> tabletSize = Math.max(tabletSize,
replica.getDataSize()));
}
/*
@@ -905,6 +912,7 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
// if this is a balance task, or this is a repair task with
// REPLICA_MISSING/REPLICA_RELOCATING,
// we create a new replica with state CLONE
+ long replicaId = 0;
if (tabletStatus == TabletStatus.REPLICA_MISSING
|| tabletStatus == TabletStatus.REPLICA_RELOCATING || type ==
Type.BALANCE
|| tabletStatus == TabletStatus.COLOCATE_MISMATCH
@@ -917,14 +925,9 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
committedVersion, /* use committed version as last failed
version */
-1 /* last success version */);
- TBackend tSrcBe = new TBackend(srcBe.getHost(), srcBe.getBePort(),
srcBe.getHttpPort());
- cloneTask = new CloneTask(destBackendId, dbId, tblId, partitionId,
indexId,
- tabletId, cloneReplica.getId(), schemaHash,
Lists.newArrayList(tSrcBe), storageMedium,
- visibleVersion, (int) (taskTimeoutMs / 1000));
- cloneTask.setPathHash(srcPathHash, destPathHash);
-
// addReplica() method will add this replica to tablet inverted
index too.
tablet.addReplica(cloneReplica);
+ replicaId = cloneReplica.getId();
} else if (tabletStatus == TabletStatus.VERSION_INCOMPLETE) {
Preconditions.checkState(type == Type.REPAIR, type);
// double check
@@ -937,18 +940,31 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
throw new SchedException(Status.SCHEDULE_FAILED, "dest
replica's path hash is changed. "
+ "current: " + replica.getPathHash() + ", scheduled:
" + destPathHash);
}
+ replicaId = replica.getId();
+ }
+
+ TBackend tSrcBe = new TBackend(srcBe.getHost(), srcBe.getBePort(),
srcBe.getHttpPort());
+ TBackend tDestBe = new TBackend(destBe.getHost(), destBe.getBePort(),
destBe.getHttpPort());
- TBackend tSrcBe = new TBackend(srcBe.getHost(), srcBe.getBePort(),
srcBe.getHttpPort());
- cloneTask = new CloneTask(destBackendId, dbId, tblId, partitionId,
indexId,
- tabletId, replica.getId(), schemaHash,
Lists.newArrayList(tSrcBe), storageMedium,
+ cloneTask = new CloneTask(tDestBe, destBackendId, dbId, tblId,
partitionId, indexId, tabletId,
+ replicaId, schemaHash, Lists.newArrayList(tSrcBe),
storageMedium,
visibleVersion, (int) (taskTimeoutMs / 1000));
- cloneTask.setPathHash(srcPathHash, destPathHash);
- }
+ cloneTask.setPathHash(srcPathHash, destPathHash);
this.state = State.RUNNING;
return cloneTask;
}
+ // for storage migration or cloning a new replica
+ public long getDestEstimatedCopingSize() {
+ if ((cloneTask != null && tabletStatus !=
TabletStatus.VERSION_INCOMPLETE)
+ || storageMediaMigrationTask != null) {
+ return Math.max(getTabletSize(), 10L);
+ } else {
+ return 0;
+ }
+ }
+
// timeout is between MIN_CLONE_TASK_TIMEOUT_MS and
MAX_CLONE_TASK_TIMEOUT_MS
private long getApproximateTimeoutMs() {
long tabletSize = getTabletSize();
@@ -1131,6 +1147,8 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
result.add(TimeUtils.longToTimeString(lastSchedTime));
result.add(TimeUtils.longToTimeString(lastVisitedTime));
result.add(TimeUtils.longToTimeString(finishedTime));
+ Pair<Double, String> tabletSizeDesc =
DebugUtil.getByteUint(tabletSize);
+
result.add(DebugUtil.DECIMAL_FORMAT_SCALE_3.format(tabletSizeDesc.first) + " "
+ tabletSizeDesc.second);
result.add(copyTimeMs > 0 ? String.valueOf(copySize / copyTimeMs /
1000.0) : FeConstants.null_string);
result.add(String.valueOf(failedSchedCounter));
result.add(String.valueOf(failedRunningCounter));
@@ -1162,8 +1180,9 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
value += 5 * 1000L;
}
+ // repair tasks always prior than balance
if (type == Type.BALANCE) {
- value += 30 * 60 * 1000L;
+ value += 10 * 24 * 3600L;
}
return value;
@@ -1174,12 +1193,19 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
StringBuilder sb = new StringBuilder();
sb.append("tablet id: ").append(tabletId).append(", status:
").append(tabletStatus.name());
sb.append(", state: ").append(state.name()).append(", type:
").append(type.name());
+ if (type == Type.BALANCE && balanceType != null) {
+ sb.append(", balance: ").append(balanceType.name());
+ }
+ if (priority != null) {
+ sb.append(", priority: ").append(priority.name());
+ }
+ sb.append(", tablet size: ").append(tabletSize);
if (srcReplica != null) {
- sb.append(". from backend: ").append(srcReplica.getBackendId());
+ sb.append(", from backend: ").append(srcReplica.getBackendId());
sb.append(", src path hash: ").append(srcPathHash);
}
if (destPathHash != -1) {
- sb.append(". to backend: ").append(destBackendId);
+ sb.append(", to backend: ").append(destBackendId);
sb.append(", dest path hash: ").append(destPathHash);
}
sb.append(", visible version: ").append(visibleVersion);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 1bc2f54337..d791b53475 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -22,7 +22,6 @@ import org.apache.doris.analysis.AdminRebalanceDiskStmt;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.ColocateTableIndex.GroupId;
import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.DiskInfo.DiskState;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
@@ -36,6 +35,8 @@ import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.Tablet.TabletStatus;
import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.clone.BackendLoadStatistic.BePathLoadStatPair;
+import
org.apache.doris.clone.BackendLoadStatistic.BePathLoadStatPairComparator;
import org.apache.doris.clone.SchedException.Status;
import org.apache.doris.clone.SchedException.SubCode;
import org.apache.doris.clone.TabletSchedCtx.Priority;
@@ -58,6 +59,7 @@ import org.apache.doris.task.DropReplicaTask;
import org.apache.doris.task.StorageMediaMigrationTask;
import org.apache.doris.thrift.TFinishTaskRequest;
import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.transaction.DatabaseTransactionMgr;
import org.apache.doris.transaction.TransactionState;
@@ -72,12 +74,12 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
-import java.util.stream.Collectors;
/**
* TabletScheduler saved the tablets produced by TabletChecker and try to
schedule them.
@@ -103,8 +105,6 @@ public class TabletScheduler extends MasterDaemon {
// the minimum interval of updating cluster statistics and priority of
tablet info
private static final long STAT_UPDATE_INTERVAL_MS = 20 * 1000; // 20s
- public static final long SCHEDULE_INTERVAL_MS = 100;
-
/*
* Tablet is added to pendingTablets as well it's id in allTabletTypes.
* TabletScheduler will take tablet from pendingTablets but will not
remove it's id from allTabletTypes when
@@ -127,12 +127,11 @@ public class TabletScheduler extends MasterDaemon {
private Map<Long, PathSlot> backendsWorkingSlots = Maps.newConcurrentMap();
// Tag -> load statistic
private Map<Tag, LoadStatisticForTag> statisticMap = Maps.newHashMap();
+
private long lastStatUpdateTime = 0;
private long lastSlotAdjustTime = 0;
- private long lastCheckTimeoutTime = 0;
-
private Env env;
private SystemInfoService infoService;
private TabletInvertedIndex invertedIndex;
@@ -151,19 +150,19 @@ public class TabletScheduler extends MasterDaemon {
+ /**
+ * Create the tablet scheduler daemon. The interval handed to MasterDaemon is read once here from
+ * FeConstants.tablet_schedule_interval_ms.
+ *
+ * @param rebalancerType "partition" (case-insensitive) selects PartitionRebalancer, any other value selects
+ * BeLoadRebalancer; a DiskRebalancer is always created as the fallback. All three rebalancers are given
+ * this scheduler's backendsWorkingSlots map.
+ */
public TabletScheduler(Env env, SystemInfoService infoService,
TabletInvertedIndex invertedIndex,
TabletSchedulerStat stat, String rebalancerType) {
- super("tablet scheduler", SCHEDULE_INTERVAL_MS);
+ super("tablet scheduler", FeConstants.tablet_schedule_interval_ms);
this.env = env;
this.infoService = infoService;
this.invertedIndex = invertedIndex;
this.colocateTableIndex = env.getColocateTableIndex();
this.stat = stat;
if (rebalancerType.equalsIgnoreCase("partition")) {
- this.rebalancer = new PartitionRebalancer(infoService,
invertedIndex);
+ this.rebalancer = new PartitionRebalancer(infoService,
invertedIndex, backendsWorkingSlots);
} else {
- this.rebalancer = new BeLoadRebalancer(infoService, invertedIndex);
+ this.rebalancer = new BeLoadRebalancer(infoService, invertedIndex,
backendsWorkingSlots);
}
// if rebalancer can not get new task, then use diskRebalancer to get
task
- this.diskRebalancer = new DiskRebalancer(infoService, invertedIndex);
+ this.diskRebalancer = new DiskRebalancer(infoService, invertedIndex,
backendsWorkingSlots);
}
public TabletSchedulerStat getStat() {
@@ -190,10 +189,11 @@ public class TabletScheduler extends MasterDaemon {
Set<Long> deletedBeIds = Sets.newHashSet();
for (Long beId : backendsWorkingSlots.keySet()) {
if (backends.containsKey(beId)) {
- List<Long> pathHashes =
backends.get(beId).getDisks().values().stream()
+ Map<Long, TStorageMedium> paths = Maps.newHashMap();
+ backends.get(beId).getDisks().values().stream()
.filter(v -> v.getState() == DiskState.ONLINE)
-
.map(DiskInfo::getPathHash).collect(Collectors.toList());
- backendsWorkingSlots.get(beId).updatePaths(pathHashes);
+ .forEach(v -> paths.put(v.getPathHash(),
v.getStorageMedium()));
+ backendsWorkingSlots.get(beId).updatePaths(paths);
} else {
deletedBeIds.add(beId);
}
@@ -208,9 +208,11 @@ public class TabletScheduler extends MasterDaemon {
// add new backends
for (Backend be : backends.values()) {
if (!backendsWorkingSlots.containsKey(be.getId())) {
- List<Long> pathHashes = be.getDisks().values().stream()
-
.map(DiskInfo::getPathHash).collect(Collectors.toList());
- PathSlot slot = new PathSlot(pathHashes, be.getId());
+ Map<Long, TStorageMedium> paths = Maps.newHashMap();
+ be.getDisks().values().stream()
+ .filter(v -> v.getState() == DiskState.ONLINE)
+ .forEach(v -> paths.put(v.getPathHash(),
v.getStorageMedium()));
+ PathSlot slot = new PathSlot(paths, be.getId());
backendsWorkingSlots.put(be.getId(), slot);
LOG.info("add new backend {} with slots num: {}", be.getId(),
be.getDisks().size());
}
@@ -261,9 +263,7 @@ public class TabletScheduler extends MasterDaemon {
pendingTablets.offer(tablet);
if (!contains) {
- LOG.info("Add tablet to pending queue, tablet id {}, type {},
status {}, priority {}",
- tablet.getTabletId(), tablet.getType(),
tablet.getTabletStatus(),
- tablet.getPriority());
+ LOG.info("Add tablet to pending queue, {}", tablet);
}
return AddResult.ADDED;
@@ -319,24 +319,16 @@ public class TabletScheduler extends MasterDaemon {
return;
}
- if (System.currentTimeMillis() - lastCheckTimeoutTime >= 1000L) {
- updateLoadStatisticsAndPriorityIfNecessary();
- handleRunningTablets();
- selectTabletsForBalance();
- lastCheckTimeoutTime = System.currentTimeMillis();
- }
-
+ updateLoadStatistics();
+ handleRunningTablets();
+ selectTabletsForBalance();
schedulePendingTablets();
stat.counterTabletScheduleRound.incrementAndGet();
}
- private void updateLoadStatisticsAndPriorityIfNecessary() {
- if (System.currentTimeMillis() - lastStatUpdateTime <
STAT_UPDATE_INTERVAL_MS) {
- return;
- }
-
+ private void updateLoadStatistics() {
updateLoadStatistic();
rebalancer.updateLoadStatistic(statisticMap);
diskRebalancer.updateLoadStatistic(statisticMap);
@@ -359,6 +351,12 @@ public class TabletScheduler extends MasterDaemon {
newStatisticMap.put(tag, loadStatistic);
LOG.debug("update load statistic for tag {}:\n{}", tag,
loadStatistic.getBrief());
}
+ Map<Long, Long> pathsCopingSize = getPathsCopingSize();
+ for (LoadStatisticForTag loadStatistic : newStatisticMap.values()) {
+ for (BackendLoadStatistic beLoadStatistic :
loadStatistic.getBackendLoadStatistics()) {
+ beLoadStatistic.incrPathsCopingSize(pathsCopingSize);
+ }
+ }
this.statisticMap = newStatisticMap;
}
@@ -584,6 +582,7 @@ public class TabletScheduler extends MasterDaemon {
// we do not concern priority here.
// once we take the tablet out of priority queue, priority is
meaningless.
tabletCtx.setTablet(tablet);
+ tabletCtx.updateTabletSize();
tabletCtx.setVersionInfo(partition.getVisibleVersion(),
partition.getCommittedVersion());
tabletCtx.setSchemaHash(tbl.getSchemaHashByIndexId(idx.getId()));
tabletCtx.setStorageMedium(tbl.getPartitionInfo().getDataProperty(partition.getId()).getStorageMedium());
@@ -691,6 +690,7 @@ public class TabletScheduler extends MasterDaemon {
// create clone task
batchTask.addTask(tabletCtx.createCloneReplicaAndTask());
+ incrDestPathCopingSize(tabletCtx);
}
// In dealing with the case of missing replicas, we need to select a tag
with missing replicas
@@ -782,6 +782,7 @@ public class TabletScheduler extends MasterDaemon {
+ /**
+ * Handle a relocating replica by reusing the version-incomplete repair path.
+ */
private void handleReplicaRelocating(TabletSchedCtx tabletCtx,
AgentBatchTask batchTask)
throws SchedException {
stat.counterReplicaUnavailableErr.incrementAndGet();
+ // switch the status first, presumably so the version-incomplete handler and later bookkeeping see
+ // VERSION_INCOMPLETE instead of the relocating status (confirm against handleReplicaVersionIncomplete)
+ tabletCtx.setTabletStatus(TabletStatus.VERSION_INCOMPLETE);
handleReplicaVersionIncomplete(tabletCtx, batchTask);
}
@@ -1202,6 +1203,7 @@ public class TabletScheduler extends MasterDaemon {
// create clone task
batchTask.addTask(tabletCtx.createCloneReplicaAndTask());
+ incrDestPathCopingSize(tabletCtx);
}
/**
@@ -1214,16 +1216,23 @@ public class TabletScheduler extends MasterDaemon {
return;
}
- long numOfBalancingTablets = getBalanceTabletsNumber();
- if (numOfBalancingTablets > Config.max_balancing_tablets) {
- LOG.info("number of balancing tablets {} exceed limit: {}, skip
selecting tablets for balance",
- numOfBalancingTablets, Config.max_balancing_tablets);
+ // No need to prefetch too many balance task to pending queue.
+ // Because for every sched, it will re select the balance task.
+ int needAddBalanceNum = Math.min(Config.schedule_batch_size -
getPendingNum(),
+ Config.max_balancing_tablets - getBalanceTabletsNumber());
+ if (needAddBalanceNum <= 0) {
return;
}
List<TabletSchedCtx> alternativeTablets =
rebalancer.selectAlternativeTablets();
+ Collections.shuffle(alternativeTablets);
for (TabletSchedCtx tabletCtx : alternativeTablets) {
- addTablet(tabletCtx, false);
+ if (addTablet(tabletCtx, false) == AddResult.ADDED) {
+ needAddBalanceNum--;
+ if (needAddBalanceNum <= 0) {
+ return;
+ }
+ }
}
if (Config.disable_disk_balance) {
LOG.info("disk balance is disabled. skip selecting tablets for
disk balance");
@@ -1237,7 +1246,12 @@ public class TabletScheduler extends MasterDaemon {
for (TabletSchedCtx tabletCtx : diskBalanceTablets) {
// add if task from prio backend or cluster is balanced
if (alternativeTablets.isEmpty() || tabletCtx.getPriority() ==
TabletSchedCtx.Priority.NORMAL) {
- addTablet(tabletCtx, false);
+ if (addTablet(tabletCtx, false) == AddResult.ADDED) {
+ needAddBalanceNum--;
+ if (needAddBalanceNum <= 0) {
+ break;
+ }
+ }
}
}
}
@@ -1249,16 +1263,17 @@ public class TabletScheduler extends MasterDaemon {
stat.counterBalanceSchedule.incrementAndGet();
AgentTask task = null;
if (tabletCtx.getBalanceType() ==
TabletSchedCtx.BalanceType.DISK_BALANCE) {
- task = diskRebalancer.createBalanceTask(tabletCtx,
backendsWorkingSlots);
+ task = diskRebalancer.createBalanceTask(tabletCtx);
checkDiskBalanceLastSuccTime(tabletCtx.getSrcBackendId(),
tabletCtx.getSrcPathHash());
checkDiskBalanceLastSuccTime(tabletCtx.getDestBackendId(),
tabletCtx.getDestPathHash());
} else if (tabletCtx.getBalanceType() ==
TabletSchedCtx.BalanceType.BE_BALANCE) {
- task = rebalancer.createBalanceTask(tabletCtx,
backendsWorkingSlots);
+ task = rebalancer.createBalanceTask(tabletCtx);
} else {
throw new SchedException(Status.UNRECOVERABLE,
"unknown balance type: " +
tabletCtx.getBalanceType().toString());
}
batchTask.addTask(task);
+ incrDestPathCopingSize(tabletCtx);
}
// choose a path on a backend which is fit for the tablet
@@ -1294,7 +1309,7 @@ public class TabletScheduler extends MasterDaemon {
// get all available paths which this tablet can fit in.
// beStatistics is sorted by mix load score in ascend order, so select
from first to last.
- List<RootPathLoadStatistic> allFitPaths = Lists.newArrayList();
+ List<BePathLoadStatPair> allFitPaths = Lists.newArrayList();
for (BackendLoadStatistic bes : beStatistics) {
if (!bes.isAvailable()) {
LOG.debug("backend {} is not available, skip. tablet: {}",
bes.getBeId(), tabletCtx.getTabletId());
@@ -1343,18 +1358,21 @@ public class TabletScheduler extends MasterDaemon {
}
}
- Preconditions.checkState(resultPaths.size() == 1);
- allFitPaths.add(resultPaths.get(0));
+ resultPaths.stream().forEach(path -> allFitPaths.add(new
BePathLoadStatPair(bes, path)));
}
if (allFitPaths.isEmpty()) {
throw new SchedException(Status.UNRECOVERABLE, "unable to find
dest path for new replica");
}
+ BePathLoadStatPairComparator comparator = new
BePathLoadStatPairComparator(allFitPaths);
+ Collections.sort(allFitPaths, comparator);
+
// all fit paths has already been sorted by load score in
'allFitPaths' in ascend order.
// just get first available path.
// we try to find a path with specified media type, if not find,
arbitrarily use one.
- for (RootPathLoadStatistic rootPathLoadStatistic : allFitPaths) {
+ for (BePathLoadStatPair bePathLoadStat : allFitPaths) {
+ RootPathLoadStatistic rootPathLoadStatistic =
bePathLoadStat.getPathLoadStatistic();
if (rootPathLoadStatistic.getStorageMedium() !=
tabletCtx.getStorageMedium()) {
LOG.debug("backend {}'s path {}'s storage medium {} "
+ "is not equal to tablet's storage medium {},
skip. tablet: {}",
@@ -1385,7 +1403,8 @@ public class TabletScheduler extends MasterDaemon {
boolean hasBePath = false;
// no root path with specified media type is found, get arbitrary one.
- for (RootPathLoadStatistic rootPathLoadStatistic : allFitPaths) {
+ for (BePathLoadStatPair bePathLoadStat : allFitPaths) {
+ RootPathLoadStatistic rootPathLoadStatistic =
bePathLoadStat.getPathLoadStatistic();
PathSlot slot =
backendsWorkingSlots.get(rootPathLoadStatistic.getBeId());
if (slot == null) {
LOG.debug("backend {}'s path {}'s slot is null, skip. tablet:
{}",
@@ -1622,7 +1641,10 @@ public class TabletScheduler extends MasterDaemon {
tabletCtx.increaseFailedRunningCounter();
if (!tabletCtx.isExceedFailedRunningLimit()) {
stat.counterCloneTaskFailed.incrementAndGet();
- addToRunningTablets(tabletCtx);
+ tabletCtx.releaseResource(this);
+ tabletCtx.resetFailedSchedCounter();
+ tabletCtx.setState(TabletSchedCtx.State.PENDING);
+ addBackToPendingTablets(tabletCtx);
return false;
} else {
// unrecoverable
@@ -1767,9 +1789,42 @@ public class TabletScheduler extends MasterDaemon {
return allTabletTypes.size();
}
- public synchronized long getBalanceTabletsNumber() {
- return pendingTablets.stream().filter(t -> t.getType() ==
Type.BALANCE).count()
- + runningTablets.values().stream().filter(t -> t.getType() ==
Type.BALANCE).count();
+    /**
+     * Count the BALANCE-type tablets that are currently pending or running.
+     */
+    public synchronized int getBalanceTabletsNumber() {
+        return (int) (pendingTablets.stream().filter(t -> t.getType() == Type.BALANCE).count()
+                + runningTablets.values().stream().filter(t -> t.getType() == Type.BALANCE).count());
+    }
+
+    // Path hashes 0 and -1 are skipped by the coping-size bookkeeping (presumably "no dest path chosen").
+    private static boolean isValidDestPathHash(long pathHash) {
+        return pathHash != 0 && pathHash != -1;
+    }
+
+    /**
+     * Sum, per destination path hash, the estimated coping size of all running tablets.
+     * Tablets with an invalid destination path hash or a non-positive estimate are ignored.
+     */
+    private synchronized Map<Long, Long> getPathsCopingSize() {
+        Map<Long, Long> pathsCopingSize = Maps.newHashMap();
+        for (TabletSchedCtx tablet : runningTablets.values()) {
+            long pathHash = tablet.getDestPathHash();
+            if (!isValidDestPathHash(pathHash)) {
+                continue;
+            }
+            long copingSize = tablet.getDestEstimatedCopingSize();
+            if (copingSize > 0) {
+                pathsCopingSize.merge(pathHash, copingSize, Long::sum);
+            }
+        }
+        return pathsCopingSize;
+    }
+
+    /**
+     * Add the tablet's estimated coping size to its destination path in the current statisticMap, so the
+     * statistics reflect a clone task that has just been added to a batch. Non-positive estimates are
+     * ignored (a negative value would otherwise decrease the path's coping size), matching getPathsCopingSize().
+     */
+    private void incrDestPathCopingSize(TabletSchedCtx tablet) {
+        long destPathHash = tablet.getDestPathHash();
+        if (!isValidDestPathHash(destPathHash)) {
+            return;
+        }
+        long copingSize = tablet.getDestEstimatedCopingSize();
+        if (copingSize <= 0) {
+            return;
+        }
+        long destBeId = tablet.getDestBackendId();
+        for (LoadStatisticForTag loadStatistic : statisticMap.values()) {
+            for (BackendLoadStatistic beLoadStatistic : loadStatistic.getBackendLoadStatistics()) {
+                if (beLoadStatistic.getBeId() == destBeId) {
+                    // only the first matching backend statistic is updated
+                    beLoadStatistic.incrPathCopingSize(destPathHash, copingSize);
+                    return;
+                }
+            }
+        }
}
/**
@@ -1782,22 +1837,22 @@ public class TabletScheduler extends MasterDaemon {
private Map<Long, Slot> pathSlots = Maps.newConcurrentMap();
private long beId;
+ /**
+ * @param paths path hash -> storage medium of every path to track; each path gets its own Slot
+ * @param beId id of the backend owning the paths
+ */
- public PathSlot(List<Long> paths, long beId) {
+ public PathSlot(Map<Long, TStorageMedium> paths, long beId) {
this.beId = beId;
- for (Long pathHash : paths) {
- pathSlots.put(pathHash, new Slot(beId));
+ for (Map.Entry<Long, TStorageMedium> entry : paths.entrySet()) {
+ pathSlots.put(entry.getKey(), new Slot(entry.getValue()));
}
}
// update the path
+ // NOTE: a path that already has a Slot keeps it, even if its reported storage medium has changed.
- public synchronized void updatePaths(List<Long> paths) {
+ public synchronized void updatePaths(Map<Long, TStorageMedium> paths) {
// delete non exist path
- pathSlots.entrySet().removeIf(entry ->
!paths.contains(entry.getKey()));
+ pathSlots.entrySet().removeIf(entry ->
!paths.containsKey(entry.getKey()));
// add new path
- for (Long pathHash : paths) {
- if (!pathSlots.containsKey(pathHash)) {
- pathSlots.put(pathHash, new Slot(beId));
+ for (Map.Entry<Long, TStorageMedium> entry : paths.entrySet()) {
+ if (!pathSlots.containsKey(entry.getKey())) {
+ pathSlots.put(entry.getKey(), new Slot(entry.getValue()));
}
}
}
@@ -1829,6 +1884,20 @@ public class TabletScheduler extends MasterDaemon {
return true;
}
+    /**
+     * Whether the given path currently has balance slots to hand out. The invalid hash -1 and paths
+     * unknown to this backend never do.
+     */
+    public synchronized boolean hasAvailableBalanceSlot(long pathHash) {
+        Slot slot = pathHash == -1 ? null : pathSlots.get(pathHash);
+        return slot != null && slot.getAvailableBalance() != 0;
+    }
+
/**
* If the specified 'pathHash' has available slot, decrease the slot
number and return this path hash
*/
@@ -1872,27 +1941,27 @@ public class TabletScheduler extends MasterDaemon {
return total;
}
+    /**
+     * Sum of the available balance slots over all paths of this backend.
+     */
+    public synchronized int getTotalAvailBalanceSlotNum() {
+        return pathSlots.values().stream().mapToInt(Slot::getAvailableBalance).sum();
+    }
+
/**
* get path whose balance slot num is larger than 0
*/
public synchronized Set<Long> getAvailPathsForBalance() {
Set<Long> pathHashs = Sets.newHashSet();
for (Map.Entry<Long, Slot> entry : pathSlots.entrySet()) {
- if (entry.getValue().getBalanceAvailable() > 0) {
+ // getAvailableBalance() is limited by the path's free clone slots as well as its balance quota
+ if (entry.getValue().getAvailableBalance() > 0) {
pathHashs.add(entry.getKey());
}
}
return pathHashs;
}
- public synchronized int getAvailBalanceSlotNum() {
- int num = 0;
- for (Map.Entry<Long, Slot> entry : pathSlots.entrySet()) {
- num += entry.getValue().getBalanceAvailable();
- }
- return num;
- }
-
public synchronized List<List<String>> getSlotInfo(long beId) {
List<List<String>> results = Lists.newArrayList();
pathSlots.forEach((key, value) -> {
@@ -1901,13 +1970,18 @@ public class TabletScheduler extends MasterDaemon {
result.add(String.valueOf(key));
result.add(String.valueOf(value.getAvailable()));
result.add(String.valueOf(value.getTotal()));
- result.add(String.valueOf(value.getBalanceAvailable()));
+ result.add(String.valueOf(value.getAvailableBalance()));
result.add(String.valueOf(value.getAvgRate()));
results.add(result);
});
return results;
}
+    /**
+     * Available balance slots of the given path, or 0 when the path is unknown to this backend.
+     */
+    public synchronized int getAvailableBalanceNum(long pathHash) {
+        Slot slot = pathSlots.get(pathHash);
+        if (slot == null) {
+            return 0;
+        }
+        return slot.getAvailableBalance();
+    }
+
public synchronized long takeBalanceSlot(long pathHash) {
Slot slot = pathSlots.get(pathHash);
if (slot == null) {
@@ -1980,10 +2054,10 @@ public class TabletScheduler extends MasterDaemon {
// for disk balance
public long diskBalanceLastSuccTime = 0;
- private long beId;
+ // storage medium of the path; getTotal() picks the SSD or HDD slot-number config from it
+ private TStorageMedium storageMedium;
- public Slot(long beId) {
- this.beId = beId;
+ /**
+ * @param storageMedium storage medium of the path this slot belongs to
+ */
+ public Slot(TStorageMedium storageMedium) {
+ this.storageMedium = storageMedium;
this.used = 0;
this.balanceUsed = 0;
}
@@ -1993,18 +2067,16 @@ public class TabletScheduler extends MasterDaemon {
}
public int getTotal() {
- int total = Math.max(1, Config.schedule_slot_num_per_path);
-
- Backend be = Env.getCurrentSystemInfo().getBackend(beId);
- if (be != null && be.isDecommissioned()) {
- total = Math.max(1,
Config.schedule_decommission_slot_num_per_path);
+            // Floor at 1, as the replaced code did, so a 0/negative config value never leaves a path without a slot.
+            if (storageMedium == TStorageMedium.SSD) {
+                return Math.max(1, Config.schedule_slot_num_per_ssd_path);
+            } else {
+                return Math.max(1, Config.schedule_slot_num_per_hdd_path);
}
-
- return total;
}
- public int getBalanceAvailable() {
- return Math.max(0, getBalanceTotal() - balanceUsed);
+        /**
+         * Balance slots still usable on this path: limited by both the remaining balance quota and the
+         * remaining clone slots, and clamped at 0 like the replaced getBalanceAvailable(), so callers that
+         * test for {@code == 0} never see a negative count.
+         */
+        public int getAvailableBalance() {
+            int leftBalance = Math.max(0, getBalanceTotal() - balanceUsed);
+            return Math.max(0, Math.min(leftBalance, getAvailable()));
}
public int getBalanceTotal() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
index 59f3efa32f..789315c33f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
@@ -67,6 +67,8 @@ public class FeConstants {
public static String null_string = "\\N";
public static long tablet_checker_interval_ms = 20 * 1000L;
+ // Scheduling interval (ms) of the TabletScheduler daemon, read when the scheduler is constructed;
+ // non-final so unit tests can override it before the cluster is created.
+ public static long tablet_schedule_interval_ms = 100L;
+
public static String csv = "csv";
public static String csv_with_names = "csv_with_names";
public static String csv_with_names_and_types = "csv_with_names_and_types";
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java
index 4441a99431..67e24870f9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java
@@ -37,7 +37,7 @@ public class TabletSchedulerDetailProcDir implements
ProcDirInterface {
+ // Column titles of the tablet scheduler detail proc; "ReplicaSize" is inserted right after "Finished".
+ // NOTE(review): presumably must stay aligned with the per-tablet info row order — confirm in TabletSchedCtx.
public static final ImmutableList<String> TITLE_NAMES = new
ImmutableList.Builder<String>().add("TabletId")
.add("Type").add("Medium").add("Status").add("State").add("SchedCode").add("Priority").add("SrcBe")
.add("SrcPath").add("DestBe").add("DestPath").add("Timeout").add("Create").add("LstSched").add("LstVisit")
-
.add("Finished").add("Rate").add("FailedSched").add("FailedRunning").add("VisibleVer")
+
.add("Finished").add("ReplicaSize").add("Rate").add("FailedSched").add("FailedRunning").add("VisibleVer")
.add("CmtVer").add("ErrMsg")
.build();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/CloneTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/CloneTask.java
index 019fded640..60531ced30 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/CloneTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/CloneTask.java
@@ -33,6 +33,7 @@ public class CloneTask extends AgentTask {
private long replicaId;
private List<TBackend> srcBackends;
private TStorageMedium storageMedium;
+ private TBackend destBackend;
private long visibleVersion;
@@ -43,10 +44,11 @@ public class CloneTask extends AgentTask {
private int taskVersion = VERSION_1;
- public CloneTask(long backendId, long dbId, long tableId, long
partitionId, long indexId, long tabletId,
- long replicaId, int schemaHash, List<TBackend> srcBackends,
TStorageMedium storageMedium,
- long visibleVersion, int timeoutS) {
+ public CloneTask(TBackend destBackend, long backendId, long dbId, long
tableId, long partitionId,
+ long indexId, long tabletId, long replicaId, int schemaHash,
List<TBackend> srcBackends,
+ TStorageMedium storageMedium, long visibleVersion, int timeoutS) {
super(null, backendId, TTaskType.CLONE, dbId, tableId, partitionId,
indexId, tabletId);
+ this.destBackend = destBackend;
this.replicaId = replicaId;
this.schemaHash = schemaHash;
this.srcBackends = srcBackends;
@@ -95,15 +97,16 @@ public class CloneTask extends AgentTask {
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
- sb.append("tablet id: ").append(tabletId).append(", replica id:
").append(replicaId).append(", schema hash: ")
- .append(schemaHash);
+ sb.append("tablet id: ").append(tabletId)
+ .append(", replica id: ").append(replicaId)
+ .append(", schema hash: ").append(schemaHash);
sb.append(", storageMedium: ").append(storageMedium.name());
sb.append(", visible version: ").append(visibleVersion);
sb.append(", src backend: ").append(srcBackends.get(0).getHost())
.append(", src path hash: ").append(srcPathHash);
- sb.append(", src backend:
").append(srcBackends.get(0).getHost()).append(", src path hash: ")
- .append(srcPathHash);
- sb.append(", dest backend: ").append(backendId).append(", dest path
hash: ").append(destPathHash);
+        // destBackend is not null-checked by the constructor; guard it so toString() itself cannot throw an NPE
+        sb.append(", dest backend id: ").append(backendId)
+                .append(", dest backend: ").append(destBackend == null ? "null" : destBackend.getHost())
+                .append(", dest path hash: ").append(destPathHash);
return sb.toString();
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java
new file mode 100644
index 0000000000..43ea5340bc
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/DecommissionTest.java
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import org.apache.doris.analysis.AlterSystemStmt;
+import org.apache.doris.analysis.CreateDbStmt;
+import org.apache.doris.analysis.CreateTableStmt;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.ExceptionChecker;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TDisk;
+import org.apache.doris.thrift.TStorageMedium;
+import org.apache.doris.utframe.UtFrameUtils;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+/**
+ * Verifies that decommissioning one of four backends moves its tablets away, leaving the 2400
+ * single-replica tablets evenly spread over the remaining three backends.
+ */
+public class DecommissionTest {
+    private static final Logger LOG = LogManager.getLogger(DecommissionTest.class);
+    // use a unique dir so that it won't be conflict with other unit test which
+    // may also start a Mocked Frontend
+    private static String runningDirBase = "fe";
+    private static String runningDir = runningDirBase + "/mocked/DecommissionTest/" + UUID.randomUUID() + "/";
+    private static ConnectContext connectContext;
+
+    private static Random random = new Random(System.currentTimeMillis());
+
+    private final SystemInfoService systemInfoService = new SystemInfoService();
+    private final TabletInvertedIndex invertedIndex = new TabletInvertedIndex();
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        LOG.info("running dir: {}", runningDir);
+        FeConstants.runningUnitTest = true;
+        FeConstants.tablet_checker_interval_ms = 200;
+        // read by the TabletScheduler constructor, so it must be set before the cluster is created
+        FeConstants.tablet_schedule_interval_ms = 2000;
+        Config.tablet_repair_delay_factor_second = 1;
+        Config.enable_round_robin_create_tablet = true;
+        Config.schedule_slot_num_per_hdd_path = 10000;
+        Config.max_scheduling_tablets = 10000;
+        Config.schedule_batch_size = 10000;
+        // keep balancing out of the picture so only the decommission moves tablets
+        Config.disable_balance = true;
+        // 4 backends:
+        // 127.0.0.1
+        // 127.0.0.2
+        // 127.0.0.3
+        // 127.0.0.4
+        UtFrameUtils.createDorisClusterWithMultiTag(runningDir, 4);
+        List<Backend> backends = Env.getCurrentSystemInfo().getAllBackends();
+        for (Backend be : backends) {
+            Map<String, TDisk> backendDisks = Maps.newHashMap();
+            for (String rootPath : new String[] {"/home/doris1.HDD", "/home/doris2.HDD"}) {
+                TDisk disk = createHddDisk(rootPath);
+                backendDisks.put(disk.getRootPath(), disk);
+            }
+            be.updateDisks(backendDisks);
+        }
+
+        connectContext = UtFrameUtils.createDefaultCtx();
+
+        // create database
+        String createDbStmtStr = "create database test;";
+        CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, connectContext);
+        Env.getCurrentEnv().createDb(createDbStmt);
+    }
+
+    // Build an in-use HDD disk with a random path hash and nearly all of its capacity free.
+    private static TDisk createHddDisk(String rootPath) {
+        TDisk disk = new TDisk();
+        disk.setRootPath(rootPath);
+        disk.setDiskTotalCapacity(20000000);
+        disk.setDataUsedCapacity(1);
+        disk.setUsed(true);
+        disk.setDiskAvailableCapacity(disk.disk_total_capacity - disk.data_used_capacity);
+        disk.setPathHash(random.nextLong());
+        disk.setStorageMedium(TStorageMedium.HDD);
+        return disk;
+    }
+
+    @AfterClass
+    public static void tearDown() {
+        // remove only this test's unique directory so other tests using runningDirBase are unaffected
+        UtFrameUtils.cleanDorisFeDir(runningDir);
+    }
+
+    private static void createTable(String sql) throws Exception {
+        CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
+        Env.getCurrentEnv().createTable(createTableStmt);
+        RebalancerTestUtil.updateReplicaPathHash();
+    }
+
+    @Test
+    public void testDecommissionBackend() throws Exception {
+        // 2400 single-replica tablets, expected to be spread evenly over the 4 backends
+        String createStr = "create table test.tbl1\n"
+                + "(k1 date, k2 int)\n"
+                + "distributed by hash(k2) buckets 2400\n"
+                + "properties\n"
+                + "(\n"
+                + " \"replication_num\" = \"1\"\n"
+                + ")";
+        ExceptionChecker.expectThrowsNoException(() -> createTable(createStr));
+        int totalReplicaNum = 1 * 2400;
+        checkBalance(1, totalReplicaNum, 4);
+
+        Backend backend = Env.getCurrentSystemInfo().getAllBackends().get(0);
+        String decommissionStmtStr = "alter system decommission backend \"" + backend.getHost()
+                + ":" + backend.getHeartbeatPort() + "\"";
+        AlterSystemStmt decommissionStmt =
+                (AlterSystemStmt) UtFrameUtils.parseAndAnalyzeStmt(decommissionStmtStr, connectContext);
+        Env.getCurrentEnv().getAlterInstance().processAlterCluster(decommissionStmt);
+
+        Assert.assertTrue(backend.isDecommissioned());
+
+        // the test expects the decommissioned backend to leave the backend list once its tablets have moved
+        checkBalance(200, totalReplicaNum, 3);
+    }
+
+    /**
+     * Wait (up to {@code tryTimes} attempts, one second apart) until getAllBackendIds(true) returns exactly
+     * {@code backendNum} backends that each hold {@code totalReplicaNum / backendNum} tablets. A transient
+     * imbalance is retried; the assertions only fire on the last attempt.
+     */
+    void checkBalance(int tryTimes, int totalReplicaNum, int backendNum) throws Exception {
+        int beReplicaNum = totalReplicaNum / backendNum;
+        for (int i = 0; i < tryTimes; i++) {
+            List<Long> backendIds = Env.getCurrentSystemInfo().getAllBackendIds(true);
+            List<Integer> tabletNums = Lists.newArrayList();
+            for (long beId : backendIds) {
+                tabletNums.add(Env.getCurrentInvertedIndex().getTabletNumByBackendId(beId));
+            }
+
+            boolean balanced = backendIds.size() == backendNum
+                    && tabletNums.stream().allMatch(num -> num == beReplicaNum);
+            if (balanced) {
+                return;
+            }
+            if (i < tryTimes - 1) {
+                Thread.sleep(1000);
+                continue;
+            }
+
+            Assert.assertEquals("tablet nums = " + tabletNums, backendNum, backendIds.size());
+            for (int tabletNum : tabletNums) {
+                Assert.assertEquals("tablet nums = " + tabletNums, beReplicaNum, tabletNum);
+            }
+        }
+    }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java
index 9d5ffe1e6d..457466d72a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java
@@ -20,7 +20,6 @@ package org.apache.doris.clone;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DataProperty;
import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.KeysType;
@@ -36,7 +35,6 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.resource.Tag;
-import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.StorageMediaMigrationTask;
@@ -60,7 +58,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
-import java.util.stream.Collectors;
import java.util.stream.LongStream;
public class DiskRebalanceTest {
@@ -79,10 +76,12 @@ public class DiskRebalanceTest {
private final SystemInfoService systemInfoService = new
SystemInfoService();
private final TabletInvertedIndex invertedIndex = new
TabletInvertedIndex();
private Map<Tag, LoadStatisticForTag> statisticMap;
+ private Map<Long, PathSlot> backendsWorkingSlots = Maps.newHashMap();
@Before
public void setUp() throws Exception {
Config.used_capacity_percent_max_diff = 1.0;
+ Config.balance_slot_num_per_path = 1;
db = new Database(1, "test db");
db.setClusterName(SystemInfoService.DEFAULT_CLUSTER);
new Expectations() {
@@ -137,12 +136,19 @@ public class DiskRebalanceTest {
Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(1, 2,
Lists.newArrayList(3L)));
}
+ /**
+ * Build a fresh load statistic for the default backend tag and rebuild backendsWorkingSlots from it.
+ */
- private void generateStatisticMap() {
+ private void generateStatisticsAndPathSlots() {
LoadStatisticForTag loadStatistic = new
LoadStatisticForTag(Tag.DEFAULT_BACKEND_TAG, systemInfoService,
invertedIndex);
loadStatistic.init();
statisticMap = Maps.newHashMap();
statisticMap.put(Tag.DEFAULT_BACKEND_TAG, loadStatistic);
+ // one PathSlot per backend in the statistic, keyed by each path's hash and storage medium
+ backendsWorkingSlots.clear();
+ for (BackendLoadStatistic beStat :
loadStatistic.getSortedBeLoadStats(null)) {
+ Map<Long, TStorageMedium> paths = Maps.newHashMap();
+ beStat.getPathStatistics().stream().forEach(
+ path -> paths.put(path.getPathHash(),
path.getStorageMedium()));
+ backendsWorkingSlots.put(beStat.getBeId(), new PathSlot(paths,
beStat.getBeId()));
+ }
}
private void createPartitionsForTable(OlapTable olapTable,
MaterializedIndex index, Long partitionCount) {
@@ -187,8 +193,9 @@ public class DiskRebalanceTest {
// case start
Configurator.setLevel("org.apache.doris.clone.DiskRebalancer",
Level.DEBUG);
- Rebalancer rebalancer = new DiskRebalancer(Env.getCurrentSystemInfo(),
Env.getCurrentInvertedIndex());
- generateStatisticMap();
+ generateStatisticsAndPathSlots();
+ Rebalancer rebalancer = new DiskRebalancer(Env.getCurrentSystemInfo(),
Env.getCurrentInvertedIndex(),
+ backendsWorkingSlots);
rebalancer.updateLoadStatistic(statisticMap);
List<TabletSchedCtx> alternativeTablets =
rebalancer.selectAlternativeTablets();
// check alternativeTablets;
@@ -229,8 +236,9 @@ public class DiskRebalanceTest {
// case start
Configurator.setLevel("org.apache.doris.clone.DiskRebalancer",
Level.DEBUG);
- Rebalancer rebalancer = new DiskRebalancer(Env.getCurrentSystemInfo(),
Env.getCurrentInvertedIndex());
- generateStatisticMap();
+ generateStatisticsAndPathSlots();
+ Rebalancer rebalancer = new DiskRebalancer(Env.getCurrentSystemInfo(),
Env.getCurrentInvertedIndex(),
+ backendsWorkingSlots);
rebalancer.updateLoadStatistic(statisticMap);
for (Map.Entry<Tag, LoadStatisticForTag> s : statisticMap.entrySet()) {
if (s.getValue() != null) {
@@ -240,16 +248,6 @@ public class DiskRebalanceTest {
List<TabletSchedCtx> alternativeTablets =
rebalancer.selectAlternativeTablets();
// check alternativeTablets;
Assert.assertEquals(2, alternativeTablets.size());
- Map<Long, PathSlot> backendsWorkingSlots = Maps.newConcurrentMap();
- for (Backend be : Env.getCurrentSystemInfo().getAllBackends()) {
- if (!backendsWorkingSlots.containsKey(be.getId())) {
- List<Long> pathHashes =
be.getDisks().values().stream().map(DiskInfo::getPathHash)
- .collect(Collectors.toList());
- PathSlot slot = new PathSlot(pathHashes,
Config.schedule_slot_num_per_path);
- backendsWorkingSlots.put(be.getId(), slot);
- }
- }
-
for (TabletSchedCtx tabletCtx : alternativeTablets) {
LOG.info("try to schedule tablet {}", tabletCtx.getTabletId());
try {
@@ -259,7 +257,7 @@ public class DiskRebalanceTest {
tabletCtx.setSchemaHash(olapTable.getSchemaHashByIndexId(tabletCtx.getIndexId()));
tabletCtx.setTabletStatus(Tablet.TabletStatus.HEALTHY); //
rebalance tablet should be healthy first
- AgentTask task = rebalancer.createBalanceTask(tabletCtx,
backendsWorkingSlots);
+ AgentTask task = rebalancer.createBalanceTask(tabletCtx);
if (tabletCtx.getTabletSize() == 0) {
Assert.fail("no exception");
} else {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java
index c36ef531c2..fe47338398 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java
@@ -196,7 +196,7 @@ public class RebalanceTest {
@Test
public void testPrioBackends() {
- Rebalancer rebalancer = new DiskRebalancer(Env.getCurrentSystemInfo(),
Env.getCurrentInvertedIndex());
+ Rebalancer rebalancer = new DiskRebalancer(Env.getCurrentSystemInfo(),
Env.getCurrentInvertedIndex(), null);
// add
{ // CHECKSTYLE IGNORE THIS LINE
List<Backend> backends = Lists.newArrayList();
@@ -232,7 +232,7 @@ public class RebalanceTest {
// Call runAfterCatalogReady manually instead of starting daemon thread
TabletSchedulerStat stat = new TabletSchedulerStat();
PartitionRebalancer rebalancer = new
PartitionRebalancer(Env.getCurrentSystemInfo(),
- Env.getCurrentInvertedIndex());
+ Env.getCurrentInvertedIndex(), null);
TabletScheduler tabletScheduler = new TabletScheduler(env,
systemInfoService, invertedIndex, stat, "");
// The rebalancer inside the scheduler will use this rebalancer, for
getToDeleteReplicaId
Deencapsulation.setField(tabletScheduler, "rebalancer", rebalancer);
@@ -256,7 +256,7 @@ public class RebalanceTest {
tabletCtx.setTabletStatus(Tablet.TabletStatus.HEALTHY); //
rebalance tablet should be healthy first
// createCloneReplicaAndTask, create replica will change
invertedIndex too.
- AgentTask task = rebalancer.createBalanceTask(tabletCtx,
tabletScheduler.getBackendsWorkingSlots());
+ AgentTask task = rebalancer.createBalanceTask(tabletCtx);
batchTask.addTask(task);
} catch (SchedException e) {
LOG.warn("schedule tablet {} failed: {}",
tabletCtx.getTabletId(), e.getMessage());
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
index 7d43a5fb77..95f71d0b51 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
@@ -19,6 +19,7 @@ package org.apache.doris.clone;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DiskInfo;
+import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
@@ -32,6 +33,7 @@ import org.apache.doris.thrift.TStorageMedium;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Table;
import java.util.List;
import java.util.Map;
@@ -106,4 +108,25 @@ public class RebalancerTestUtil {
invertedIndex.addReplica(tablet.getId(), replica);
});
}
+
+ public static void updateReplicaPathHash() {
+ Table<Long, Long, Replica> replicaMetaTable =
Env.getCurrentInvertedIndex().getReplicaMetaTable();
+ for (Table.Cell<Long, Long, Replica> cell :
replicaMetaTable.cellSet()) {
+ long beId = cell.getColumnKey();
+ Backend be = Env.getCurrentSystemInfo().getBackend(beId);
+ if (be == null) {
+ continue;
+ }
+ Replica replica = cell.getValue();
+ TabletMeta tabletMeta =
Env.getCurrentInvertedIndex().getTabletMeta(cell.getRowKey());
+ ImmutableMap<String, DiskInfo> diskMap = be.getDisks();
+ for (DiskInfo diskInfo : diskMap.values()) {
+ if (diskInfo.getStorageMedium() ==
tabletMeta.getStorageMedium()) {
+ replica.setPathHash(diskInfo.getPathHash());
+ break;
+ }
+ }
+ }
+ }
+
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/clone/RootPathLoadStatisticTest.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/RootPathLoadStatisticTest.java
index a0d8dd94c0..efb22d333a 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/clone/RootPathLoadStatisticTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/clone/RootPathLoadStatisticTest.java
@@ -31,18 +31,22 @@ public class RootPathLoadStatisticTest {
@Test
public void test() {
- RootPathLoadStatistic usageLow = new RootPathLoadStatistic(0L,
"/home/disk1", 12345L, TStorageMedium.HDD, 4096L,
+ RootPathLoadStatistic usage1 = new RootPathLoadStatistic(0L,
"/home/disk1", 12345L, TStorageMedium.HDD, 4096L,
1024L, DiskState.ONLINE);
- RootPathLoadStatistic usageHigh = new RootPathLoadStatistic(0L,
"/home/disk2", 67890L, TStorageMedium.HDD,
+ RootPathLoadStatistic usage2 = new RootPathLoadStatistic(0L,
"/home/disk2", 67890L, TStorageMedium.HDD,
4096L, 2048L, DiskState.ONLINE);
List<RootPathLoadStatistic> list = Lists.newArrayList();
- list.add(usageLow);
- list.add(usageHigh);
+ list.add(usage1);
+ list.add(usage2);
// low usage should be ahead
Collections.sort(list);
- Assert.assertTrue(list.get(0).getPathHash() == usageLow.getPathHash());
+ Assert.assertTrue(list.get(0).getPathHash() == usage1.getPathHash());
+
+ usage1.incrCopingSizeB(2048L);
+ Collections.sort(list);
+ Assert.assertTrue(list.get(1).getPathHash() == usage1.getPathHash());
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java
index 5c58954952..0e9f1446c7 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java
@@ -26,7 +26,6 @@ import org.apache.doris.analysis.DropTableStmt;
import org.apache.doris.catalog.ColocateGroupSchema;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
@@ -36,7 +35,6 @@ import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletInvertedIndex;
-import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
@@ -54,7 +52,6 @@ import org.apache.doris.thrift.TDisk;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.utframe.UtFrameUtils;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Table;
@@ -162,7 +159,7 @@ public class TabletRepairAndBalanceTest {
CreateTableStmt createTableStmt = (CreateTableStmt)
UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
Env.getCurrentEnv().createTable(createTableStmt);
// must set replicas' path hash, or the tablet scheduler won't work
- updateReplicaPathHash();
+ RebalancerTestUtil.updateReplicaPathHash();
}
private static void dropTable(String sql) throws Exception {
@@ -170,26 +167,6 @@ public class TabletRepairAndBalanceTest {
Env.getCurrentEnv().dropTable(dropTableStmt);
}
- private static void updateReplicaPathHash() {
- Table<Long, Long, Replica> replicaMetaTable =
Env.getCurrentInvertedIndex().getReplicaMetaTable();
- for (Table.Cell<Long, Long, Replica> cell :
replicaMetaTable.cellSet()) {
- long beId = cell.getColumnKey();
- Backend be = Env.getCurrentSystemInfo().getBackend(beId);
- if (be == null) {
- continue;
- }
- Replica replica = cell.getValue();
- TabletMeta tabletMeta =
Env.getCurrentInvertedIndex().getTabletMeta(cell.getRowKey());
- ImmutableMap<String, DiskInfo> diskMap = be.getDisks();
- for (DiskInfo diskInfo : diskMap.values()) {
- if (diskInfo.getStorageMedium() ==
tabletMeta.getStorageMedium()) {
- replica.setPathHash(diskInfo.getPathHash());
- break;
- }
- }
- }
- }
-
private static void alterTable(String sql) throws Exception {
AlterTableStmt alterTableStmt = (AlterTableStmt)
UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
Env.getCurrentEnv().getAlterInstance().processAlterTable(alterTableStmt);
@@ -498,7 +475,7 @@ public class TabletRepairAndBalanceTest {
ExceptionChecker.expectThrowsNoException(() ->
createTable(createStr6));
OlapTable tbl3 = db.getOlapTableOrDdlException("col_tbl3");
- updateReplicaPathHash();
+ RebalancerTestUtil.updateReplicaPathHash();
// Set one replica's state as DECOMMISSION, see if it can be changed
to NORMAL
Tablet oneTablet = null;
Replica oneReplica = null;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
index 86d3fda79f..b0b862e3dd 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
@@ -114,7 +114,8 @@ public class AgentTaskTest {
// clone
cloneTask =
- new CloneTask(backendId1, dbId, tableId, partitionId,
indexId1, tabletId1, replicaId1, schemaHash1,
+ new CloneTask(new TBackend("host2", 8290, 8390), backendId1,
dbId, tableId, partitionId,
+ indexId1, tabletId1, replicaId1, schemaHash1,
Arrays.asList(new TBackend("host1", 8290, 8390)),
TStorageMedium.HDD, -1, 3600);
// storageMediaMigrationTask
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]