This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 0c23915f6d2 [improvement](tablet schedule) colocate balance between
all groups #23543 (#26229)
0c23915f6d2 is described below
commit 0c23915f6d2d0bf7e8e4cc8fbd054b9abfb08758
Author: yujun <[email protected]>
AuthorDate: Wed Nov 1 22:06:43 2023 +0800
[improvement](tablet schedule) colocate balance between all groups #23543
(#26229)
---
.../main/java/org/apache/doris/common/Config.java | 4 +
.../clone/ColocateTableCheckerAndBalancer.java | 521 +++++++++++++++++++--
.../org/apache/doris/clone/TabletScheduler.java | 2 +-
.../org/apache/doris/catalog/CatalogTestUtil.java | 44 ++
.../org/apache/doris/clone/BalanceStatistic.java | 102 ++++
.../ColocateTableCheckerAndBalancerPerfTest.java | 196 ++++++++
.../clone/ColocateTableCheckerAndBalancerTest.java | 112 ++++-
.../org/apache/doris/clone/RebalancerTestUtil.java | 48 +-
.../apache/doris/utframe/MockedBackendFactory.java | 74 ++-
.../apache/doris/utframe/TestWithFeService.java | 5 +-
.../org/apache/doris/utframe/UtFrameUtils.java | 6 +-
11 files changed, 1049 insertions(+), 65 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index bf499ef0af4..b5f7b0ce92b 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -723,6 +723,10 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = true, masterOnly = true) public static boolean
disable_colocate_balance = false;
+ @ConfField(mutable = true, masterOnly = true, description =
{"是否启用group间的均衡",
+ "is allow colocate balance between all groups"})
+ public static boolean disable_colocate_balance_between_groups = false;
+
/**
* The default user resource publishing timeout.
*/
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
index 5c18c2bd468..e70ec445cd6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
@@ -49,6 +49,7 @@ import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -79,6 +80,248 @@ public class ColocateTableCheckerAndBalancer extends
MasterDaemon {
return INSTANCE;
}
+ public static class BucketStatistic {
+ public int tabletOrderIdx;
+ public int totalReplicaNum;
+ public long totalReplicaDataSize;
+
+ public BucketStatistic(int tabletOrderIdx, int totalReplicaNum, long
totalReplicaDataSize) {
+ this.tabletOrderIdx = tabletOrderIdx;
+ this.totalReplicaNum = totalReplicaNum;
+ this.totalReplicaDataSize = totalReplicaDataSize;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof BucketStatistic)) {
+ return false;
+ }
+
+ BucketStatistic other = (BucketStatistic) obj;
+ return tabletOrderIdx == other.tabletOrderIdx && totalReplicaNum
== other.totalReplicaNum
+ && totalReplicaDataSize == other.totalReplicaDataSize;
+ }
+
+ @Override
+ public String toString() {
+ return "{ orderIdx: " + tabletOrderIdx + ", total replica num: " +
totalReplicaNum
+ + ", total data size: " + totalReplicaDataSize + " }";
+ }
+ }
+
+ public static class BackendBuckets {
+ private long beId;
+ private Map<GroupId, List<Integer>> groupTabletOrderIndices =
Maps.newHashMap();
+
+ public BackendBuckets(long beId) {
+ this.beId = beId;
+ }
+
+ // for test
+ public Map<GroupId, List<Integer>> getGroupTabletOrderIndices() {
+ return groupTabletOrderIndices;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof BackendBuckets)) {
+ return false;
+ }
+
+ BackendBuckets other = (BackendBuckets) obj;
+ return beId == other.beId &&
groupTabletOrderIndices.equals(other.groupTabletOrderIndices);
+ }
+
+ @Override
+ public String toString() {
+ return "{ backendId: " + beId + ", group order index: " +
groupTabletOrderIndices + " }";
+ }
+
+ public void addGroupTablet(GroupId groupId, int tabletOrderIdx) {
+ List<Integer> indices = groupTabletOrderIndices.get(groupId);
+ if (indices == null) {
+ indices = Lists.newArrayList();
+ groupTabletOrderIndices.put(groupId, indices);
+ }
+ indices.add(tabletOrderIdx);
+ }
+
+ public void removeGroupTablet(GroupId groupId, int tabletOrderIdx) {
+ List<Integer> indices = groupTabletOrderIndices.get(groupId);
+ if (indices == null) {
+ return;
+ }
+
+ indices.remove(Integer.valueOf(tabletOrderIdx));
+ if (indices.isEmpty()) {
+ groupTabletOrderIndices.remove(groupId);
+ }
+ }
+
+ public boolean containsGroupTablet(GroupId groupId, int
tabletOrderIdx) {
+ List<Integer> indices = groupTabletOrderIndices.get(groupId);
+ if (indices == null) {
+ return false;
+ }
+
+ return indices.indexOf(Integer.valueOf(tabletOrderIdx)) >= 0;
+ }
+
+ public int getTotalReplicaNum(Map<GroupId, List<BucketStatistic>>
allGroupBucketsMap) {
+ int totalReplicaNum = 0;
+ for (Map.Entry<GroupId, List<Integer>> entry :
groupTabletOrderIndices.entrySet()) {
+ List<BucketStatistic> bucketStatistics =
allGroupBucketsMap.get(entry.getKey());
+ if (bucketStatistics != null) {
+ for (int tabletOrderIdx : entry.getValue()) {
+ if (tabletOrderIdx < bucketStatistics.size()) {
+ totalReplicaNum +=
bucketStatistics.get(tabletOrderIdx).totalReplicaNum;
+ }
+ }
+ }
+ }
+
+ return totalReplicaNum;
+ }
+
+ public long getTotalReplicaDataSize(Map<GroupId,
List<BucketStatistic>> allGroupBucketsMap) {
+ long totalReplicaDataSize = 0;
+ for (Map.Entry<GroupId, List<Integer>> entry :
groupTabletOrderIndices.entrySet()) {
+ List<BucketStatistic> bucketStatistics =
allGroupBucketsMap.get(entry.getKey());
+ if (bucketStatistics != null) {
+ for (int tabletOrderIdx : entry.getValue()) {
+ if (tabletOrderIdx < bucketStatistics.size()) {
+ totalReplicaDataSize +=
bucketStatistics.get(tabletOrderIdx).totalReplicaDataSize;
+ }
+ }
+ }
+ }
+
+ return totalReplicaDataSize;
+ }
+
+ public int getTotalBucketsNum() {
+ return groupTabletOrderIndices.values().stream().mapToInt(indices
-> indices.size()).sum();
+ }
+
+ public int getGroupBucketsNum(GroupId groupId) {
+ List<Integer> indices = groupTabletOrderIndices.get(groupId);
+ if (indices == null) {
+ return 0;
+ } else {
+ return indices.size();
+ }
+ }
+ }
+
+ public static class GlobalColocateStatistic {
+ private Map<Long, BackendBuckets> backendBucketsMap =
Maps.newHashMap();
+ private Map<GroupId, List<BucketStatistic>> allGroupBucketsMap =
Maps.newHashMap();
+ private Map<Tag, Integer> allTagBucketNum = Maps.newHashMap();
+ private static final BackendBuckets DUMMY_BE = new BackendBuckets(0);
+
+ public GlobalColocateStatistic() {
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof GlobalColocateStatistic)) {
+ return false;
+ }
+
+ GlobalColocateStatistic other = (GlobalColocateStatistic) obj;
+ return backendBucketsMap.equals(other.backendBucketsMap)
+ && allGroupBucketsMap.equals(other.allGroupBucketsMap)
+ && allTagBucketNum.equals(other.allTagBucketNum);
+ }
+
+ @Override
+ public String toString() {
+ return "{ backends: " + backendBucketsMap + ", groups: " +
allGroupBucketsMap
+ + ", tag bucket num: " + allTagBucketNum + " }";
+ }
+
+ Map<Long, BackendBuckets> getBackendBucketsMap() {
+ return backendBucketsMap;
+ }
+
+ Map<GroupId, List<BucketStatistic>> getAllGroupBucketsMap() {
+ return allGroupBucketsMap;
+ }
+
+ Map<Tag, Integer> getAllTagBucketNum() {
+ return allTagBucketNum;
+ }
+
+ public boolean moveTablet(GroupId groupId, int tabletOrderIdx,
+ long srcBeId, long destBeId) {
+ BackendBuckets srcBackendBuckets = backendBucketsMap.get(srcBeId);
+ if (srcBackendBuckets == null ||
!srcBackendBuckets.containsGroupTablet(groupId, tabletOrderIdx)) {
+ return false;
+ }
+
+ BackendBuckets destBackendBuckets =
backendBucketsMap.get(destBeId);
+ if (destBackendBuckets == null) {
+ destBackendBuckets = new BackendBuckets(destBeId);
+ backendBucketsMap.put(destBeId, destBackendBuckets);
+ }
+ if (destBackendBuckets.containsGroupTablet(groupId,
tabletOrderIdx)) {
+ return false;
+ }
+
+ srcBackendBuckets.removeGroupTablet(groupId, tabletOrderIdx);
+ destBackendBuckets.addGroupTablet(groupId, tabletOrderIdx);
+ if (srcBackendBuckets.getTotalBucketsNum() == 0) {
+ backendBucketsMap.remove(srcBeId);
+ }
+
+ return true;
+ }
+
+ public int getBackendTotalBucketNum(long backendId) {
+ return backendBucketsMap.getOrDefault(backendId,
DUMMY_BE).getTotalBucketsNum();
+ }
+
+ public long getBackendTotalReplicaDataSize(long backendId) {
+ return backendBucketsMap.getOrDefault(backendId, DUMMY_BE)
+ .getTotalReplicaDataSize(allGroupBucketsMap);
+ }
+
+ public long getBucketTotalReplicaDataSize(GroupId groupId, int
tabletOrderIdx) {
+ List<BucketStatistic> bucketStatistics =
allGroupBucketsMap.get(groupId);
+ if (bucketStatistics != null && tabletOrderIdx <
bucketStatistics.size()) {
+ return
bucketStatistics.get(tabletOrderIdx).totalReplicaDataSize;
+ } else {
+ return 0L;
+ }
+ }
+
+ public void addGroup(GroupId groupId, ReplicaAllocation replicaAlloc,
List<Set<Long>> backendBucketsSeq,
+ List<Long> totalReplicaDataSizes, int
totalReplicaNumPerBucket) {
+ Preconditions.checkState(backendBucketsSeq.size() ==
totalReplicaDataSizes.size(),
+ backendBucketsSeq.size() + " vs. " +
totalReplicaDataSizes.size());
+ List<BucketStatistic> bucketStatistics = Lists.newArrayList();
+ for (int tabletOrderIdx = 0; tabletOrderIdx <
backendBucketsSeq.size(); tabletOrderIdx++) {
+ BucketStatistic bucket = new BucketStatistic(tabletOrderIdx,
totalReplicaNumPerBucket,
+ totalReplicaDataSizes.get(tabletOrderIdx));
+ bucketStatistics.add(bucket);
+ for (long backendId : backendBucketsSeq.get(tabletOrderIdx)) {
+ BackendBuckets backendBuckets =
backendBucketsMap.get(backendId);
+ if (backendBuckets == null) {
+ backendBuckets = new BackendBuckets(backendId);
+ backendBucketsMap.put(backendId, backendBuckets);
+ }
+ backendBuckets.addGroupTablet(groupId, tabletOrderIdx);
+ }
+ }
+ int bucketNum = backendBucketsSeq.size();
+ replicaAlloc.getAllocMap().forEach((tag, count) -> {
+ allTagBucketNum.put(tag, allTagBucketNum.getOrDefault(tag, 0)
+ bucketNum * count);
+ });
+ allGroupBucketsMap.put(groupId, bucketStatistics);
+ }
+
+ }
+
@Override
/*
* Each round, we do 2 steps:
@@ -92,8 +335,8 @@ public class ColocateTableCheckerAndBalancer extends
MasterDaemon {
* Otherwise, mark the group as stable
*/
protected void runAfterCatalogReady() {
- relocateAndBalanceGroup();
- matchGroup();
+ relocateAndBalanceGroups();
+ matchGroups();
}
/*
@@ -134,17 +377,32 @@ public class ColocateTableCheckerAndBalancer extends
MasterDaemon {
* +-+ +-+ +-+ +-+
* A B C D
*/
- private void relocateAndBalanceGroup() {
+ private void relocateAndBalanceGroups() {
+ Set<GroupId> groupIds =
Sets.newHashSet(Env.getCurrentEnv().getColocateTableIndex().getAllGroupIds());
+
+ // balance only inside each group, excluded balance between all groups
+ Set<GroupId> changeGroups = relocateAndBalanceGroup(groupIds, false);
+
+ if (!Config.disable_colocate_balance_between_groups
+ && !changeGroups.isEmpty()) {
+ // balance both inside each group and between all groups
+ relocateAndBalanceGroup(changeGroups, true);
+ }
+ }
+
+ private Set<GroupId> relocateAndBalanceGroup(Set<GroupId> groupIds,
boolean balanceBetweenGroups) {
+ Set<GroupId> changeGroups = Sets.newHashSet();
if (Config.disable_colocate_balance) {
- return;
+ return changeGroups;
}
Env env = Env.getCurrentEnv();
ColocateTableIndex colocateIndex = env.getColocateTableIndex();
SystemInfoService infoService = Env.getCurrentSystemInfo();
+ GlobalColocateStatistic globalColocateStatistic =
buildGlobalColocateStatistic();
+
// get all groups
- Set<GroupId> groupIds = colocateIndex.getAllGroupIds();
for (GroupId groupId : groupIds) {
Map<Tag, LoadStatisticForTag> statisticMap =
env.getTabletScheduler().getStatisticMap();
if (statisticMap == null) {
@@ -182,8 +440,10 @@ public class ColocateTableCheckerAndBalancer extends
MasterDaemon {
// try relocate or balance this group for specified tag
List<List<Long>> balancedBackendsPerBucketSeq =
Lists.newArrayList();
if (relocateAndBalance(groupId, tag, unavailableBeIdsInGroup,
availableBeIds, colocateIndex,
- infoService, statistic, balancedBackendsPerBucketSeq))
{
+ infoService, statistic, globalColocateStatistic,
balancedBackendsPerBucketSeq,
+ balanceBetweenGroups)) {
colocateIndex.addBackendsPerBucketSeqByTag(groupId, tag,
balancedBackendsPerBucketSeq);
+ changeGroups.add(groupId);
Map<Tag, List<List<Long>>> balancedBackendsPerBucketSeqMap
= Maps.newHashMap();
balancedBackendsPerBucketSeqMap.put(tag,
balancedBackendsPerBucketSeq);
ColocatePersistInfo info = ColocatePersistInfo
@@ -194,6 +454,8 @@ public class ColocateTableCheckerAndBalancer extends
MasterDaemon {
}
}
}
+
+ return changeGroups;
}
/*
@@ -201,7 +463,7 @@ public class ColocateTableCheckerAndBalancer extends
MasterDaemon {
* replicas, and mark that group as unstable.
* If every replicas match the backends in group, mark that group as
stable.
*/
- private void matchGroup() {
+ private void matchGroups() {
long start = System.currentTimeMillis();
CheckerCounter counter = new CheckerCounter();
@@ -311,6 +573,83 @@ public class ColocateTableCheckerAndBalancer extends
MasterDaemon {
counter.tabletInScheduler, counter.tabletNotReady, cost);
}
+ private GlobalColocateStatistic buildGlobalColocateStatistic() {
+ Env env = Env.getCurrentEnv();
+ ColocateTableIndex colocateIndex = env.getColocateTableIndex();
+ GlobalColocateStatistic globalColocateStatistic = new
GlobalColocateStatistic();
+
+ Set<GroupId> groupIds = colocateIndex.getAllGroupIds();
+ for (GroupId groupId : groupIds) {
+ ColocateGroupSchema groupSchema =
colocateIndex.getGroupSchema(groupId);
+ if (groupSchema == null) {
+ continue;
+ }
+ ReplicaAllocation replicaAlloc = groupSchema.getReplicaAlloc();
+ List<Long> tableIds = colocateIndex.getAllTableIds(groupId);
+ List<Set<Long>> backendBucketsSeq =
colocateIndex.getBackendsPerBucketSeqSet(groupId);
+ if (backendBucketsSeq.isEmpty()) {
+ continue;
+ }
+
+ int totalReplicaNumPerBucket = 0;
+ ArrayList<Long> totalReplicaDataSizes = Lists.newArrayList();
+ for (int i = 0; i < backendBucketsSeq.size(); i++) {
+ totalReplicaDataSizes.add(0L);
+ }
+
+ for (Long tableId : tableIds) {
+ long dbId = groupId.dbId;
+ if (dbId == 0) {
+ dbId = groupId.getDbIdByTblId(tableId);
+ }
+ Database db = env.getInternalCatalog().getDbNullable(dbId);
+ if (db == null) {
+ continue;
+ }
+ OlapTable olapTable = (OlapTable) db.getTableNullable(tableId);
+ if (olapTable == null ||
!colocateIndex.isColocateTable(olapTable.getId())) {
+ continue;
+ }
+
+ olapTable.readLock();
+ try {
+ for (Partition partition : olapTable.getPartitions()) {
+ short replicationNum =
replicaAlloc.getTotalReplicaNum();
+
+ // Here we only get VISIBLE indexes. All other indexes
are not queryable.
+ // So it does not matter if tablets of other indexes
are not matched.
+
+ for (MaterializedIndex index :
partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
+ Preconditions.checkState(backendBucketsSeq.size()
== index.getTablets().size(),
+ backendBucketsSeq.size() + " vs. " +
index.getTablets().size());
+ int tabletOrderIdx = 0;
+ totalReplicaNumPerBucket++;
+ for (Long tabletId : index.getTabletIdsInOrder()) {
+ Set<Long> bucketsSeq =
backendBucketsSeq.get(tabletOrderIdx);
+ Preconditions.checkState(bucketsSeq.size() ==
replicationNum,
+ bucketsSeq.size() + " vs. " +
replicationNum);
+ Tablet tablet = index.getTablet(tabletId);
+ totalReplicaDataSizes.set(tabletOrderIdx,
+
totalReplicaDataSizes.get(tabletOrderIdx) + tablet.getDataSize(true));
+ tabletOrderIdx++;
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn("build group {} colocate statistic error",
groupId, e);
+ continue;
+ } finally {
+ olapTable.readUnlock();
+ }
+ }
+
+ globalColocateStatistic.addGroup(groupId, replicaAlloc,
backendBucketsSeq, totalReplicaDataSizes,
+ totalReplicaNumPerBucket);
+ }
+
+ return globalColocateStatistic;
+ }
+
/*
* Each balance is performed for a single workload group in a colocate
group.
* For example, if the replica allocation of a colocate group is {TagA: 2,
TagB: 1},
@@ -369,8 +708,9 @@ public class ColocateTableCheckerAndBalancer extends
MasterDaemon {
* Return false if nothing changed.
*/
private boolean relocateAndBalance(GroupId groupId, Tag tag, Set<Long>
unavailableBeIds, List<Long> availableBeIds,
- ColocateTableIndex colocateIndex, SystemInfoService infoService,
- LoadStatisticForTag statistic, List<List<Long>>
balancedBackendsPerBucketSeq) {
+ ColocateTableIndex colocateIndex, SystemInfoService infoService,
LoadStatisticForTag statistic,
+ GlobalColocateStatistic globalColocateStatistic, List<List<Long>>
balancedBackendsPerBucketSeq,
+ boolean balanceBetweenGroups) {
ColocateGroupSchema groupSchema =
colocateIndex.getGroupSchema(groupId);
short replicaNum =
groupSchema.getReplicaAlloc().getReplicaNumByTag(tag);
List<List<Long>> backendsPerBucketSeq = Lists.newArrayList(
@@ -379,7 +719,16 @@ public class ColocateTableCheckerAndBalancer extends
MasterDaemon {
List<Long> flatBackendsPerBucketSeq = backendsPerBucketSeq.stream()
.flatMap(List::stream).collect(Collectors.toList());
+ int tagTotalBucketNum =
globalColocateStatistic.getAllTagBucketNum().getOrDefault(tag, 0);
+ int availableBeNum = availableBeIds.size();
+ int highTotalBucketNumPerBe = availableBeNum == 0 ? 0 :
+ (tagTotalBucketNum + availableBeNum - 1) / availableBeNum;
+ int lowTotalBucketNumPerBe = availableBeNum == 0 ? 0 :
tagTotalBucketNum / availableBeNum;
+
boolean isChanged = false;
+ int times = 0;
+ List<RootPathLoadStatistic> resultPaths = Lists.newArrayList();
+
OUT:
while (true) {
// update backends and hosts at each round
@@ -390,27 +739,34 @@ public class ColocateTableCheckerAndBalancer extends
MasterDaemon {
return false;
}
Preconditions.checkState(backendsPerBucketSeq.size() ==
hostsPerBucketSeq.size());
+ times++;
+ if (times > 10 * backendsPerBucketSeq.size()) {
+ LOG.warn("iterate too many times for relocate group: {},
times: {}, bucket num: {}",
+ groupId, times, backendsPerBucketSeq.size());
+ break;
+ }
long srcBeId = -1;
List<Integer> seqIndexes = null;
- boolean hasUnavailableBe = false;
+ boolean srcBeUnavailable = false;
// first choose the unavailable be as src be
for (Long beId : unavailableBeIds) {
seqIndexes = getBeSeqIndexes(flatBackendsPerBucketSeq, beId);
if (!seqIndexes.isEmpty()) {
srcBeId = beId;
- hasUnavailableBe = true;
+ srcBeUnavailable = true;
LOG.info("find unavailable backend {} in colocate group:
{}", beId, groupId);
break;
}
}
+
// sort backends with replica num in desc order
List<Map.Entry<Long, Long>> backendWithReplicaNum =
- getSortedBackendReplicaNumPairs(availableBeIds,
- unavailableBeIds, statistic,
flatBackendsPerBucketSeq);
+ getSortedBackendReplicaNumPairs(availableBeIds,
unavailableBeIds, statistic,
+ globalColocateStatistic, flatBackendsPerBucketSeq);
// if there is only one available backend and no unavailable
bucketId to relocate, end the outer loop
- if (backendWithReplicaNum.size() <= 1 && !hasUnavailableBe) {
+ if (backendWithReplicaNum.size() <= 1 && !srcBeUnavailable) {
break;
}
@@ -426,12 +782,40 @@ public class ColocateTableCheckerAndBalancer extends
MasterDaemon {
// we try to use a low backend to replace the src backend.
// if replace failed(eg: both backends are on some host),
select next low backend and try(j--)
Map.Entry<Long, Long> lowBackend =
backendWithReplicaNum.get(j);
- if ((!hasUnavailableBe) && (seqIndexes.size() -
lowBackend.getValue()) <= 1) {
- // balanced
- break OUT;
+ long destBeId = lowBackend.getKey();
+ if (!srcBeUnavailable) {
+ long diffThisGroup = seqIndexes.size() -
lowBackend.getValue();
+ if (diffThisGroup < 1) {
+ // balanced
+ break OUT;
+ }
+
+ // src's group bucket num = dest's group bucket num + 1
+ // if move group bucket from src to dest, dest will be one
more group num than src.
+ // check global view
+ //
+ // suppose bucket num = 3, three BE A/B/C, two group
group1/group2, then we have:
+ //
+ // A [ group1:bucket0, group2:bucket0]
+ // B [ group1:bucket1, group2:bucket1]
+ // C [ group1:bucket2, group2:bucket2]
+ //
+ // if we add a new BE D, for each group:
bucketNum(A)=bucketNum(B)=bucketNum(C)=1, bucketNum(D)=0
+ // so each group is balance, but in global groups view,
it's not balance.
+ // we should move one of the buckets to D
+ if (diffThisGroup == 1) {
+ if (!balanceBetweenGroups) {
+ break OUT;
+ }
+ int srcTotalBucketNum =
globalColocateStatistic.getBackendTotalBucketNum(srcBeId);
+ int destTotalBucketNum =
globalColocateStatistic.getBackendTotalBucketNum(destBeId);
+ if (srcTotalBucketNum <= highTotalBucketNumPerBe
+ || destTotalBucketNum >=
lowTotalBucketNumPerBe) {
+ continue;
+ }
+ }
}
- long destBeId = lowBackend.getKey();
Backend destBe = infoService.getBackend(destBeId);
if (destBe == null) {
LOG.info("backend {} does not exist", destBeId);
@@ -454,6 +838,14 @@ public class ColocateTableCheckerAndBalancer extends
MasterDaemon {
continue;
}
+ BackendLoadStatistic beStat =
statistic.getBackendLoadStatistic(destBeId);
+ if (beStat == null) {
+ LOG.warn("not found backend {} statistic", destBeId);
+ continue;
+ }
+
+ int targetSeqIndex = -1;
+ long minDataSizeDiff = Long.MAX_VALUE;
for (int seqIndex : seqIndexes) {
// the bucket index.
// eg: 0 / 3 = 0, so that the bucket index of the 4th
backend id in flatBackendsPerBucketSeq is 0.
@@ -461,26 +853,62 @@ public class ColocateTableCheckerAndBalancer extends
MasterDaemon {
List<Long> backendsSet =
backendsPerBucketSeq.get(bucketIndex);
List<String> hostsSet = hostsPerBucketSeq.get(bucketIndex);
// the replicas of a tablet can not locate in same Backend
or same host
- if (!backendsSet.contains(destBeId) &&
!hostsSet.contains(destBe.getHost())) {
-
Preconditions.checkState(backendsSet.contains(srcBeId), srcBeId);
- flatBackendsPerBucketSeq.set(seqIndex, destBeId);
- LOG.info("replace backend {} with backend {} in
colocate group {}, idx: {}",
- srcBeId, destBeId, groupId, seqIndex);
- // just replace one backend at a time, src and dest BE
id should be recalculated because
- // flatBackendsPerBucketSeq is changed.
- isChanged = true;
- isThisRoundChanged = true;
- break;
+ if (backendsSet.contains(destBeId) ||
hostsSet.contains(destBe.getHost())) {
+ continue;
+ }
+
+ Preconditions.checkState(backendsSet.contains(srcBeId),
srcBeId);
+ long bucketDataSize =
+
globalColocateStatistic.getBucketTotalReplicaDataSize(groupId, bucketIndex);
+
+ resultPaths.clear();
+ BalanceStatus st = beStat.isFit(bucketDataSize, null,
resultPaths, true);
+ if (!st.ok()) {
+ LOG.debug("backend {} is unable to fit in group {},
tablet order idx {}, data size {}",
+ destBeId, groupId, bucketIndex,
bucketDataSize);
+ continue;
+ }
+
+ long newSrcBeTotalReplicaDataSize =
globalColocateStatistic.getBackendTotalReplicaDataSize(srcBeId)
+ - bucketDataSize;
+ long newDestBeTotalReplicaDataSize =
+
globalColocateStatistic.getBackendTotalReplicaDataSize(destBeId) +
bucketDataSize;
+ long dataSizeDiff = Math.abs(newSrcBeTotalReplicaDataSize
- newDestBeTotalReplicaDataSize);
+ if (targetSeqIndex < 0 || dataSizeDiff < minDataSizeDiff) {
+ targetSeqIndex = seqIndex;
+ minDataSizeDiff = dataSizeDiff;
}
}
- if (isThisRoundChanged) {
- // we found a change
- break;
+ if (targetSeqIndex < 0) {
+ // we use next node as dst node
+ LOG.info("unable to replace backend {} with backend {} in
colocate group {}",
+ srcBeId, destBeId, groupId);
+ continue;
}
- // we use next node as dst node
- LOG.info("unable to replace backend {} with backend {} in
colocate group {}",
- srcBeId, destBeId, groupId);
+
+ int tabletOrderIdx = targetSeqIndex / replicaNum;
+ int oldSrcThisGroup = seqIndexes.size();
+ long oldDestThisGroup = lowBackend.getValue();
+ int oldSrcBucketNum =
globalColocateStatistic.getBackendTotalBucketNum(srcBeId);
+ int oldDestBucketNum =
globalColocateStatistic.getBackendTotalBucketNum(destBeId);
+ LOG.debug("OneMove: group {}, src {}, this group {}, all group
{}, dest {}, this group {}, "
+ + "all group {}", groupId, srcBeId, oldSrcThisGroup,
oldSrcBucketNum, destBeId,
+ oldDestThisGroup, oldDestBucketNum);
+ Preconditions.checkState(
+ globalColocateStatistic.moveTablet(groupId,
tabletOrderIdx, srcBeId, destBeId));
+ Preconditions.checkState(oldSrcBucketNum - 1
+ ==
globalColocateStatistic.getBackendTotalBucketNum(srcBeId));
+ Preconditions.checkState(oldDestBucketNum + 1
+ ==
globalColocateStatistic.getBackendTotalBucketNum(destBeId));
+ flatBackendsPerBucketSeq.set(targetSeqIndex, destBeId);
+ // just replace one backend at a time, src and dest BE id
should be recalculated because
+ // flatBackendsPerBucketSeq is changed.
+ isChanged = true;
+ isThisRoundChanged = true;
+ LOG.info("replace backend {} with backend {} in colocate group
{}, idx: {}",
+ srcBeId, destBeId, groupId, targetSeqIndex);
+ break;
}
if (!isThisRoundChanged) {
@@ -522,7 +950,9 @@ public class ColocateTableCheckerAndBalancer extends
MasterDaemon {
}
private List<Map.Entry<Long, Long>>
getSortedBackendReplicaNumPairs(List<Long> allAvailBackendIds,
- Set<Long> unavailBackendIds, LoadStatisticForTag statistic,
List<Long> flatBackendsPerBucketSeq) {
+ Set<Long> unavailBackendIds, LoadStatisticForTag statistic,
+ GlobalColocateStatistic globalColocateStatistic,
+ List<Long> flatBackendsPerBucketSeq) {
// backend id -> replica num, and sorted by replica num, descending.
Map<Long, Long> backendToReplicaNum = flatBackendsPerBucketSeq.stream()
.collect(Collectors.groupingBy(Function.identity(),
Collectors.counting()));
@@ -544,20 +974,27 @@ public class ColocateTableCheckerAndBalancer extends
MasterDaemon {
if (!entry1.getValue().equals(entry2.getValue())) {
return (int) (entry2.getValue() - entry1.getValue());
}
+
+ // From java 7, sorting needs to satisfy reflexivity,
transitivity and symmetry.
+ // Otherwise it will raise exception "Comparison method
violates its general contract".
+
BackendLoadStatistic beStat1 =
statistic.getBackendLoadStatistic(entry1.getKey());
BackendLoadStatistic beStat2 =
statistic.getBackendLoadStatistic(entry2.getKey());
if (beStat1 == null || beStat2 == null) {
- return 0;
+ if (beStat1 == null && beStat2 == null) {
+ return 0;
+ } else {
+ return beStat1 == null ? 1 : -1;
+ }
}
double loadScore1 = beStat1.getMixLoadScore();
double loadScore2 = beStat2.getMixLoadScore();
- if (Math.abs(loadScore1 - loadScore2) < 1e-6) {
- return 0;
- } else if (loadScore2 > loadScore1) {
- return 1;
- } else {
- return -1;
+ int cmp = Double.compare(loadScore2, loadScore1);
+ if (cmp != 0) {
+ return cmp;
}
+
+ return Long.compare(entry1.getKey(), entry2.getKey());
})
.collect(Collectors.toList());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 90d136715ce..f2542243a75 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -1142,7 +1142,7 @@ public class TabletScheduler extends MasterDaemon {
// it will also delete replica from tablet inverted index.
tabletCtx.deleteReplica(replica);
- if (force) {
+ if (force || FeConstants.runningUnitTest) {
// send the delete replica task.
// also, this may not be necessary, but delete it will make things
simpler.
// NOTICE: only delete the replica from meta may not work.
sometimes we can depend on tablet report
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
index 98b59129362..a05c63b812f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
@@ -330,4 +330,48 @@ public class CatalogTestUtil {
backend.setAlive(true);
return backend;
}
+
+ public static long getTabletDataSize(long tabletId) {
+ Env env = Env.getCurrentEnv();
+ TabletInvertedIndex invertedIndex = env.getTabletInvertedIndex();
+ TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId);
+ if (tabletMeta == null) {
+ return -1L;
+ }
+
+ long dbId = tabletMeta.getDbId();
+ long tableId = tabletMeta.getTableId();
+ long partitionId = tabletMeta.getPartitionId();
+ long indexId = tabletMeta.getIndexId();
+ Database db = env.getInternalCatalog().getDbNullable(dbId);
+ if (db == null) {
+ return -1L;
+ }
+ Table table = db.getTableNullable(tableId);
+ if (table == null) {
+ return -1L;
+ }
+ if (table.getType() != Table.TableType.OLAP) {
+ return -1L;
+ }
+ OlapTable olapTable = (OlapTable) table;
+ olapTable.readLock();
+ try {
+ Partition partition = olapTable.getPartition(partitionId);
+ if (partition == null) {
+ return -1L;
+ }
+ MaterializedIndex materializedIndex = partition.getIndex(indexId);
+ if (materializedIndex == null) {
+ return -1L;
+ }
+ Tablet tablet = materializedIndex.getTablet(tabletId);
+ if (tablet == null) {
+ return -1L;
+ }
+ return tablet.getDataSize(true);
+ } finally {
+ olapTable.readUnlock();
+ }
+ }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/clone/BalanceStatistic.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/BalanceStatistic.java
new file mode 100644
index 00000000000..596a06f6681
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/BalanceStatistic.java
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.system.Backend;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Table;
+
+import java.util.List;
+import java.util.Map;
+
+public class BalanceStatistic {
+ public Map<Long, Long> backendTotalDataSize;
+ public Map<Long, Integer> backendTotalReplicaNum;
+
+ private BalanceStatistic(Map<Long, Long> backendTotalDataSize,
+ Map<Long, Integer> backendTotalReplicaNum) {
+ this.backendTotalDataSize = backendTotalDataSize;
+ this.backendTotalReplicaNum = backendTotalReplicaNum;
+ }
+
+ public static BalanceStatistic getCurrentBalanceStatistic() {
+ Map<Long, Long> backendTotalDataSize = Maps.newHashMap();
+ Map<Long, Integer> backendTotalReplicaNum = Maps.newHashMap();
+ List<Backend> backends =
Env.getCurrentSystemInfo().getIdToBackend().values().asList();
+ backends.forEach(be -> {
+ backendTotalDataSize.put(be.getId(), 0L);
+ backendTotalReplicaNum.put(be.getId(), 0);
+ });
+
+ Table<Long, Long, Replica> replicaMetaTable =
+ Env.getCurrentInvertedIndex().getReplicaMetaTable();
+ for (Table.Cell<Long, Long, Replica> cell :
replicaMetaTable.cellSet()) {
+ long beId = cell.getColumnKey();
+ Replica replica = cell.getValue();
+ backendTotalDataSize.put(beId, backendTotalDataSize.get(beId) +
replica.getDataSize());
+ backendTotalReplicaNum.put(beId, backendTotalReplicaNum.get(beId)
+ 1);
+ }
+
+ return new BalanceStatistic(backendTotalDataSize,
backendTotalReplicaNum);
+ }
+
+ public Map<Long, Long> getBackendTotalDataSize() {
+ return backendTotalDataSize;
+ }
+
+ public Map<Long, Integer> getBackendTotalReplicaNum() {
+ return backendTotalReplicaNum;
+ }
+
+ public long getBeMinTotalDataSize() {
+ return backendTotalDataSize.values().stream().min(Long::compare).get();
+ }
+
+ public long getBeMaxTotalDataSize() {
+ return backendTotalDataSize.values().stream().max(Long::compare).get();
+ }
+
+ public int getBeMinTotalReplicaNum() {
+ return
backendTotalReplicaNum.values().stream().min(Integer::compare).get();
+ }
+
+ public int getBeMaxTotalReplicaNum() {
+ return
backendTotalReplicaNum.values().stream().max(Integer::compare).get();
+ }
+
+ public void printToStdout() {
+ int minTotalReplicaNum = getBeMinTotalReplicaNum();
+ int maxTotalReplicaNum = getBeMaxTotalReplicaNum();
+ long minTotalDataSize = getBeMinTotalDataSize();
+ long maxTotalDataSize = getBeMaxTotalDataSize();
+
+ System.out.println("");
+ System.out.println("=== backend min total replica num: " +
minTotalReplicaNum);
+ System.out.println("=== backend max total replica num: " +
maxTotalReplicaNum);
+ System.out.println("=== max / min : " + (maxTotalReplicaNum / (double)
minTotalReplicaNum));
+
+ System.out.println("");
+ System.out.println("=== min total data size: " + minTotalDataSize);
+ System.out.println("=== max total data size: " + maxTotalDataSize);
+ System.out.println("=== max / min : " + (maxTotalDataSize / (double)
minTotalDataSize));
+ }
+}
+
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerPerfTest.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerPerfTest.java
new file mode 100644
index 00000000000..56ac9b192b8
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerPerfTest.java
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import org.apache.doris.analysis.CreateDbStmt;
+import org.apache.doris.analysis.CreateTableStmt;
+import org.apache.doris.catalog.ColocateTableIndex;
+import org.apache.doris.catalog.ColocateTableIndex.GroupId;
+import org.apache.doris.catalog.DiskInfo;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.DdlExecutor;
+import org.apache.doris.resource.Tag;
+import org.apache.doris.system.Backend;
+import org.apache.doris.utframe.UtFrameUtils;
+
+import com.google.common.collect.Maps;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+
+public class ColocateTableCheckerAndBalancerPerfTest {
+ private static String runningDir =
"fe/mocked/ColocateTableCheckerAndBalancerPerfTest/"
+ + UUID.randomUUID().toString() + "/";
+
+ private static ConnectContext connectContext;
+ private static final int TEMP_DISALBE_BE_NUM = 2;
+ private static List<Backend> backends;
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ FeConstants.runningUnitTest = true;
+ FeConstants.enableInternalSchemaDb = false;
+ FeConstants.tablet_checker_interval_ms = 100;
+ FeConstants.tablet_schedule_interval_ms = 100;
+ Config.enable_round_robin_create_tablet = false;
+ Config.disable_balance = true;
+ Config.schedule_batch_size = 400;
+ Config.schedule_slot_num_per_hdd_path = 1000;
+ Config.disable_colocate_balance = true;
+ Config.disable_tablet_scheduler = true;
+ UtFrameUtils.createDorisClusterWithMultiTag(runningDir, 6);
+
+ backends =
Env.getCurrentSystemInfo().getIdToBackend().values().asList();
+ for (Backend be : backends) {
+ for (DiskInfo diskInfo : be.getDisks().values()) {
+ diskInfo.setTotalCapacityB(10L << 40);
+ diskInfo.setDataUsedCapacityB(1L);
+ diskInfo.setAvailableCapacityB(
+ diskInfo.getTotalCapacityB() -
diskInfo.getDataUsedCapacityB());
+ }
+ }
+ Map<String, String> tagMap = Maps.newHashMap();
+ tagMap.put(Tag.TYPE_LOCATION, "zone_a");
+ for (int i = 0; i < TEMP_DISALBE_BE_NUM; i++) {
+ backends.get(i).setTagMap(tagMap);
+ }
+
+ // create connect context
+ connectContext = UtFrameUtils.createDefaultCtx();
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ UtFrameUtils.cleanDorisFeDir(runningDir);
+ }
+
+ @Test
+ public void testRelocateAndBalance() throws Exception {
+
+ Env env = Env.getCurrentEnv();
+ String createDbStmtStr = "create database test;";
+ CreateDbStmt createDbStmt = (CreateDbStmt)
UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, connectContext);
+ DdlExecutor.execute(env, createDbStmt);
+
+ Random random = new Random();
+ final int groupNum = 100;
+ for (int groupIndex = 0; groupIndex <= groupNum; groupIndex++) {
+ int tableNum = 1 + random.nextInt(10);
+ for (int tableIndex = 0; tableIndex < tableNum; tableIndex++) {
+ String sql = String.format("CREATE TABLE test.table_%s_%s\n"
+ + "( k1 int, k2 int, v1 int )\n"
+ + "ENGINE=OLAP\n"
+ + "UNIQUE KEY (k1,k2)\n"
+ + "DISTRIBUTED BY HASH(k2) BUCKETS 11\n"
+ + "PROPERTIES('colocate_with' = 'group_%s');",
+ groupIndex, tableIndex, groupIndex);
+ CreateTableStmt createTableStmt = (CreateTableStmt)
UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
+ try {
+ DdlExecutor.execute(env, createTableStmt);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+
+ BalanceStatistic beforeBalanceStatistic =
BalanceStatistic.getCurrentBalanceStatistic();
+ Assert.assertEquals("group: " + groupIndex + ", table: " +
tableIndex + ", "
+ + beforeBalanceStatistic.getBackendTotalReplicaNum(),
+ 0, beforeBalanceStatistic.getBeMinTotalReplicaNum());
+ }
+ }
+
+ ColocateTableIndex colocateIndex = env.getColocateTableIndex();
+ Set<GroupId> groupIds = colocateIndex.getAllGroupIds();
+
+ RebalancerTestUtil.updateReplicaDataSize(100L << 10, 10, 10);
+ RebalancerTestUtil.updateReplicaPathHash();
+
+ BalanceStatistic beforeBalanceStatistic =
BalanceStatistic.getCurrentBalanceStatistic();
+ Assert.assertEquals("" +
beforeBalanceStatistic.getBackendTotalReplicaNum(),
+ 0, beforeBalanceStatistic.getBeMinTotalReplicaNum());
+
+ // all groups stable
+ Thread.sleep(1000);
+ Assert.assertTrue("some groups are unstable",
+ groupIds.stream().noneMatch(groupId ->
colocateIndex.isGroupUnstable(groupId)));
+
+        // After colocate balance is enabled and some backends come back,
+        // it should relocate all groups, and they will become unstable.
+ Map<String, String> tagMap =
backends.get(TEMP_DISALBE_BE_NUM).getTagMap();
+ for (int i = 0; i < TEMP_DISALBE_BE_NUM; i++) {
+ backends.get(i).setTagMap(tagMap);
+ }
+ Config.disable_colocate_balance = false;
+ Thread.sleep(1000);
+ Assert.assertTrue("some groups are stable",
+ groupIds.stream().allMatch(groupId ->
colocateIndex.isGroupUnstable(groupId)));
+
+
+        // After the tablet scheduler is enabled, the unstable groups should
+        // shed their extra tablets and become stable.
+ Config.disable_tablet_scheduler = false;
+ for (int i = 0; true; i++) {
+ Thread.sleep(1000);
+
+ boolean allStable = groupIds.stream().noneMatch(
+ groupId -> colocateIndex.isGroupUnstable(groupId));
+
+ if (allStable) {
+ break;
+ }
+
+ Assert.assertTrue("some groups are unstable", i < 60);
+ }
+
+ System.out.println("=== before colocate relocate and balance:");
+ beforeBalanceStatistic.printToStdout();
+ Assert.assertEquals("" +
beforeBalanceStatistic.getBackendTotalReplicaNum(),
+ 0, beforeBalanceStatistic.getBeMinTotalReplicaNum());
+ Assert.assertEquals("" +
beforeBalanceStatistic.getBackendTotalDataSize(),
+ 0, beforeBalanceStatistic.getBeMinTotalDataSize());
+ long beforeDataSizeDiff =
beforeBalanceStatistic.getBeMaxTotalDataSize()
+ - beforeBalanceStatistic.getBeMinTotalDataSize();
+ int beforeReplicaNumDiff =
beforeBalanceStatistic.getBeMaxTotalReplicaNum()
+ - beforeBalanceStatistic.getBeMinTotalReplicaNum();
+
+ BalanceStatistic afterBalanceStatistic =
BalanceStatistic.getCurrentBalanceStatistic();
+ System.out.println("");
+ System.out.println("=== after colocate relocate and balance:");
+ afterBalanceStatistic.printToStdout();
+
+ Assert.assertTrue("" +
afterBalanceStatistic.getBackendTotalReplicaNum(),
+ afterBalanceStatistic.getBeMinTotalReplicaNum() > 0);
+ Assert.assertTrue("" + afterBalanceStatistic.getBackendTotalDataSize(),
+ afterBalanceStatistic.getBeMinTotalDataSize() > 0);
+ long afterDataSizeDiff = afterBalanceStatistic.getBeMaxTotalDataSize()
+ - afterBalanceStatistic.getBeMinTotalDataSize();
+ int afterReplicaNumDiff =
afterBalanceStatistic.getBeMaxTotalReplicaNum()
+ - afterBalanceStatistic.getBeMinTotalReplicaNum();
+ Assert.assertTrue(afterDataSizeDiff <= beforeDataSizeDiff);
+ Assert.assertTrue(afterReplicaNumDiff <= beforeReplicaNumDiff);
+ }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerTest.java
index b159dee523d..ee33862c955 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerTest.java
@@ -24,12 +24,16 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.clone.ColocateTableCheckerAndBalancer.BackendBuckets;
+import org.apache.doris.clone.ColocateTableCheckerAndBalancer.BucketStatistic;
+import
org.apache.doris.clone.ColocateTableCheckerAndBalancer.GlobalColocateStatistic;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TStorageMedium;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -106,6 +110,20 @@ public class ColocateTableCheckerAndBalancerTest {
return colocateTableIndex;
}
+ private GlobalColocateStatistic
createGlobalColocateStatistic(ColocateTableIndex colocateTableIndex,
+ GroupId groupId) {
+ GlobalColocateStatistic globalColocateStatistic = new
GlobalColocateStatistic();
+ List<Set<Long>> backendsPerBucketSeq =
colocateTableIndex.getBackendsPerBucketSeqSet(groupId);
+ List<Long> totalReplicaDataSizes = Lists.newArrayList();
+ for (int i = 0; i < backendsPerBucketSeq.size(); i++) {
+ totalReplicaDataSizes.add(1L);
+ }
+ ReplicaAllocation replicaAlloc = new ReplicaAllocation((short)
backendsPerBucketSeq.get(0).size());
+ globalColocateStatistic.addGroup(groupId, replicaAlloc,
backendsPerBucketSeq, totalReplicaDataSizes, 3);
+
+ return globalColocateStatistic;
+ }
+
@Test
public void testBalance(@Mocked SystemInfoService infoService,
@Mocked LoadStatisticForTag statistic) {
@@ -140,7 +158,11 @@ public class ColocateTableCheckerAndBalancerTest {
minTimes = 0;
statistic.getBackendLoadStatistic(anyLong);
- result = null;
+ result = new Delegate<BackendLoadStatistic>() {
+ BackendLoadStatistic delegate(Long beId) {
+ return new FakeBackendLoadStatistic(beId, null, null);
+ }
+ };
minTimes = 0;
}
};
@@ -158,24 +180,28 @@ public class ColocateTableCheckerAndBalancerTest {
Lists.newArrayList(1L, 2L, 3L, 4L, 1L, 2L, 3L, 4L, 1L, 2L, 3L,
4L, 1L, 2L, 3L));
Deencapsulation.setField(colocateTableIndex, "group2Schema",
group2Schema);
+ GlobalColocateStatistic globalColocateStatistic =
createGlobalColocateStatistic(colocateTableIndex, groupId);
List<List<Long>> balancedBackendsPerBucketSeq = Lists.newArrayList();
List<Long> allAvailBackendIds = Lists.newArrayList(1L, 2L, 3L, 4L, 5L,
6L, 7L, 8L, 9L);
boolean changed = (Boolean) Deencapsulation.invoke(balancer,
"relocateAndBalance", groupId,
Tag.DEFAULT_BACKEND_TAG, new HashSet<Long>(),
allAvailBackendIds,
- colocateTableIndex, infoService, statistic,
balancedBackendsPerBucketSeq);
+ colocateTableIndex, infoService, statistic,
globalColocateStatistic,
+ balancedBackendsPerBucketSeq, false);
List<List<Long>> expected = Lists.partition(
- Lists.newArrayList(9L, 5L, 3L, 4L, 6L, 8L, 7L, 6L, 1L, 2L, 9L,
4L, 1L, 2L, 3L), 3);
- Assert.assertTrue(changed);
+ Lists.newArrayList(8L, 5L, 6L, 5L, 6L, 7L, 9L, 4L, 1L, 2L, 3L,
4L, 1L, 2L, 3L), 3);
+ Assert.assertTrue("" + globalColocateStatistic, changed);
Assert.assertEquals(expected, balancedBackendsPerBucketSeq);
// 2. balance a already balanced group
colocateTableIndex = createColocateIndex(groupId,
Lists.newArrayList(9L, 8L, 7L, 8L, 6L, 5L, 9L, 4L, 1L, 2L, 3L,
4L, 1L, 2L, 3L));
+ globalColocateStatistic =
createGlobalColocateStatistic(colocateTableIndex, groupId);
Deencapsulation.setField(colocateTableIndex, "group2Schema",
group2Schema);
balancedBackendsPerBucketSeq.clear();
changed = (Boolean) Deencapsulation.invoke(balancer,
"relocateAndBalance", groupId,
Tag.DEFAULT_BACKEND_TAG, new HashSet<Long>(),
allAvailBackendIds,
- colocateTableIndex, infoService, statistic,
balancedBackendsPerBucketSeq);
+ colocateTableIndex, infoService, statistic,
globalColocateStatistic,
+ balancedBackendsPerBucketSeq, false);
System.out.println(balancedBackendsPerBucketSeq);
Assert.assertFalse(changed);
Assert.assertTrue(balancedBackendsPerBucketSeq.isEmpty());
@@ -214,7 +240,11 @@ public class ColocateTableCheckerAndBalancerTest {
result = backend9;
minTimes = 0;
statistic.getBackendLoadStatistic(anyLong);
- result = null;
+ result = new Delegate<BackendLoadStatistic>() {
+ BackendLoadStatistic delegate(Long beId) {
+ return new FakeBackendLoadStatistic(beId, null, null);
+ }
+ };
minTimes = 0;
}
};
@@ -229,24 +259,28 @@ public class ColocateTableCheckerAndBalancerTest {
// 1. only one available backend
// [[7], [7], [7], [7], [7]]
ColocateTableIndex colocateTableIndex = createColocateIndex(groupId,
Lists.newArrayList(7L, 7L, 7L, 7L, 7L));
+ GlobalColocateStatistic globalColocateStatistic =
createGlobalColocateStatistic(colocateTableIndex, groupId);
Deencapsulation.setField(colocateTableIndex, "group2Schema",
group2Schema);
List<List<Long>> balancedBackendsPerBucketSeq = Lists.newArrayList();
List<Long> allAvailBackendIds = Lists.newArrayList(7L);
boolean changed = Deencapsulation.invoke(balancer,
"relocateAndBalance", groupId, Tag.DEFAULT_BACKEND_TAG,
- new HashSet<Long>(), allAvailBackendIds, colocateTableIndex,
infoService, statistic, balancedBackendsPerBucketSeq);
+ new HashSet<Long>(), allAvailBackendIds, colocateTableIndex,
infoService, statistic, globalColocateStatistic,
+ balancedBackendsPerBucketSeq, false);
Assert.assertFalse(changed);
// 2. all backends are checked but this round is not changed
// [[7], [7], [7], [7], [7]]
// and add new backends 8, 9 that are on the same host with 7
colocateTableIndex = createColocateIndex(groupId,
Lists.newArrayList(7L, 7L, 7L, 7L, 7L));
+ globalColocateStatistic =
createGlobalColocateStatistic(colocateTableIndex, groupId);
Deencapsulation.setField(colocateTableIndex, "group2Schema",
group2Schema);
balancedBackendsPerBucketSeq = Lists.newArrayList();
allAvailBackendIds = Lists.newArrayList(7L, 8L, 9L);
changed = Deencapsulation.invoke(balancer, "relocateAndBalance",
groupId, Tag.DEFAULT_BACKEND_TAG,
- new HashSet<Long>(), allAvailBackendIds, colocateTableIndex,
infoService, statistic, balancedBackendsPerBucketSeq);
+ new HashSet<Long>(), allAvailBackendIds, colocateTableIndex,
infoService, statistic, globalColocateStatistic,
+ balancedBackendsPerBucketSeq, false);
Assert.assertFalse(changed);
}
@@ -271,13 +305,15 @@ public class ColocateTableCheckerAndBalancerTest {
group2Schema.put(groupId, groupSchema);
ColocateTableIndex colocateTableIndex = createColocateIndex(groupId,
Lists.newArrayList(7L, 7L, 7L, 7L, 7L));
+ GlobalColocateStatistic globalColocateStatistic =
createGlobalColocateStatistic(colocateTableIndex, groupId);
Deencapsulation.setField(colocateTableIndex, "group2Schema",
group2Schema);
List<List<Long>> balancedBackendsPerBucketSeq = Lists.newArrayList();
Set<Long> unAvailBackendIds = Sets.newHashSet(1L, 2L, 3L, 4L, 5L, 6L,
7L, 8L, 9L);
List<Long> availBackendIds = Lists.newArrayList();
boolean changed = (Boolean) Deencapsulation.invoke(balancer,
"relocateAndBalance", groupId, Tag.DEFAULT_BACKEND_TAG,
- unAvailBackendIds, availBackendIds, colocateTableIndex,
infoService, statistic, balancedBackendsPerBucketSeq);
+ unAvailBackendIds, availBackendIds, colocateTableIndex,
infoService, statistic, globalColocateStatistic,
+ balancedBackendsPerBucketSeq, false);
Assert.assertFalse(changed);
}
@@ -295,19 +331,20 @@ public class ColocateTableCheckerAndBalancerTest {
}
};
+ GlobalColocateStatistic globalColocateStatistic = new
GlobalColocateStatistic();
// all buckets are on different be
List<Long> allAvailBackendIds = Lists.newArrayList(1L, 2L, 3L, 4L, 5L,
6L, 7L, 8L);
Set<Long> unavailBackendIds = Sets.newHashSet(9L);
List<Long> flatBackendsPerBucketSeq = Lists.newArrayList(1L, 2L, 3L,
4L, 5L, 6L, 7L, 8L, 9L);
List<Map.Entry<Long, Long>> backends =
Deencapsulation.invoke(balancer, "getSortedBackendReplicaNumPairs",
- allAvailBackendIds, unavailBackendIds, statistic,
flatBackendsPerBucketSeq);
+ allAvailBackendIds, unavailBackendIds, statistic,
globalColocateStatistic, flatBackendsPerBucketSeq);
long[] backendIds =
backends.stream().mapToLong(Map.Entry::getKey).toArray();
Assert.assertArrayEquals(new long[]{7L, 8L, 6L, 2L, 3L, 5L, 4L, 1L},
backendIds);
// 0,1 bucket on same be and 5, 6 on same be
flatBackendsPerBucketSeq = Lists.newArrayList(1L, 1L, 3L, 4L, 5L, 6L,
7L, 7L, 9L);
backends = Deencapsulation.invoke(balancer,
"getSortedBackendReplicaNumPairs", allAvailBackendIds, unavailBackendIds,
- statistic, flatBackendsPerBucketSeq);
+ statistic, globalColocateStatistic, flatBackendsPerBucketSeq);
backendIds = backends.stream().mapToLong(Map.Entry::getKey).toArray();
Assert.assertArrayEquals(new long[]{7L, 1L, 6L, 3L, 5L, 4L, 8L, 2L},
backendIds);
}
@@ -322,6 +359,12 @@ public class ColocateTableCheckerAndBalancerTest {
public double getMixLoadScore() {
return mixLoadScores.get(getBeId());
}
+
+ @Override
+ public BalanceStatus isFit(long tabletSize, TStorageMedium medium,
List<RootPathLoadStatistic> result,
+ boolean isSupplement) {
+ return BalanceStatus.OK;
+ }
}
@Test
@@ -599,4 +642,51 @@ public class ColocateTableCheckerAndBalancerTest {
System.out.println(availableBeIds);
Assert.assertArrayEquals(new long[]{2L, 4L},
availableBeIds.stream().mapToLong(i -> i).sorted().toArray());
}
+
+ @Test
+ public void testGlobalColocateStatistic() {
+ GroupId groupId1 = new GroupId(1L, 10000L);
+ GroupId groupId2 = new GroupId(2L, 20000L);
+ GlobalColocateStatistic globalColocateStatistic = new
GlobalColocateStatistic();
+ globalColocateStatistic.addGroup(groupId1, new
ReplicaAllocation((short) 2),
+ Lists.newArrayList(Sets.newHashSet(1001L, 1002L),
Sets.newHashSet(1002L, 1003L),
+ Sets.newHashSet(1001L, 1003L)),
+ Lists.newArrayList(100L, 200L, 300L), 5);
+ globalColocateStatistic.addGroup(groupId2, new
ReplicaAllocation((short) 1),
+ Lists.newArrayList(Sets.newHashSet(1001L),
Sets.newHashSet(1002L),
+ Sets.newHashSet(1003L), Sets.newHashSet(1001L)),
+ Lists.newArrayList(100L, 200L, 300L, 400L), 7);
+
+ Map<Long, BackendBuckets> backendBucketsMap =
globalColocateStatistic.getBackendBucketsMap();
+ BackendBuckets backendBuckets1 = backendBucketsMap.get(1001L);
+ Assert.assertNotNull(backendBuckets1);
+ Assert.assertEquals(Lists.newArrayList(0, 2),
+ backendBuckets1.getGroupTabletOrderIndices().get(groupId1));
+ Assert.assertEquals(Lists.newArrayList(0, 3),
+ backendBuckets1.getGroupTabletOrderIndices().get(groupId2));
+ BackendBuckets backendBuckets2 = backendBucketsMap.get(1002L);
+ Assert.assertNotNull(backendBuckets2);
+ Assert.assertEquals(Lists.newArrayList(0, 1),
+ backendBuckets2.getGroupTabletOrderIndices().get(groupId1));
+ Assert.assertEquals(Lists.newArrayList(1),
+ backendBuckets2.getGroupTabletOrderIndices().get(groupId2));
+ BackendBuckets backendBuckets3 = backendBucketsMap.get(1003L);
+ Assert.assertNotNull(backendBuckets3);
+ Assert.assertEquals(Lists.newArrayList(1, 2),
+ backendBuckets3.getGroupTabletOrderIndices().get(groupId1));
+ Assert.assertEquals(Lists.newArrayList(2),
+ backendBuckets3.getGroupTabletOrderIndices().get(groupId2));
+
+ Map<GroupId, List<BucketStatistic>> allGroupBucketsMap =
globalColocateStatistic.getAllGroupBucketsMap();
+ Assert.assertEquals(Lists.newArrayList(new BucketStatistic(0, 5,
100L), new BucketStatistic(1, 5, 200L),
+ new BucketStatistic(2, 5, 300L)),
+ allGroupBucketsMap.get(groupId1));
+ Assert.assertEquals(Lists.newArrayList(new BucketStatistic(0, 7,
100L), new BucketStatistic(1, 7, 200L),
+ new BucketStatistic(2, 7, 300L), new BucketStatistic(3, 7,
400L)),
+ allGroupBucketsMap.get(groupId2));
+
+ Map<Tag, Integer> expectAllTagBucketNum = Maps.newHashMap();
+ expectAllTagBucketNum.put(Tag.DEFAULT_BACKEND_TAG, 10);
+ Assert.assertEquals(expectAllTagBucketNum,
globalColocateStatistic.getAllTagBucketNum());
+ }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
index 95f71d0b51a..da03d42a644 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
@@ -24,6 +24,7 @@ import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
@@ -37,6 +38,7 @@ import com.google.common.collect.Table;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.stream.IntStream;
public class RebalancerTestUtil {
@@ -110,7 +112,8 @@ public class RebalancerTestUtil {
}
public static void updateReplicaPathHash() {
- Table<Long, Long, Replica> replicaMetaTable =
Env.getCurrentInvertedIndex().getReplicaMetaTable();
+ Table<Long, Long, Replica> replicaMetaTable =
+ Env.getCurrentInvertedIndex().getReplicaMetaTable();
for (Table.Cell<Long, Long, Replica> cell :
replicaMetaTable.cellSet()) {
long beId = cell.getColumnKey();
Backend be = Env.getCurrentSystemInfo().getBackend(beId);
@@ -129,4 +132,47 @@ public class RebalancerTestUtil {
}
}
+
+ public static void updateReplicaDataSize(long minReplicaSize, int
tableSkew, int tabletSkew) {
+ Random random = new Random();
+ tableSkew = Math.max(tableSkew, 1);
+ tabletSkew = Math.max(tabletSkew, 1);
+ Env env = Env.getCurrentEnv();
+ List<Long> dbIds = env.getInternalCatalog().getDbIds();
+ for (Long dbId : dbIds) {
+ Database db = env.getInternalCatalog().getDbNullable(dbId);
+ if (db == null) {
+ continue;
+ }
+
+ if (db.isMysqlCompatibleDatabase()) {
+ continue;
+ }
+
+ for (org.apache.doris.catalog.Table table : db.getTables()) {
+ long tableBaseSize = minReplicaSize * (1 +
random.nextInt(tableSkew));
+ table.readLock();
+ try {
+ if (table.getType() != TableType.OLAP) {
+ continue;
+ }
+
+ OlapTable tbl = (OlapTable) table;
+ for (Partition partition : tbl.getAllPartitions()) {
+ for (MaterializedIndex idx :
partition.getMaterializedIndices(
+ MaterializedIndex.IndexExtState.VISIBLE)) {
+ for (Tablet tablet : idx.getTablets()) {
+ long tabletSize = tableBaseSize * (1 +
random.nextInt(tabletSkew));
+ for (Replica replica : tablet.getReplicas()) {
+ replica.updateStat(tabletSize, 1000L);
+ }
+ }
+ }
+ }
+ } finally {
+ table.readUnlock();
+ }
+ }
+ }
+ }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
index 3a09cae73bb..bf2a7964b91 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
@@ -17,11 +17,14 @@
package org.apache.doris.utframe;
+import org.apache.doris.catalog.CatalogTestUtil;
+import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.common.ClientPool;
import org.apache.doris.proto.Data;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.PBackendServiceGrpc;
import org.apache.doris.proto.Types;
+import org.apache.doris.system.Backend;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.HeartbeatService;
@@ -35,6 +38,7 @@ import org.apache.doris.thrift.TCancelPlanFragmentResult;
import org.apache.doris.thrift.TCheckStorageFormatResult;
import org.apache.doris.thrift.TCloneReq;
import org.apache.doris.thrift.TDiskTrashInfo;
+import org.apache.doris.thrift.TDropTabletReq;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TExecPlanFragmentResult;
import org.apache.doris.thrift.TExportState;
@@ -45,6 +49,7 @@ import org.apache.doris.thrift.THeartbeatResult;
import org.apache.doris.thrift.TIngestBinlogRequest;
import org.apache.doris.thrift.TIngestBinlogResult;
import org.apache.doris.thrift.TMasterInfo;
+import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TRoutineLoadTask;
import org.apache.doris.thrift.TScanBatchResult;
import org.apache.doris.thrift.TScanCloseParams;
@@ -124,11 +129,16 @@ public class MockedBackendFactory {
// User can extends this abstract class to create other custom be thrift
service
public abstract static class BeThriftService implements
BackendService.Iface {
protected MockedBackend backend;
+ protected Backend backendInFe;
public void setBackend(MockedBackend backend) {
this.backend = backend;
}
+ public void setBackendInFe(Backend backendInFe) {
+ this.backendInFe = backendInFe;
+ }
+
public abstract void init();
}
@@ -151,11 +161,15 @@ public class MockedBackendFactory {
@Override
public void run() {
while (true) {
+ boolean ok = false;
+ FrontendService.Client client = null;
+ TNetworkAddress address = null;
try {
+ address = backend.getFeAddress();
TAgentTaskRequest request = taskQueue.take();
System.out.println(
"get agent task request. type: " +
request.getTaskType() + ", signature: "
- + request.getSignature() + ", fe addr: " +
backend.getFeAddress());
+ + request.getSignature() + ", fe addr: " +
address);
TFinishTaskRequest finishTaskRequest = new
TFinishTaskRequest(tBackend,
request.getTaskType(),
request.getSignature(), new TStatus(TStatusCode.OK));
TTaskType taskType = request.getTaskType();
@@ -164,35 +178,79 @@ public class MockedBackendFactory {
case ALTER:
++reportVersion;
break;
+ case DROP:
+ handleDropTablet(request,
finishTaskRequest);
+ break;
case CLONE:
- handleClone(request, finishTaskRequest);
+ handleCloneTablet(request,
finishTaskRequest);
break;
default:
break;
}
finishTaskRequest.setReportVersion(reportVersion);
- FrontendService.Client client =
-
ClientPool.frontendPool.borrowObject(backend.getFeAddress(), 2000);
- System.out.println("get fe " +
backend.getFeAddress() + " client: " + client);
+ client =
ClientPool.frontendPool.borrowObject(address, 2000);
client.finishTask(finishTaskRequest);
+ ok = true;
} catch (Exception e) {
e.printStackTrace();
+ } finally {
+ if (ok) {
+ ClientPool.frontendPool.returnObject(address,
client);
+ } else {
+
ClientPool.frontendPool.invalidateObject(address, client);
+ }
}
}
}
- private void handleClone(TAgentTaskRequest request,
TFinishTaskRequest finishTaskRequest) {
+ private void handleDropTablet(TAgentTaskRequest request,
TFinishTaskRequest finishTaskRequest) {
+ TDropTabletReq req = request.getDropTabletReq();
+ long dataSize = Math.max(1,
CatalogTestUtil.getTabletDataSize(req.tablet_id));
+ DiskInfo diskInfo = getDisk(-1);
+ if (diskInfo != null) {
+ diskInfo.setDataUsedCapacityB(Math.max(0L,
+ diskInfo.getDataUsedCapacityB() -
dataSize));
+
diskInfo.setAvailableCapacityB(Math.min(diskInfo.getTotalCapacityB(),
+ diskInfo.getAvailableCapacityB() +
dataSize));
+ }
+ }
+
+ private void handleCloneTablet(TAgentTaskRequest request,
TFinishTaskRequest finishTaskRequest) {
TCloneReq req = request.getCloneReq();
+ long dataSize = Math.max(1,
CatalogTestUtil.getTabletDataSize(req.tablet_id));
+ long pathHash = req.dest_path_hash;
+ DiskInfo diskInfo = getDisk(pathHash);
+ if (diskInfo != null) {
+ pathHash = diskInfo.getPathHash();
+
diskInfo.setDataUsedCapacityB(Math.min(diskInfo.getTotalCapacityB(),
+ diskInfo.getDataUsedCapacityB() +
dataSize));
+ diskInfo.setAvailableCapacityB(Math.max(0L,
+ diskInfo.getAvailableCapacityB() -
dataSize));
+ }
+
List<TTabletInfo> tabletInfos = Lists.newArrayList();
TTabletInfo tabletInfo = new TTabletInfo(req.tablet_id,
req.schema_hash, req.committed_version,
- req.committed_version_hash, 1, 1);
+ req.committed_version_hash, 1, dataSize);
tabletInfo.setStorageMedium(req.storage_medium);
- tabletInfo.setPathHash(req.dest_path_hash);
+ tabletInfo.setPathHash(pathHash);
tabletInfo.setUsed(true);
tabletInfos.add(tabletInfo);
finishTaskRequest.setFinishTabletInfos(tabletInfos);
}
+
+ private DiskInfo getDisk(long pathHash) {
+ DiskInfo diskInfo = null;
+ for (DiskInfo tmpDiskInfo :
backendInFe.getDisks().values()) {
+ diskInfo = tmpDiskInfo;
+ if (diskInfo.getPathHash() == pathHash
+ || pathHash == -1L || pathHash == 0) {
+ break;
+ }
+ }
+
+ return diskInfo;
+ }
}).start();
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index 5edf5917a39..9a86b264a62 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -453,9 +453,10 @@ public abstract class TestWithFeService {
int beHttpPort = findValidPort();
// start be
+ MockedBackendFactory.BeThriftService beThriftService = new
DefaultBeThriftServiceImpl();
MockedBackend backend = MockedBackendFactory.createBackend(beHost,
beHeartbeatPort, beThriftPort, beBrpcPort,
beHttpPort, new DefaultHeartbeatServiceImpl(beThriftPort,
beHttpPort, beBrpcPort),
- new DefaultBeThriftServiceImpl(), new
DefaultPBackendServiceImpl());
+ beThriftService, new DefaultPBackendServiceImpl());
backend.setFeAddress(new TNetworkAddress("127.0.0.1", feRpcPort));
backend.start();
@@ -466,6 +467,7 @@ public abstract class TestWithFeService {
diskInfo1.setTotalCapacityB(10L << 30);
diskInfo1.setAvailableCapacityB(5L << 30);
diskInfo1.setDataUsedCapacityB(480000);
+ diskInfo1.setPathHash(be.getId());
Map<String, DiskInfo> disks = Maps.newHashMap();
disks.put(diskInfo1.getRootPath(), diskInfo1);
be.setDisks(ImmutableMap.copyOf(disks));
@@ -473,6 +475,7 @@ public abstract class TestWithFeService {
be.setBePort(beThriftPort);
be.setHttpPort(beHttpPort);
be.setBrpcPort(beBrpcPort);
+ beThriftService.setBackendInFe(be);
Env.getCurrentSystemInfo().addBackend(be);
return be;
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
index 9f24461dbfc..a5475a3e5aa 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
@@ -280,9 +280,10 @@ public class UtFrameUtils {
int beHttpPort = findValidPort();
// start be
+ MockedBackendFactory.BeThriftService beThriftService = new
DefaultBeThriftServiceImpl();
MockedBackend backend = MockedBackendFactory.createBackend(beHost,
beHeartbeatPort, beThriftPort, beBrpcPort,
beHttpPort, new DefaultHeartbeatServiceImpl(beThriftPort,
beHttpPort, beBrpcPort),
- new DefaultBeThriftServiceImpl(), new
DefaultPBackendServiceImpl());
+ beThriftService, new DefaultPBackendServiceImpl());
backend.setFeAddress(new TNetworkAddress("127.0.0.1", feRpcPort));
backend.start();
@@ -293,13 +294,16 @@ public class UtFrameUtils {
diskInfo1.setTotalCapacityB(10L << 30);
diskInfo1.setAvailableCapacityB(5L << 30);
diskInfo1.setDataUsedCapacityB(480000);
+ diskInfo1.setPathHash(be.getId());
disks.put(diskInfo1.getRootPath(), diskInfo1);
be.setDisks(ImmutableMap.copyOf(disks));
be.setAlive(true);
be.setBePort(beThriftPort);
be.setHttpPort(beHttpPort);
be.setBrpcPort(beBrpcPort);
+ beThriftService.setBackendInFe(be);
Env.getCurrentSystemInfo().addBackend(be);
+
return be;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]