This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 0c23915f6d2 [improvement](tablet schedule) colocate balance between 
all groups #23543 (#26229)
0c23915f6d2 is described below

commit 0c23915f6d2d0bf7e8e4cc8fbd054b9abfb08758
Author: yujun <[email protected]>
AuthorDate: Wed Nov 1 22:06:43 2023 +0800

    [improvement](tablet schedule) colocate balance between all groups #23543 
(#26229)
---
 .../main/java/org/apache/doris/common/Config.java  |   4 +
 .../clone/ColocateTableCheckerAndBalancer.java     | 521 +++++++++++++++++++--
 .../org/apache/doris/clone/TabletScheduler.java    |   2 +-
 .../org/apache/doris/catalog/CatalogTestUtil.java  |  44 ++
 .../org/apache/doris/clone/BalanceStatistic.java   | 102 ++++
 .../ColocateTableCheckerAndBalancerPerfTest.java   | 196 ++++++++
 .../clone/ColocateTableCheckerAndBalancerTest.java | 112 ++++-
 .../org/apache/doris/clone/RebalancerTestUtil.java |  48 +-
 .../apache/doris/utframe/MockedBackendFactory.java |  74 ++-
 .../apache/doris/utframe/TestWithFeService.java    |   5 +-
 .../org/apache/doris/utframe/UtFrameUtils.java     |   6 +-
 11 files changed, 1049 insertions(+), 65 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index bf499ef0af4..b5f7b0ce92b 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -723,6 +723,10 @@ public class Config extends ConfigBase {
      */
     @ConfField(mutable = true, masterOnly = true) public static boolean 
disable_colocate_balance = false;
 
+    @ConfField(mutable = true, masterOnly = true, description = 
{"是否启用group间的均衡",
+            "is allow colocate balance between all groups"})
+    public static boolean disable_colocate_balance_between_groups = false;
+
     /**
      * The default user resource publishing timeout.
      */
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
index 5c18c2bd468..e70ec445cd6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/clone/ColocateTableCheckerAndBalancer.java
@@ -49,6 +49,7 @@ import com.google.common.collect.Sets;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -79,6 +80,248 @@ public class ColocateTableCheckerAndBalancer extends 
MasterDaemon {
         return INSTANCE;
     }
 
+    public static class BucketStatistic {
+        public int tabletOrderIdx;
+        public int totalReplicaNum;
+        public long totalReplicaDataSize;
+
+        public BucketStatistic(int tabletOrderIdx, int totalReplicaNum, long 
totalReplicaDataSize) {
+            this.tabletOrderIdx = tabletOrderIdx;
+            this.totalReplicaNum = totalReplicaNum;
+            this.totalReplicaDataSize = totalReplicaDataSize;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (!(obj instanceof BucketStatistic)) {
+                return false;
+            }
+
+            BucketStatistic other = (BucketStatistic) obj;
+            return tabletOrderIdx == other.tabletOrderIdx && totalReplicaNum 
== other.totalReplicaNum
+                    && totalReplicaDataSize == other.totalReplicaDataSize;
+        }
+
+        @Override
+        public String toString() {
+            return "{ orderIdx: " + tabletOrderIdx + ", total replica num: " + 
totalReplicaNum
+                    + ", total data size: " + totalReplicaDataSize + " }";
+        }
+    }
+
+    public static class BackendBuckets {
+        private long beId;
+        private Map<GroupId, List<Integer>>  groupTabletOrderIndices = 
Maps.newHashMap();
+
+        public BackendBuckets(long beId) {
+            this.beId = beId;
+        }
+
+        // for test
+        public Map<GroupId, List<Integer>> getGroupTabletOrderIndices() {
+            return groupTabletOrderIndices;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (!(obj instanceof BackendBuckets)) {
+                return false;
+            }
+
+            BackendBuckets other = (BackendBuckets) obj;
+            return beId == other.beId && 
groupTabletOrderIndices.equals(other.groupTabletOrderIndices);
+        }
+
+        @Override
+        public String toString() {
+            return "{ backendId: " + beId + ", group order index: " + 
groupTabletOrderIndices + " }";
+        }
+
+        public void addGroupTablet(GroupId groupId, int tabletOrderIdx) {
+            List<Integer> indices = groupTabletOrderIndices.get(groupId);
+            if (indices == null) {
+                indices = Lists.newArrayList();
+                groupTabletOrderIndices.put(groupId, indices);
+            }
+            indices.add(tabletOrderIdx);
+        }
+
+        public void removeGroupTablet(GroupId groupId, int tabletOrderIdx) {
+            List<Integer> indices = groupTabletOrderIndices.get(groupId);
+            if (indices == null) {
+                return;
+            }
+
+            indices.remove(Integer.valueOf(tabletOrderIdx));
+            if (indices.isEmpty()) {
+                groupTabletOrderIndices.remove(groupId);
+            }
+        }
+
+        public boolean containsGroupTablet(GroupId groupId, int 
tabletOrderIdx) {
+            List<Integer> indices = groupTabletOrderIndices.get(groupId);
+            if (indices == null) {
+                return false;
+            }
+
+            return indices.indexOf(Integer.valueOf(tabletOrderIdx)) >= 0;
+        }
+
+        public int getTotalReplicaNum(Map<GroupId, List<BucketStatistic>> 
allGroupBucketsMap) {
+            int totalReplicaNum = 0;
+            for (Map.Entry<GroupId, List<Integer>> entry : 
groupTabletOrderIndices.entrySet()) {
+                List<BucketStatistic> bucketStatistics = 
allGroupBucketsMap.get(entry.getKey());
+                if (bucketStatistics != null) {
+                    for (int tabletOrderIdx : entry.getValue()) {
+                        if (tabletOrderIdx < bucketStatistics.size()) {
+                            totalReplicaNum += 
bucketStatistics.get(tabletOrderIdx).totalReplicaNum;
+                        }
+                    }
+                }
+            }
+
+            return totalReplicaNum;
+        }
+
+        public long getTotalReplicaDataSize(Map<GroupId, 
List<BucketStatistic>> allGroupBucketsMap) {
+            long totalReplicaDataSize = 0;
+            for (Map.Entry<GroupId, List<Integer>> entry : 
groupTabletOrderIndices.entrySet()) {
+                List<BucketStatistic> bucketStatistics = 
allGroupBucketsMap.get(entry.getKey());
+                if (bucketStatistics != null) {
+                    for (int tabletOrderIdx : entry.getValue()) {
+                        if (tabletOrderIdx < bucketStatistics.size()) {
+                            totalReplicaDataSize += 
bucketStatistics.get(tabletOrderIdx).totalReplicaDataSize;
+                        }
+                    }
+                }
+            }
+
+            return totalReplicaDataSize;
+        }
+
+        public int getTotalBucketsNum() {
+            return groupTabletOrderIndices.values().stream().mapToInt(indices 
-> indices.size()).sum();
+        }
+
+        public int getGroupBucketsNum(GroupId groupId) {
+            List<Integer> indices = groupTabletOrderIndices.get(groupId);
+            if (indices == null) {
+                return 0;
+            } else {
+                return indices.size();
+            }
+        }
+    }
+
+    public static class GlobalColocateStatistic {
+        private Map<Long, BackendBuckets> backendBucketsMap = 
Maps.newHashMap();
+        private Map<GroupId, List<BucketStatistic>> allGroupBucketsMap = 
Maps.newHashMap();
+        private Map<Tag, Integer> allTagBucketNum = Maps.newHashMap();
+        private static final BackendBuckets DUMMY_BE = new BackendBuckets(0);
+
+        public GlobalColocateStatistic() {
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (!(obj instanceof GlobalColocateStatistic)) {
+                return false;
+            }
+
+            GlobalColocateStatistic other = (GlobalColocateStatistic) obj;
+            return backendBucketsMap.equals(other.backendBucketsMap)
+                    && allGroupBucketsMap.equals(other.allGroupBucketsMap)
+                    && allTagBucketNum.equals(other.allTagBucketNum);
+        }
+
+        @Override
+        public String toString() {
+            return "{ backends: " + backendBucketsMap + ", groups: " + 
allGroupBucketsMap
+                    + ", tag bucket num: " + allTagBucketNum + " }";
+        }
+
+        Map<Long, BackendBuckets> getBackendBucketsMap() {
+            return backendBucketsMap;
+        }
+
+        Map<GroupId, List<BucketStatistic>> getAllGroupBucketsMap() {
+            return allGroupBucketsMap;
+        }
+
+        Map<Tag, Integer> getAllTagBucketNum() {
+            return allTagBucketNum;
+        }
+
+        public boolean moveTablet(GroupId groupId, int tabletOrderIdx,
+                long srcBeId, long destBeId) {
+            BackendBuckets srcBackendBuckets = backendBucketsMap.get(srcBeId);
+            if (srcBackendBuckets == null || 
!srcBackendBuckets.containsGroupTablet(groupId, tabletOrderIdx)) {
+                return false;
+            }
+
+            BackendBuckets destBackendBuckets = 
backendBucketsMap.get(destBeId);
+            if (destBackendBuckets == null) {
+                destBackendBuckets = new BackendBuckets(destBeId);
+                backendBucketsMap.put(destBeId, destBackendBuckets);
+            }
+            if (destBackendBuckets.containsGroupTablet(groupId, 
tabletOrderIdx)) {
+                return false;
+            }
+
+            srcBackendBuckets.removeGroupTablet(groupId, tabletOrderIdx);
+            destBackendBuckets.addGroupTablet(groupId, tabletOrderIdx);
+            if (srcBackendBuckets.getTotalBucketsNum() == 0) {
+                backendBucketsMap.remove(srcBeId);
+            }
+
+            return true;
+        }
+
+        public int getBackendTotalBucketNum(long backendId) {
+            return backendBucketsMap.getOrDefault(backendId, 
DUMMY_BE).getTotalBucketsNum();
+        }
+
+        public long getBackendTotalReplicaDataSize(long backendId) {
+            return backendBucketsMap.getOrDefault(backendId, DUMMY_BE)
+                    .getTotalReplicaDataSize(allGroupBucketsMap);
+        }
+
+        public long getBucketTotalReplicaDataSize(GroupId groupId, int 
tabletOrderIdx) {
+            List<BucketStatistic> bucketStatistics = 
allGroupBucketsMap.get(groupId);
+            if (bucketStatistics != null && tabletOrderIdx < 
bucketStatistics.size()) {
+                return 
bucketStatistics.get(tabletOrderIdx).totalReplicaDataSize;
+            } else {
+                return 0L;
+            }
+        }
+
+        public void addGroup(GroupId groupId, ReplicaAllocation replicaAlloc, 
List<Set<Long>> backendBucketsSeq,
+                List<Long> totalReplicaDataSizes, int 
totalReplicaNumPerBucket) {
+            Preconditions.checkState(backendBucketsSeq.size() == 
totalReplicaDataSizes.size(),
+                    backendBucketsSeq.size() + " vs. " + 
totalReplicaDataSizes.size());
+            List<BucketStatistic> bucketStatistics = Lists.newArrayList();
+            for (int tabletOrderIdx = 0; tabletOrderIdx < 
backendBucketsSeq.size(); tabletOrderIdx++) {
+                BucketStatistic bucket = new BucketStatistic(tabletOrderIdx, 
totalReplicaNumPerBucket,
+                        totalReplicaDataSizes.get(tabletOrderIdx));
+                bucketStatistics.add(bucket);
+                for (long backendId : backendBucketsSeq.get(tabletOrderIdx)) {
+                    BackendBuckets backendBuckets = 
backendBucketsMap.get(backendId);
+                    if (backendBuckets == null) {
+                        backendBuckets = new BackendBuckets(backendId);
+                        backendBucketsMap.put(backendId, backendBuckets);
+                    }
+                    backendBuckets.addGroupTablet(groupId, tabletOrderIdx);
+                }
+            }
+            int bucketNum = backendBucketsSeq.size();
+            replicaAlloc.getAllocMap().forEach((tag, count) -> {
+                allTagBucketNum.put(tag, allTagBucketNum.getOrDefault(tag, 0) 
+ bucketNum * count);
+            });
+            allGroupBucketsMap.put(groupId, bucketStatistics);
+        }
+
+    }
+
     @Override
     /*
      * Each round, we do 2 steps:
@@ -92,8 +335,8 @@ public class ColocateTableCheckerAndBalancer extends 
MasterDaemon {
      *      Otherwise, mark the group as stable
      */
     protected void runAfterCatalogReady() {
-        relocateAndBalanceGroup();
-        matchGroup();
+        relocateAndBalanceGroups();
+        matchGroups();
     }
 
     /*
@@ -134,17 +377,32 @@ public class ColocateTableCheckerAndBalancer extends 
MasterDaemon {
      * +-+  +-+  +-+  +-+
      *  A    B    C    D
      */
-    private void relocateAndBalanceGroup() {
+    private void relocateAndBalanceGroups() {
+        Set<GroupId> groupIds = 
Sets.newHashSet(Env.getCurrentEnv().getColocateTableIndex().getAllGroupIds());
+
+        // balance only inside each group, excluding balance between all groups
+        Set<GroupId> changeGroups = relocateAndBalanceGroup(groupIds, false);
+
+        if (!Config.disable_colocate_balance_between_groups
+                && !changeGroups.isEmpty()) {
+            // balance both inside each group and between all groups
+            relocateAndBalanceGroup(changeGroups, true);
+        }
+    }
+
+    private Set<GroupId> relocateAndBalanceGroup(Set<GroupId> groupIds, 
boolean balanceBetweenGroups) {
+        Set<GroupId> changeGroups = Sets.newHashSet();
         if (Config.disable_colocate_balance) {
-            return;
+            return changeGroups;
         }
 
         Env env = Env.getCurrentEnv();
         ColocateTableIndex colocateIndex = env.getColocateTableIndex();
         SystemInfoService infoService = Env.getCurrentSystemInfo();
 
+        GlobalColocateStatistic globalColocateStatistic = 
buildGlobalColocateStatistic();
+
         // get all groups
-        Set<GroupId> groupIds = colocateIndex.getAllGroupIds();
         for (GroupId groupId : groupIds) {
             Map<Tag, LoadStatisticForTag> statisticMap = 
env.getTabletScheduler().getStatisticMap();
             if (statisticMap == null) {
@@ -182,8 +440,10 @@ public class ColocateTableCheckerAndBalancer extends 
MasterDaemon {
                 // try relocate or balance this group for specified tag
                 List<List<Long>> balancedBackendsPerBucketSeq = 
Lists.newArrayList();
                 if (relocateAndBalance(groupId, tag, unavailableBeIdsInGroup, 
availableBeIds, colocateIndex,
-                        infoService, statistic, balancedBackendsPerBucketSeq)) 
{
+                        infoService, statistic, globalColocateStatistic, 
balancedBackendsPerBucketSeq,
+                        balanceBetweenGroups)) {
                     colocateIndex.addBackendsPerBucketSeqByTag(groupId, tag, 
balancedBackendsPerBucketSeq);
+                    changeGroups.add(groupId);
                     Map<Tag, List<List<Long>>> balancedBackendsPerBucketSeqMap 
= Maps.newHashMap();
                     balancedBackendsPerBucketSeqMap.put(tag, 
balancedBackendsPerBucketSeq);
                     ColocatePersistInfo info = ColocatePersistInfo
@@ -194,6 +454,8 @@ public class ColocateTableCheckerAndBalancer extends 
MasterDaemon {
                 }
             }
         }
+
+        return changeGroups;
     }
 
     /*
@@ -201,7 +463,7 @@ public class ColocateTableCheckerAndBalancer extends 
MasterDaemon {
      * replicas, and mark that group as unstable.
      * If every replicas match the backends in group, mark that group as 
stable.
      */
-    private void matchGroup() {
+    private void matchGroups() {
         long start = System.currentTimeMillis();
         CheckerCounter counter = new CheckerCounter();
 
@@ -311,6 +573,83 @@ public class ColocateTableCheckerAndBalancer extends 
MasterDaemon {
                 counter.tabletInScheduler, counter.tabletNotReady, cost);
     }
 
+    private GlobalColocateStatistic buildGlobalColocateStatistic() {
+        Env env = Env.getCurrentEnv();
+        ColocateTableIndex colocateIndex = env.getColocateTableIndex();
+        GlobalColocateStatistic globalColocateStatistic = new 
GlobalColocateStatistic();
+
+        Set<GroupId> groupIds = colocateIndex.getAllGroupIds();
+        for (GroupId groupId : groupIds) {
+            ColocateGroupSchema groupSchema = 
colocateIndex.getGroupSchema(groupId);
+            if (groupSchema == null) {
+                continue;
+            }
+            ReplicaAllocation replicaAlloc = groupSchema.getReplicaAlloc();
+            List<Long> tableIds = colocateIndex.getAllTableIds(groupId);
+            List<Set<Long>> backendBucketsSeq = 
colocateIndex.getBackendsPerBucketSeqSet(groupId);
+            if (backendBucketsSeq.isEmpty()) {
+                continue;
+            }
+
+            int totalReplicaNumPerBucket = 0;
+            ArrayList<Long> totalReplicaDataSizes = Lists.newArrayList();
+            for (int i = 0; i < backendBucketsSeq.size(); i++) {
+                totalReplicaDataSizes.add(0L);
+            }
+
+            for (Long tableId : tableIds) {
+                long dbId = groupId.dbId;
+                if (dbId == 0) {
+                    dbId = groupId.getDbIdByTblId(tableId);
+                }
+                Database db = env.getInternalCatalog().getDbNullable(dbId);
+                if (db == null) {
+                    continue;
+                }
+                OlapTable olapTable = (OlapTable) db.getTableNullable(tableId);
+                if (olapTable == null || 
!colocateIndex.isColocateTable(olapTable.getId())) {
+                    continue;
+                }
+
+                olapTable.readLock();
+                try {
+                    for (Partition partition : olapTable.getPartitions()) {
+                        short replicationNum = 
replicaAlloc.getTotalReplicaNum();
+
+                        // Here we only get VISIBLE indexes. All other indexes 
are not queryable.
+                        // So it does not matter if tablets of other indexes 
are not matched.
+
+                        for (MaterializedIndex index : 
partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
+                            Preconditions.checkState(backendBucketsSeq.size() 
== index.getTablets().size(),
+                                    backendBucketsSeq.size() + " vs. " + 
index.getTablets().size());
+                            int tabletOrderIdx = 0;
+                            totalReplicaNumPerBucket++;
+                            for (Long tabletId : index.getTabletIdsInOrder()) {
+                                Set<Long> bucketsSeq = 
backendBucketsSeq.get(tabletOrderIdx);
+                                Preconditions.checkState(bucketsSeq.size() == 
replicationNum,
+                                        bucketsSeq.size() + " vs. " + 
replicationNum);
+                                Tablet tablet = index.getTablet(tabletId);
+                                totalReplicaDataSizes.set(tabletOrderIdx,
+                                        
totalReplicaDataSizes.get(tabletOrderIdx) + tablet.getDataSize(true));
+                                tabletOrderIdx++;
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    LOG.warn("build group {} colocate statistic error", 
groupId, e);
+                    continue;
+                } finally {
+                    olapTable.readUnlock();
+                }
+            }
+
+            globalColocateStatistic.addGroup(groupId, replicaAlloc, 
backendBucketsSeq, totalReplicaDataSizes,
+                    totalReplicaNumPerBucket);
+        }
+
+        return globalColocateStatistic;
+    }
+
     /*
      * Each balance is performed for a single workload group in a colocate 
group.
      * For example, if the replica allocation of a colocate group is {TagA: 2, 
TagB: 1},
@@ -369,8 +708,9 @@ public class ColocateTableCheckerAndBalancer extends 
MasterDaemon {
      *  Return false if nothing changed.
      */
     private boolean relocateAndBalance(GroupId groupId, Tag tag, Set<Long> 
unavailableBeIds, List<Long> availableBeIds,
-            ColocateTableIndex colocateIndex, SystemInfoService infoService,
-            LoadStatisticForTag statistic, List<List<Long>> 
balancedBackendsPerBucketSeq) {
+            ColocateTableIndex colocateIndex, SystemInfoService infoService, 
LoadStatisticForTag statistic,
+            GlobalColocateStatistic globalColocateStatistic, List<List<Long>> 
balancedBackendsPerBucketSeq,
+            boolean balanceBetweenGroups) {
         ColocateGroupSchema groupSchema = 
colocateIndex.getGroupSchema(groupId);
         short replicaNum = 
groupSchema.getReplicaAlloc().getReplicaNumByTag(tag);
         List<List<Long>> backendsPerBucketSeq = Lists.newArrayList(
@@ -379,7 +719,16 @@ public class ColocateTableCheckerAndBalancer extends 
MasterDaemon {
         List<Long> flatBackendsPerBucketSeq = backendsPerBucketSeq.stream()
                 .flatMap(List::stream).collect(Collectors.toList());
 
+        int tagTotalBucketNum = 
globalColocateStatistic.getAllTagBucketNum().getOrDefault(tag, 0);
+        int availableBeNum = availableBeIds.size();
+        int highTotalBucketNumPerBe = availableBeNum == 0 ? 0 :
+                (tagTotalBucketNum + availableBeNum - 1) / availableBeNum;
+        int lowTotalBucketNumPerBe = availableBeNum == 0 ? 0 : 
tagTotalBucketNum / availableBeNum;
+
         boolean isChanged = false;
+        int times = 0;
+        List<RootPathLoadStatistic> resultPaths = Lists.newArrayList();
+
         OUT:
         while (true) {
             // update backends and hosts at each round
@@ -390,27 +739,34 @@ public class ColocateTableCheckerAndBalancer extends 
MasterDaemon {
                 return false;
             }
             Preconditions.checkState(backendsPerBucketSeq.size() == 
hostsPerBucketSeq.size());
+            times++;
+            if (times > 10 * backendsPerBucketSeq.size()) {
+                LOG.warn("iterate too many times for relocate group: {}, 
times: {}, bucket num: {}",
+                        groupId, times, backendsPerBucketSeq.size());
+                break;
+            }
 
             long srcBeId = -1;
             List<Integer> seqIndexes = null;
-            boolean hasUnavailableBe = false;
+            boolean srcBeUnavailable = false;
             // first choose the unavailable be as src be
             for (Long beId : unavailableBeIds) {
                 seqIndexes = getBeSeqIndexes(flatBackendsPerBucketSeq, beId);
                 if (!seqIndexes.isEmpty()) {
                     srcBeId = beId;
-                    hasUnavailableBe = true;
+                    srcBeUnavailable = true;
                     LOG.info("find unavailable backend {} in colocate group: 
{}", beId, groupId);
                     break;
                 }
             }
+
             // sort backends with replica num in desc order
             List<Map.Entry<Long, Long>> backendWithReplicaNum =
-                    getSortedBackendReplicaNumPairs(availableBeIds,
-                            unavailableBeIds, statistic, 
flatBackendsPerBucketSeq);
+                    getSortedBackendReplicaNumPairs(availableBeIds, 
unavailableBeIds, statistic,
+                            globalColocateStatistic, flatBackendsPerBucketSeq);
 
             // if there is only one available backend and no unavailable 
bucketId to relocate, end the outer loop
-            if (backendWithReplicaNum.size() <= 1 && !hasUnavailableBe) {
+            if (backendWithReplicaNum.size() <= 1 && !srcBeUnavailable) {
                 break;
             }
 
@@ -426,12 +782,40 @@ public class ColocateTableCheckerAndBalancer extends 
MasterDaemon {
                 // we try to use a low backend to replace the src backend.
                 // if replace failed(eg: both backends are on some host), 
select next low backend and try(j--)
                 Map.Entry<Long, Long> lowBackend = 
backendWithReplicaNum.get(j);
-                if ((!hasUnavailableBe) && (seqIndexes.size() - 
lowBackend.getValue()) <= 1) {
-                    // balanced
-                    break OUT;
+                long destBeId = lowBackend.getKey();
+                if (!srcBeUnavailable) {
+                    long diffThisGroup = seqIndexes.size() - 
lowBackend.getValue();
+                    if (diffThisGroup < 1) {
+                        // balanced
+                        break OUT;
+                    }
+
+                    // src's group bucket num = dest's group bucket num + 1
+                    // if we move a bucket of this group from src to dest, dest will 
have one more bucket of this group than src.
+                    // check global view
+                    //
+                    // suppose bucket num = 3, three BE A/B/C,  two group 
group1/group2, then we have:
+                    //
+                    // A [ group1:bucket0,  group2:bucket0]
+                    // B [ group1:bucket1,  group2:bucket1]
+                    // C [ group1:bucket2,  group2:bucket2]
+                    //
+                    // if we add a new BE D, for each group: 
bucketNum(A)=bucketNum(B)=bucketNum(C)=1,  bucketNum(D)=0
+                    // so each group is balanced, but in the global view of all groups, 
it's not balanced.
+                    // we should move one of the buckets to D
+                    if (diffThisGroup == 1) {
+                        if (!balanceBetweenGroups) {
+                            break OUT;
+                        }
+                        int srcTotalBucketNum = 
globalColocateStatistic.getBackendTotalBucketNum(srcBeId);
+                        int destTotalBucketNum = 
globalColocateStatistic.getBackendTotalBucketNum(destBeId);
+                        if (srcTotalBucketNum <= highTotalBucketNumPerBe
+                                || destTotalBucketNum >= 
lowTotalBucketNumPerBe) {
+                            continue;
+                        }
+                    }
                 }
 
-                long destBeId = lowBackend.getKey();
                 Backend destBe = infoService.getBackend(destBeId);
                 if (destBe == null) {
                     LOG.info("backend {} does not exist", destBeId);
@@ -454,6 +838,14 @@ public class ColocateTableCheckerAndBalancer extends 
MasterDaemon {
                     continue;
                 }
 
+                BackendLoadStatistic beStat = 
statistic.getBackendLoadStatistic(destBeId);
+                if (beStat == null) {
+                    LOG.warn("not found backend {} statistic", destBeId);
+                    continue;
+                }
+
+                int targetSeqIndex = -1;
+                long minDataSizeDiff = Long.MAX_VALUE;
                 for (int seqIndex : seqIndexes) {
                     // the bucket index.
                     // eg: 0 / 3 = 0, so that the bucket index of the 4th 
backend id in flatBackendsPerBucketSeq is 0.
@@ -461,26 +853,62 @@ public class ColocateTableCheckerAndBalancer extends 
MasterDaemon {
                     List<Long> backendsSet = 
backendsPerBucketSeq.get(bucketIndex);
                     List<String> hostsSet = hostsPerBucketSeq.get(bucketIndex);
                     // the replicas of a tablet can not locate in same Backend 
or same host
-                    if (!backendsSet.contains(destBeId) && 
!hostsSet.contains(destBe.getHost())) {
-                        
Preconditions.checkState(backendsSet.contains(srcBeId), srcBeId);
-                        flatBackendsPerBucketSeq.set(seqIndex, destBeId);
-                        LOG.info("replace backend {} with backend {} in 
colocate group {}, idx: {}",
-                                srcBeId, destBeId, groupId, seqIndex);
-                        // just replace one backend at a time, src and dest BE 
id should be recalculated because
-                        // flatBackendsPerBucketSeq is changed.
-                        isChanged = true;
-                        isThisRoundChanged = true;
-                        break;
+                    if (backendsSet.contains(destBeId) || 
hostsSet.contains(destBe.getHost())) {
+                        continue;
+                    }
+
+                    Preconditions.checkState(backendsSet.contains(srcBeId), 
srcBeId);
+                    long bucketDataSize =
+                            
globalColocateStatistic.getBucketTotalReplicaDataSize(groupId, bucketIndex);
+
+                    resultPaths.clear();
+                    BalanceStatus st = beStat.isFit(bucketDataSize, null, 
resultPaths, true);
+                    if (!st.ok()) {
+                        LOG.debug("backend {} is unable to fit in group {}, 
tablet order idx {}, data size {}",
+                                destBeId, groupId, bucketIndex, 
bucketDataSize);
+                        continue;
+                    }
+
+                    long newSrcBeTotalReplicaDataSize = 
globalColocateStatistic.getBackendTotalReplicaDataSize(srcBeId)
+                            - bucketDataSize;
+                    long newDestBeTotalReplicaDataSize =
+                            
globalColocateStatistic.getBackendTotalReplicaDataSize(destBeId) + 
bucketDataSize;
+                    long dataSizeDiff = Math.abs(newSrcBeTotalReplicaDataSize 
- newDestBeTotalReplicaDataSize);
+                    if (targetSeqIndex < 0 || dataSizeDiff < minDataSizeDiff) {
+                        targetSeqIndex = seqIndex;
+                        minDataSizeDiff = dataSizeDiff;
                     }
                 }
 
-                if (isThisRoundChanged) {
-                    // we found a change
-                    break;
+                if (targetSeqIndex < 0) {
+                    // we use next node as dst node
+                    LOG.info("unable to replace backend {} with backend {} in 
colocate group {}",
+                            srcBeId, destBeId, groupId);
+                    continue;
                 }
-                // we use next node as dst node
-                LOG.info("unable to replace backend {} with backend {} in 
colocate group {}",
-                        srcBeId, destBeId, groupId);
+
+                int tabletOrderIdx = targetSeqIndex / replicaNum;
+                int oldSrcThisGroup = seqIndexes.size();
+                long oldDestThisGroup = lowBackend.getValue();
+                int oldSrcBucketNum = 
globalColocateStatistic.getBackendTotalBucketNum(srcBeId);
+                int oldDestBucketNum = 
globalColocateStatistic.getBackendTotalBucketNum(destBeId);
+                LOG.debug("OneMove: group {}, src {}, this group {}, all group 
{}, dest {}, this group {}, "
+                        + "all group {}", groupId, srcBeId, oldSrcThisGroup, 
oldSrcBucketNum, destBeId,
+                        oldDestThisGroup, oldDestBucketNum);
+                Preconditions.checkState(
+                        globalColocateStatistic.moveTablet(groupId, 
tabletOrderIdx, srcBeId, destBeId));
+                Preconditions.checkState(oldSrcBucketNum - 1
+                        == 
globalColocateStatistic.getBackendTotalBucketNum(srcBeId));
+                Preconditions.checkState(oldDestBucketNum + 1
+                        == 
globalColocateStatistic.getBackendTotalBucketNum(destBeId));
+                flatBackendsPerBucketSeq.set(targetSeqIndex, destBeId);
+                // just replace one backend at a time, src and dest BE id 
should be recalculated because
+                // flatBackendsPerBucketSeq is changed.
+                isChanged = true;
+                isThisRoundChanged = true;
+                LOG.info("replace backend {} with backend {} in colocate group 
{}, idx: {}",
+                        srcBeId, destBeId, groupId, targetSeqIndex);
+                break;
             }
 
             if (!isThisRoundChanged) {
@@ -522,7 +950,9 @@ public class ColocateTableCheckerAndBalancer extends 
MasterDaemon {
     }
 
     private List<Map.Entry<Long, Long>> 
getSortedBackendReplicaNumPairs(List<Long> allAvailBackendIds,
-            Set<Long> unavailBackendIds, LoadStatisticForTag statistic, 
List<Long> flatBackendsPerBucketSeq) {
+            Set<Long> unavailBackendIds, LoadStatisticForTag statistic,
+            GlobalColocateStatistic globalColocateStatistic,
+            List<Long> flatBackendsPerBucketSeq) {
         // backend id -> replica num, and sorted by replica num, descending.
         Map<Long, Long> backendToReplicaNum = flatBackendsPerBucketSeq.stream()
                 .collect(Collectors.groupingBy(Function.identity(), 
Collectors.counting()));
@@ -544,20 +974,27 @@ public class ColocateTableCheckerAndBalancer extends 
MasterDaemon {
                     if (!entry1.getValue().equals(entry2.getValue())) {
                         return (int) (entry2.getValue() - entry1.getValue());
                     }
+
+                    // Since Java 7, a sort comparator must be consistent: 
antisymmetric and transitive.
+                    // Otherwise sorting may raise exception "Comparison method 
violates its general contract".
+
                     BackendLoadStatistic beStat1 = 
statistic.getBackendLoadStatistic(entry1.getKey());
                     BackendLoadStatistic beStat2 = 
statistic.getBackendLoadStatistic(entry2.getKey());
                     if (beStat1 == null || beStat2 == null) {
-                        return 0;
+                        if (beStat1 == null && beStat2 == null) {
+                            return 0;
+                        } else {
+                            return beStat1 == null ? 1 : -1;
+                        }
                     }
                     double loadScore1 = beStat1.getMixLoadScore();
                     double loadScore2 = beStat2.getMixLoadScore();
-                    if (Math.abs(loadScore1 - loadScore2) < 1e-6) {
-                        return 0;
-                    } else if (loadScore2 > loadScore1) {
-                        return 1;
-                    } else {
-                        return -1;
+                    int cmp = Double.compare(loadScore2, loadScore1);
+                    if (cmp != 0) {
+                        return cmp;
                     }
+
+                    return Long.compare(entry1.getKey(), entry2.getKey());
                 })
                 .collect(Collectors.toList());
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 90d136715ce..f2542243a75 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -1142,7 +1142,7 @@ public class TabletScheduler extends MasterDaemon {
         // it will also delete replica from tablet inverted index.
         tabletCtx.deleteReplica(replica);
 
-        if (force) {
+        if (force || FeConstants.runningUnitTest) {
             // send the delete replica task.
             // also, this may not be necessary, but delete it will make things 
simpler.
             // NOTICE: only delete the replica from meta may not work. 
sometimes we can depend on tablet report
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java 
b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
index 98b59129362..a05c63b812f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
@@ -330,4 +330,48 @@ public class CatalogTestUtil {
         backend.setAlive(true);
         return backend;
     }
+
+    public static long getTabletDataSize(long tabletId) {
+        Env env = Env.getCurrentEnv();
+        TabletInvertedIndex invertedIndex = env.getTabletInvertedIndex();
+        TabletMeta tabletMeta = invertedIndex.getTabletMeta(tabletId);
+        if (tabletMeta == null) {
+            return -1L;
+        }
+
+        long dbId = tabletMeta.getDbId();
+        long tableId = tabletMeta.getTableId();
+        long partitionId = tabletMeta.getPartitionId();
+        long indexId = tabletMeta.getIndexId();
+        Database db = env.getInternalCatalog().getDbNullable(dbId);
+        if (db == null) {
+            return -1L;
+        }
+        Table table = db.getTableNullable(tableId);
+        if (table == null) {
+            return -1L;
+        }
+        if (table.getType() != Table.TableType.OLAP) {
+            return -1L;
+        }
+        OlapTable olapTable = (OlapTable) table;
+        olapTable.readLock();
+        try {
+            Partition partition = olapTable.getPartition(partitionId);
+            if (partition == null) {
+                return -1L;
+            }
+            MaterializedIndex materializedIndex = partition.getIndex(indexId);
+            if (materializedIndex == null) {
+                return -1L;
+            }
+            Tablet tablet = materializedIndex.getTablet(tabletId);
+            if (tablet == null) {
+                return -1L;
+            }
+            return tablet.getDataSize(true);
+        } finally {
+            olapTable.readUnlock();
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/clone/BalanceStatistic.java 
b/fe/fe-core/src/test/java/org/apache/doris/clone/BalanceStatistic.java
new file mode 100644
index 00000000000..596a06f6681
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/BalanceStatistic.java
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.system.Backend;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Table;
+
+import java.util.List;
+import java.util.Map;
+
+public class BalanceStatistic {
+    public Map<Long, Long> backendTotalDataSize;
+    public Map<Long, Integer> backendTotalReplicaNum;
+
+    private BalanceStatistic(Map<Long, Long> backendTotalDataSize,
+            Map<Long, Integer> backendTotalReplicaNum) {
+        this.backendTotalDataSize = backendTotalDataSize;
+        this.backendTotalReplicaNum = backendTotalReplicaNum;
+    }
+
+    public static BalanceStatistic getCurrentBalanceStatistic() {
+        Map<Long, Long> backendTotalDataSize = Maps.newHashMap();
+        Map<Long, Integer> backendTotalReplicaNum = Maps.newHashMap();
+        List<Backend> backends = 
Env.getCurrentSystemInfo().getIdToBackend().values().asList();
+        backends.forEach(be -> {
+            backendTotalDataSize.put(be.getId(), 0L);
+            backendTotalReplicaNum.put(be.getId(), 0);
+        });
+
+        Table<Long, Long, Replica> replicaMetaTable =
+                Env.getCurrentInvertedIndex().getReplicaMetaTable();
+        for (Table.Cell<Long, Long, Replica> cell : 
replicaMetaTable.cellSet()) {
+            long beId = cell.getColumnKey();
+            Replica replica = cell.getValue();
+            backendTotalDataSize.put(beId, backendTotalDataSize.get(beId) + 
replica.getDataSize());
+            backendTotalReplicaNum.put(beId, backendTotalReplicaNum.get(beId) 
+ 1);
+        }
+
+        return new BalanceStatistic(backendTotalDataSize, 
backendTotalReplicaNum);
+    }
+
+    public Map<Long, Long> getBackendTotalDataSize() {
+        return backendTotalDataSize;
+    }
+
+    public Map<Long, Integer> getBackendTotalReplicaNum() {
+        return backendTotalReplicaNum;
+    }
+
+    public long getBeMinTotalDataSize() {
+        return backendTotalDataSize.values().stream().min(Long::compare).get();
+    }
+
+    public long getBeMaxTotalDataSize() {
+        return backendTotalDataSize.values().stream().max(Long::compare).get();
+    }
+
+    public int getBeMinTotalReplicaNum() {
+        return 
backendTotalReplicaNum.values().stream().min(Integer::compare).get();
+    }
+
+    public int getBeMaxTotalReplicaNum() {
+        return 
backendTotalReplicaNum.values().stream().max(Integer::compare).get();
+    }
+
+    public void printToStdout() {
+        int minTotalReplicaNum = getBeMinTotalReplicaNum();
+        int maxTotalReplicaNum = getBeMaxTotalReplicaNum();
+        long minTotalDataSize = getBeMinTotalDataSize();
+        long maxTotalDataSize = getBeMaxTotalDataSize();
+
+        System.out.println("");
+        System.out.println("=== backend min total replica num: " + 
minTotalReplicaNum);
+        System.out.println("=== backend max total replica num: " + 
maxTotalReplicaNum);
+        System.out.println("=== max / min : " + (maxTotalReplicaNum / (double) 
minTotalReplicaNum));
+
+        System.out.println("");
+        System.out.println("=== min total data size: " + minTotalDataSize);
+        System.out.println("=== max total data size: " + maxTotalDataSize);
+        System.out.println("=== max / min : " + (maxTotalDataSize / (double) 
minTotalDataSize));
+    }
+}
+
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerPerfTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerPerfTest.java
new file mode 100644
index 00000000000..56ac9b192b8
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerPerfTest.java
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import org.apache.doris.analysis.CreateDbStmt;
+import org.apache.doris.analysis.CreateTableStmt;
+import org.apache.doris.catalog.ColocateTableIndex;
+import org.apache.doris.catalog.ColocateTableIndex.GroupId;
+import org.apache.doris.catalog.DiskInfo;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.DdlExecutor;
+import org.apache.doris.resource.Tag;
+import org.apache.doris.system.Backend;
+import org.apache.doris.utframe.UtFrameUtils;
+
+import com.google.common.collect.Maps;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+
+public class ColocateTableCheckerAndBalancerPerfTest {
+    private static String runningDir = 
"fe/mocked/ColocateTableCheckerAndBalancerPerfTest/"
+            + UUID.randomUUID().toString() + "/";
+
+    private static ConnectContext connectContext;
+    private static final int TEMP_DISALBE_BE_NUM = 2;
+    private static List<Backend> backends;
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        FeConstants.runningUnitTest = true;
+        FeConstants.enableInternalSchemaDb = false;
+        FeConstants.tablet_checker_interval_ms = 100;
+        FeConstants.tablet_schedule_interval_ms = 100;
+        Config.enable_round_robin_create_tablet = false;
+        Config.disable_balance = true;
+        Config.schedule_batch_size = 400;
+        Config.schedule_slot_num_per_hdd_path = 1000;
+        Config.disable_colocate_balance = true;
+        Config.disable_tablet_scheduler = true;
+        UtFrameUtils.createDorisClusterWithMultiTag(runningDir, 6);
+
+        backends = 
Env.getCurrentSystemInfo().getIdToBackend().values().asList();
+        for (Backend be : backends) {
+            for (DiskInfo diskInfo : be.getDisks().values()) {
+                diskInfo.setTotalCapacityB(10L << 40);
+                diskInfo.setDataUsedCapacityB(1L);
+                diskInfo.setAvailableCapacityB(
+                        diskInfo.getTotalCapacityB() - 
diskInfo.getDataUsedCapacityB());
+            }
+        }
+        Map<String, String> tagMap = Maps.newHashMap();
+        tagMap.put(Tag.TYPE_LOCATION, "zone_a");
+        for (int i = 0; i < TEMP_DISALBE_BE_NUM; i++) {
+            backends.get(i).setTagMap(tagMap);
+        }
+
+        // create connect context
+        connectContext = UtFrameUtils.createDefaultCtx();
+    }
+
+    @AfterClass
+    public static void tearDown() {
+        UtFrameUtils.cleanDorisFeDir(runningDir);
+    }
+
+    @Test
+    public void testRelocateAndBalance() throws Exception {
+
+        Env env = Env.getCurrentEnv();
+        String createDbStmtStr = "create database test;";
+        CreateDbStmt createDbStmt = (CreateDbStmt) 
UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, connectContext);
+        DdlExecutor.execute(env, createDbStmt);
+
+        Random random = new Random();
+        final int groupNum = 100;
+        for (int groupIndex = 0; groupIndex <= groupNum; groupIndex++) {
+            int tableNum = 1 + random.nextInt(10);
+            for (int tableIndex = 0; tableIndex < tableNum; tableIndex++) {
+                String sql = String.format("CREATE TABLE test.table_%s_%s\n"
+                        + "( k1 int, k2 int, v1 int )\n"
+                        + "ENGINE=OLAP\n"
+                        + "UNIQUE KEY (k1,k2)\n"
+                        + "DISTRIBUTED BY HASH(k2) BUCKETS 11\n"
+                        + "PROPERTIES('colocate_with' = 'group_%s');",
+                        groupIndex, tableIndex, groupIndex);
+                CreateTableStmt createTableStmt = (CreateTableStmt) 
UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
+                try {
+                    DdlExecutor.execute(env, createTableStmt);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    throw e;
+                }
+
+                BalanceStatistic beforeBalanceStatistic = 
BalanceStatistic.getCurrentBalanceStatistic();
+                Assert.assertEquals("group: " + groupIndex + ", table: " + 
tableIndex + ", "
+                        + beforeBalanceStatistic.getBackendTotalReplicaNum(),
+                        0, beforeBalanceStatistic.getBeMinTotalReplicaNum());
+            }
+        }
+
+        ColocateTableIndex colocateIndex = env.getColocateTableIndex();
+        Set<GroupId> groupIds = colocateIndex.getAllGroupIds();
+
+        RebalancerTestUtil.updateReplicaDataSize(100L << 10, 10, 10);
+        RebalancerTestUtil.updateReplicaPathHash();
+
+        BalanceStatistic beforeBalanceStatistic = 
BalanceStatistic.getCurrentBalanceStatistic();
+        Assert.assertEquals("" + 
beforeBalanceStatistic.getBackendTotalReplicaNum(),
+                0, beforeBalanceStatistic.getBeMinTotalReplicaNum());
+
+        // all groups stable
+        Thread.sleep(1000);
+        Assert.assertTrue("some groups are unstable",
+                groupIds.stream().noneMatch(groupId -> 
colocateIndex.isGroupUnstable(groupId)));
+
+        // after enabling colocate balance and bringing some backends back, it 
should relocate all groups,
+        // and they will become unstable
+        Map<String, String> tagMap = 
backends.get(TEMP_DISALBE_BE_NUM).getTagMap();
+        for (int i = 0; i < TEMP_DISALBE_BE_NUM; i++) {
+            backends.get(i).setTagMap(tagMap);
+        }
+        Config.disable_colocate_balance = false;
+        Thread.sleep(1000);
+        Assert.assertTrue("some groups are stable",
+                groupIds.stream().allMatch(groupId -> 
colocateIndex.isGroupUnstable(groupId)));
+
+
+        // after enabling the scheduler, the unstable groups should shed their 
tablets and become stable
+        Config.disable_tablet_scheduler = false;
+        for (int i = 0; true; i++) {
+            Thread.sleep(1000);
+
+            boolean allStable = groupIds.stream().noneMatch(
+                    groupId -> colocateIndex.isGroupUnstable(groupId));
+
+            if (allStable) {
+                break;
+            }
+
+            Assert.assertTrue("some groups are unstable", i < 60);
+        }
+
+        System.out.println("=== before colocate relocate and balance:");
+        beforeBalanceStatistic.printToStdout();
+        Assert.assertEquals("" + 
beforeBalanceStatistic.getBackendTotalReplicaNum(),
+                0, beforeBalanceStatistic.getBeMinTotalReplicaNum());
+        Assert.assertEquals("" + 
beforeBalanceStatistic.getBackendTotalDataSize(),
+                0, beforeBalanceStatistic.getBeMinTotalDataSize());
+        long beforeDataSizeDiff = 
beforeBalanceStatistic.getBeMaxTotalDataSize()
+                - beforeBalanceStatistic.getBeMinTotalDataSize();
+        int beforeReplicaNumDiff = 
beforeBalanceStatistic.getBeMaxTotalReplicaNum()
+                - beforeBalanceStatistic.getBeMinTotalReplicaNum();
+
+        BalanceStatistic afterBalanceStatistic = 
BalanceStatistic.getCurrentBalanceStatistic();
+        System.out.println("");
+        System.out.println("=== after colocate relocate and balance:");
+        afterBalanceStatistic.printToStdout();
+
+        Assert.assertTrue("" + 
afterBalanceStatistic.getBackendTotalReplicaNum(),
+                afterBalanceStatistic.getBeMinTotalReplicaNum() > 0);
+        Assert.assertTrue("" + afterBalanceStatistic.getBackendTotalDataSize(),
+                afterBalanceStatistic.getBeMinTotalDataSize() > 0);
+        long afterDataSizeDiff = afterBalanceStatistic.getBeMaxTotalDataSize()
+                - afterBalanceStatistic.getBeMinTotalDataSize();
+        int afterReplicaNumDiff = 
afterBalanceStatistic.getBeMaxTotalReplicaNum()
+                - afterBalanceStatistic.getBeMinTotalReplicaNum();
+        Assert.assertTrue(afterDataSizeDiff <= beforeDataSizeDiff);
+        Assert.assertTrue(afterReplicaNumDiff <= beforeReplicaNumDiff);
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerTest.java
index b159dee523d..ee33862c955 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/clone/ColocateTableCheckerAndBalancerTest.java
@@ -24,12 +24,16 @@ import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.ReplicaAllocation;
 import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.clone.ColocateTableCheckerAndBalancer.BackendBuckets;
+import org.apache.doris.clone.ColocateTableCheckerAndBalancer.BucketStatistic;
+import 
org.apache.doris.clone.ColocateTableCheckerAndBalancer.GlobalColocateStatistic;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.jmockit.Deencapsulation;
 import org.apache.doris.resource.Tag;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TStorageMedium;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -106,6 +110,20 @@ public class ColocateTableCheckerAndBalancerTest {
         return colocateTableIndex;
     }
 
+    private GlobalColocateStatistic 
createGlobalColocateStatistic(ColocateTableIndex colocateTableIndex,
+            GroupId groupId) {
+        GlobalColocateStatistic globalColocateStatistic = new 
GlobalColocateStatistic();
+        List<Set<Long>> backendsPerBucketSeq = 
colocateTableIndex.getBackendsPerBucketSeqSet(groupId);
+        List<Long> totalReplicaDataSizes = Lists.newArrayList();
+        for (int i = 0; i < backendsPerBucketSeq.size(); i++) {
+            totalReplicaDataSizes.add(1L);
+        }
+        ReplicaAllocation replicaAlloc = new ReplicaAllocation((short) 
backendsPerBucketSeq.get(0).size());
+        globalColocateStatistic.addGroup(groupId, replicaAlloc, 
backendsPerBucketSeq, totalReplicaDataSizes, 3);
+
+        return globalColocateStatistic;
+    }
+
     @Test
     public void testBalance(@Mocked SystemInfoService infoService,
             @Mocked LoadStatisticForTag statistic) {
@@ -140,7 +158,11 @@ public class ColocateTableCheckerAndBalancerTest {
                 minTimes = 0;
 
                 statistic.getBackendLoadStatistic(anyLong);
-                result = null;
+                result = new Delegate<BackendLoadStatistic>() {
+                    BackendLoadStatistic delegate(Long beId) {
+                        return new FakeBackendLoadStatistic(beId, null, null);
+                    }
+                };
                 minTimes = 0;
             }
         };
@@ -158,24 +180,28 @@ public class ColocateTableCheckerAndBalancerTest {
                 Lists.newArrayList(1L, 2L, 3L, 4L, 1L, 2L, 3L, 4L, 1L, 2L, 3L, 
4L, 1L, 2L, 3L));
         Deencapsulation.setField(colocateTableIndex, "group2Schema", 
group2Schema);
 
+        GlobalColocateStatistic globalColocateStatistic = 
createGlobalColocateStatistic(colocateTableIndex, groupId);
         List<List<Long>> balancedBackendsPerBucketSeq = Lists.newArrayList();
         List<Long> allAvailBackendIds = Lists.newArrayList(1L, 2L, 3L, 4L, 5L, 
6L, 7L, 8L, 9L);
         boolean changed = (Boolean) Deencapsulation.invoke(balancer, 
"relocateAndBalance", groupId,
                 Tag.DEFAULT_BACKEND_TAG, new HashSet<Long>(), 
allAvailBackendIds,
-                colocateTableIndex, infoService, statistic, 
balancedBackendsPerBucketSeq);
+                colocateTableIndex, infoService, statistic, 
globalColocateStatistic,
+                balancedBackendsPerBucketSeq, false);
         List<List<Long>> expected = Lists.partition(
-                Lists.newArrayList(9L, 5L, 3L, 4L, 6L, 8L, 7L, 6L, 1L, 2L, 9L, 
4L, 1L, 2L, 3L), 3);
-        Assert.assertTrue(changed);
+                Lists.newArrayList(8L, 5L, 6L, 5L, 6L, 7L, 9L, 4L, 1L, 2L, 3L, 
4L, 1L, 2L, 3L), 3);
+        Assert.assertTrue("" + globalColocateStatistic, changed);
         Assert.assertEquals(expected, balancedBackendsPerBucketSeq);
 
         // 2. balance a already balanced group
         colocateTableIndex = createColocateIndex(groupId,
                 Lists.newArrayList(9L, 8L, 7L, 8L, 6L, 5L, 9L, 4L, 1L, 2L, 3L, 
4L, 1L, 2L, 3L));
+        globalColocateStatistic = 
createGlobalColocateStatistic(colocateTableIndex, groupId);
         Deencapsulation.setField(colocateTableIndex, "group2Schema", 
group2Schema);
         balancedBackendsPerBucketSeq.clear();
         changed = (Boolean) Deencapsulation.invoke(balancer, 
"relocateAndBalance", groupId,
                 Tag.DEFAULT_BACKEND_TAG, new HashSet<Long>(), 
allAvailBackendIds,
-                colocateTableIndex, infoService, statistic, 
balancedBackendsPerBucketSeq);
+                colocateTableIndex, infoService, statistic, 
globalColocateStatistic,
+                balancedBackendsPerBucketSeq, false);
         System.out.println(balancedBackendsPerBucketSeq);
         Assert.assertFalse(changed);
         Assert.assertTrue(balancedBackendsPerBucketSeq.isEmpty());
@@ -214,7 +240,11 @@ public class ColocateTableCheckerAndBalancerTest {
                 result = backend9;
                 minTimes = 0;
                 statistic.getBackendLoadStatistic(anyLong);
-                result = null;
+                result = new Delegate<BackendLoadStatistic>() {
+                    BackendLoadStatistic delegate(Long beId) {
+                        return new FakeBackendLoadStatistic(beId, null, null);
+                    }
+                };
                 minTimes = 0;
             }
         };
@@ -229,24 +259,28 @@ public class ColocateTableCheckerAndBalancerTest {
         // 1. only one available backend
         // [[7], [7], [7], [7], [7]]
         ColocateTableIndex colocateTableIndex = createColocateIndex(groupId, 
Lists.newArrayList(7L, 7L, 7L, 7L, 7L));
+        GlobalColocateStatistic globalColocateStatistic = 
createGlobalColocateStatistic(colocateTableIndex, groupId);
         Deencapsulation.setField(colocateTableIndex, "group2Schema", 
group2Schema);
 
         List<List<Long>> balancedBackendsPerBucketSeq = Lists.newArrayList();
         List<Long> allAvailBackendIds = Lists.newArrayList(7L);
         boolean changed = Deencapsulation.invoke(balancer, 
"relocateAndBalance", groupId, Tag.DEFAULT_BACKEND_TAG,
-                new HashSet<Long>(), allAvailBackendIds, colocateTableIndex, 
infoService, statistic, balancedBackendsPerBucketSeq);
+                new HashSet<Long>(), allAvailBackendIds, colocateTableIndex, 
infoService, statistic, globalColocateStatistic,
+                balancedBackendsPerBucketSeq, false);
         Assert.assertFalse(changed);
 
         // 2. all backends are checked but this round is not changed
         // [[7], [7], [7], [7], [7]]
         // and add new backends 8, 9 that are on the same host with 7
         colocateTableIndex = createColocateIndex(groupId, 
Lists.newArrayList(7L, 7L, 7L, 7L, 7L));
+        globalColocateStatistic = 
createGlobalColocateStatistic(colocateTableIndex, groupId);
         Deencapsulation.setField(colocateTableIndex, "group2Schema", 
group2Schema);
 
         balancedBackendsPerBucketSeq = Lists.newArrayList();
         allAvailBackendIds = Lists.newArrayList(7L, 8L, 9L);
         changed = Deencapsulation.invoke(balancer, "relocateAndBalance", 
groupId, Tag.DEFAULT_BACKEND_TAG,
-                new HashSet<Long>(), allAvailBackendIds, colocateTableIndex, 
infoService, statistic, balancedBackendsPerBucketSeq);
+                new HashSet<Long>(), allAvailBackendIds, colocateTableIndex, 
infoService, statistic, globalColocateStatistic,
+                balancedBackendsPerBucketSeq, false);
         Assert.assertFalse(changed);
     }
 
@@ -271,13 +305,15 @@ public class ColocateTableCheckerAndBalancerTest {
         group2Schema.put(groupId, groupSchema);
 
         ColocateTableIndex colocateTableIndex = createColocateIndex(groupId, 
Lists.newArrayList(7L, 7L, 7L, 7L, 7L));
+        GlobalColocateStatistic globalColocateStatistic = 
createGlobalColocateStatistic(colocateTableIndex, groupId);
         Deencapsulation.setField(colocateTableIndex, "group2Schema", 
group2Schema);
 
         List<List<Long>> balancedBackendsPerBucketSeq = Lists.newArrayList();
         Set<Long> unAvailBackendIds = Sets.newHashSet(1L, 2L, 3L, 4L, 5L, 6L, 
7L, 8L, 9L);
         List<Long> availBackendIds = Lists.newArrayList();
         boolean changed = (Boolean) Deencapsulation.invoke(balancer, 
"relocateAndBalance", groupId, Tag.DEFAULT_BACKEND_TAG,
-                unAvailBackendIds, availBackendIds, colocateTableIndex, 
infoService, statistic, balancedBackendsPerBucketSeq);
+                unAvailBackendIds, availBackendIds, colocateTableIndex, 
infoService, statistic, globalColocateStatistic,
+                balancedBackendsPerBucketSeq, false);
         Assert.assertFalse(changed);
     }
 
@@ -295,19 +331,20 @@ public class ColocateTableCheckerAndBalancerTest {
             }
         };
 
+        GlobalColocateStatistic globalColocateStatistic = new 
GlobalColocateStatistic();
         // all buckets are on different be
         List<Long> allAvailBackendIds = Lists.newArrayList(1L, 2L, 3L, 4L, 5L, 
6L, 7L, 8L);
         Set<Long> unavailBackendIds = Sets.newHashSet(9L);
         List<Long> flatBackendsPerBucketSeq = Lists.newArrayList(1L, 2L, 3L, 
4L, 5L, 6L, 7L, 8L, 9L);
         List<Map.Entry<Long, Long>> backends = 
Deencapsulation.invoke(balancer, "getSortedBackendReplicaNumPairs",
-                allAvailBackendIds, unavailBackendIds, statistic, 
flatBackendsPerBucketSeq);
+                allAvailBackendIds, unavailBackendIds, statistic, 
globalColocateStatistic, flatBackendsPerBucketSeq);
         long[] backendIds = 
backends.stream().mapToLong(Map.Entry::getKey).toArray();
         Assert.assertArrayEquals(new long[]{7L, 8L, 6L, 2L, 3L, 5L, 4L, 1L}, 
backendIds);
 
         // 0,1 bucket on same be and 5, 6 on same be
         flatBackendsPerBucketSeq = Lists.newArrayList(1L, 1L, 3L, 4L, 5L, 6L, 
7L, 7L, 9L);
         backends = Deencapsulation.invoke(balancer, 
"getSortedBackendReplicaNumPairs", allAvailBackendIds, unavailBackendIds,
-                statistic, flatBackendsPerBucketSeq);
+                statistic, globalColocateStatistic, flatBackendsPerBucketSeq);
         backendIds = backends.stream().mapToLong(Map.Entry::getKey).toArray();
         Assert.assertArrayEquals(new long[]{7L, 1L, 6L, 3L, 5L, 4L, 8L, 2L}, 
backendIds);
     }
@@ -322,6 +359,12 @@ public class ColocateTableCheckerAndBalancerTest {
         public double getMixLoadScore() {
             return mixLoadScores.get(getBeId());
         }
+
+        @Override
+        public BalanceStatus isFit(long tabletSize, TStorageMedium medium, 
List<RootPathLoadStatistic> result,
+                boolean isSupplement) {
+            return BalanceStatus.OK;
+        }
     }
 
     @Test
@@ -599,4 +642,51 @@ public class ColocateTableCheckerAndBalancerTest {
         System.out.println(availableBeIds);
         Assert.assertArrayEquals(new long[]{2L, 4L}, 
availableBeIds.stream().mapToLong(i -> i).sorted().toArray());
     }
+
+    @Test
+    public void testGlobalColocateStatistic() {
+        GroupId groupId1 = new GroupId(1L, 10000L);
+        GroupId groupId2 = new GroupId(2L, 20000L);
+        GlobalColocateStatistic globalColocateStatistic = new 
GlobalColocateStatistic();
+        globalColocateStatistic.addGroup(groupId1, new 
ReplicaAllocation((short) 2),
+                Lists.newArrayList(Sets.newHashSet(1001L, 1002L), 
Sets.newHashSet(1002L, 1003L),
+                        Sets.newHashSet(1001L, 1003L)),
+                Lists.newArrayList(100L, 200L, 300L), 5);
+        globalColocateStatistic.addGroup(groupId2, new 
ReplicaAllocation((short) 1),
+                Lists.newArrayList(Sets.newHashSet(1001L), 
Sets.newHashSet(1002L),
+                        Sets.newHashSet(1003L), Sets.newHashSet(1001L)),
+                Lists.newArrayList(100L, 200L, 300L, 400L), 7);
+
+        Map<Long, BackendBuckets> backendBucketsMap = 
globalColocateStatistic.getBackendBucketsMap();
+        BackendBuckets backendBuckets1 = backendBucketsMap.get(1001L);
+        Assert.assertNotNull(backendBuckets1);
+        Assert.assertEquals(Lists.newArrayList(0, 2),
+                backendBuckets1.getGroupTabletOrderIndices().get(groupId1));
+        Assert.assertEquals(Lists.newArrayList(0, 3),
+                backendBuckets1.getGroupTabletOrderIndices().get(groupId2));
+        BackendBuckets backendBuckets2 = backendBucketsMap.get(1002L);
+        Assert.assertNotNull(backendBuckets2);
+        Assert.assertEquals(Lists.newArrayList(0, 1),
+                backendBuckets2.getGroupTabletOrderIndices().get(groupId1));
+        Assert.assertEquals(Lists.newArrayList(1),
+                backendBuckets2.getGroupTabletOrderIndices().get(groupId2));
+        BackendBuckets backendBuckets3 = backendBucketsMap.get(1003L);
+        Assert.assertNotNull(backendBuckets3);
+        Assert.assertEquals(Lists.newArrayList(1, 2),
+                backendBuckets3.getGroupTabletOrderIndices().get(groupId1));
+        Assert.assertEquals(Lists.newArrayList(2),
+                backendBuckets3.getGroupTabletOrderIndices().get(groupId2));
+
+        Map<GroupId, List<BucketStatistic>> allGroupBucketsMap = 
globalColocateStatistic.getAllGroupBucketsMap();
+        Assert.assertEquals(Lists.newArrayList(new BucketStatistic(0, 5, 
100L), new BucketStatistic(1, 5, 200L),
+                    new BucketStatistic(2, 5, 300L)),
+                allGroupBucketsMap.get(groupId1));
+        Assert.assertEquals(Lists.newArrayList(new BucketStatistic(0, 7, 
100L), new BucketStatistic(1, 7, 200L),
+                    new BucketStatistic(2, 7, 300L), new BucketStatistic(3, 7, 
400L)),
+                allGroupBucketsMap.get(groupId2));
+
+        Map<Tag, Integer> expectAllTagBucketNum = Maps.newHashMap();
+        expectAllTagBucketNum.put(Tag.DEFAULT_BACKEND_TAG, 10);
+        Assert.assertEquals(expectAllTagBucketNum, 
globalColocateStatistic.getAllTagBucketNum());
+    }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java 
b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
index 95f71d0b51a..da03d42a644 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
@@ -24,6 +24,7 @@ import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.TableIf.TableType;
 import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.TabletInvertedIndex;
 import org.apache.doris.catalog.TabletMeta;
@@ -37,6 +38,7 @@ import com.google.common.collect.Table;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.stream.IntStream;
 
 public class RebalancerTestUtil {
@@ -110,7 +112,8 @@ public class RebalancerTestUtil {
     }
 
     public static void updateReplicaPathHash() {
-        Table<Long, Long, Replica> replicaMetaTable = 
Env.getCurrentInvertedIndex().getReplicaMetaTable();
+        Table<Long, Long, Replica> replicaMetaTable =
+                Env.getCurrentInvertedIndex().getReplicaMetaTable();
         for (Table.Cell<Long, Long, Replica> cell : 
replicaMetaTable.cellSet()) {
             long beId = cell.getColumnKey();
             Backend be = Env.getCurrentSystemInfo().getBackend(beId);
@@ -129,4 +132,47 @@ public class RebalancerTestUtil {
         }
     }
 
+
+    public static void updateReplicaDataSize(long minReplicaSize, int 
tableSkew,  int tabletSkew) {
+        Random random = new Random();
+        tableSkew = Math.max(tableSkew, 1);
+        tabletSkew = Math.max(tabletSkew, 1);
+        Env env = Env.getCurrentEnv();
+        List<Long> dbIds = env.getInternalCatalog().getDbIds();
+        for (Long dbId : dbIds) {
+            Database db = env.getInternalCatalog().getDbNullable(dbId);
+            if (db == null) {
+                continue;
+            }
+
+            if (db.isMysqlCompatibleDatabase()) {
+                continue;
+            }
+
+            for (org.apache.doris.catalog.Table table : db.getTables()) {
+                long tableBaseSize = minReplicaSize * (1 + 
random.nextInt(tableSkew));
+                table.readLock();
+                try {
+                    if (table.getType() != TableType.OLAP) {
+                        continue;
+                    }
+
+                    OlapTable tbl = (OlapTable) table;
+                    for (Partition partition : tbl.getAllPartitions()) {
+                        for (MaterializedIndex idx : 
partition.getMaterializedIndices(
+                                    MaterializedIndex.IndexExtState.VISIBLE)) {
+                            for (Tablet tablet : idx.getTablets()) {
+                                long tabletSize = tableBaseSize * (1 + 
random.nextInt(tabletSkew));
+                                for (Replica replica : tablet.getReplicas()) {
+                                    replica.updateStat(tabletSize, 1000L);
+                                }
+                            }
+                        }
+                    }
+                } finally {
+                    table.readUnlock();
+                }
+            }
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java 
b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
index 3a09cae73bb..bf2a7964b91 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
@@ -17,11 +17,14 @@
 
 package org.apache.doris.utframe;
 
+import org.apache.doris.catalog.CatalogTestUtil;
+import org.apache.doris.catalog.DiskInfo;
 import org.apache.doris.common.ClientPool;
 import org.apache.doris.proto.Data;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.PBackendServiceGrpc;
 import org.apache.doris.proto.Types;
+import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.BackendService;
 import org.apache.doris.thrift.FrontendService;
 import org.apache.doris.thrift.HeartbeatService;
@@ -35,6 +38,7 @@ import org.apache.doris.thrift.TCancelPlanFragmentResult;
 import org.apache.doris.thrift.TCheckStorageFormatResult;
 import org.apache.doris.thrift.TCloneReq;
 import org.apache.doris.thrift.TDiskTrashInfo;
+import org.apache.doris.thrift.TDropTabletReq;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
 import org.apache.doris.thrift.TExecPlanFragmentResult;
 import org.apache.doris.thrift.TExportState;
@@ -45,6 +49,7 @@ import org.apache.doris.thrift.THeartbeatResult;
 import org.apache.doris.thrift.TIngestBinlogRequest;
 import org.apache.doris.thrift.TIngestBinlogResult;
 import org.apache.doris.thrift.TMasterInfo;
+import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TRoutineLoadTask;
 import org.apache.doris.thrift.TScanBatchResult;
 import org.apache.doris.thrift.TScanCloseParams;
@@ -124,11 +129,16 @@ public class MockedBackendFactory {
     // User can extends this abstract class to create other custom be thrift 
service
     public abstract static class BeThriftService implements 
BackendService.Iface {
         protected MockedBackend backend;
+        protected Backend backendInFe;
 
         public void setBackend(MockedBackend backend) {
             this.backend = backend;
         }
 
+        public void setBackendInFe(Backend backendInFe) {
+            this.backendInFe = backendInFe;
+        }
+
         public abstract void init();
     }
 
@@ -151,11 +161,15 @@ public class MockedBackendFactory {
                 @Override
                 public void run() {
                     while (true) {
+                        boolean ok = false;
+                        FrontendService.Client client = null;
+                        TNetworkAddress address = null;
                         try {
+                            address = backend.getFeAddress();
                             TAgentTaskRequest request = taskQueue.take();
                             System.out.println(
                                     "get agent task request. type: " + 
request.getTaskType() + ", signature: "
-                                    + request.getSignature() + ", fe addr: " + 
backend.getFeAddress());
+                                    + request.getSignature() + ", fe addr: " + 
address);
                             TFinishTaskRequest finishTaskRequest = new 
TFinishTaskRequest(tBackend,
                                     request.getTaskType(), 
request.getSignature(), new TStatus(TStatusCode.OK));
                             TTaskType taskType = request.getTaskType();
@@ -164,35 +178,79 @@ public class MockedBackendFactory {
                                 case ALTER:
                                     ++reportVersion;
                                     break;
+                                case DROP:
+                                    handleDropTablet(request, 
finishTaskRequest);
+                                    break;
                                 case CLONE:
-                                    handleClone(request, finishTaskRequest);
+                                    handleCloneTablet(request, 
finishTaskRequest);
                                     break;
                                 default:
                                     break;
                             }
                             finishTaskRequest.setReportVersion(reportVersion);
 
-                            FrontendService.Client client =
-                                    
ClientPool.frontendPool.borrowObject(backend.getFeAddress(), 2000);
-                            System.out.println("get fe " + 
backend.getFeAddress() + " client: " + client);
+                            client = 
ClientPool.frontendPool.borrowObject(address, 2000);
                             client.finishTask(finishTaskRequest);
+                            ok = true;
                         } catch (Exception e) {
                             e.printStackTrace();
+                        }  finally {
+                            if (ok) {
+                                ClientPool.frontendPool.returnObject(address, 
client);
+                            } else {
+                                
ClientPool.frontendPool.invalidateObject(address, client);
+                            }
                         }
                     }
                 }
 
-                private void handleClone(TAgentTaskRequest request, 
TFinishTaskRequest finishTaskRequest) {
+                private void handleDropTablet(TAgentTaskRequest request, 
TFinishTaskRequest finishTaskRequest) {
+                    TDropTabletReq req = request.getDropTabletReq();
+                    long dataSize = Math.max(1, 
CatalogTestUtil.getTabletDataSize(req.tablet_id));
+                    DiskInfo diskInfo = getDisk(-1);
+                    if (diskInfo != null) {
+                        diskInfo.setDataUsedCapacityB(Math.max(0L,
+                                    diskInfo.getDataUsedCapacityB() - 
dataSize));
+                        
diskInfo.setAvailableCapacityB(Math.min(diskInfo.getTotalCapacityB(),
+                                    diskInfo.getAvailableCapacityB() + 
dataSize));
+                    }
+                }
+
+                private void handleCloneTablet(TAgentTaskRequest request, 
TFinishTaskRequest finishTaskRequest) {
                     TCloneReq req = request.getCloneReq();
+                    long dataSize = Math.max(1, 
CatalogTestUtil.getTabletDataSize(req.tablet_id));
+                    long pathHash = req.dest_path_hash;
+                    DiskInfo diskInfo = getDisk(pathHash);
+                    if (diskInfo != null) {
+                        pathHash = diskInfo.getPathHash();
+                        
diskInfo.setDataUsedCapacityB(Math.min(diskInfo.getTotalCapacityB(),
+                                    diskInfo.getDataUsedCapacityB() + 
dataSize));
+                        diskInfo.setAvailableCapacityB(Math.max(0L,
+                                    diskInfo.getAvailableCapacityB() - 
dataSize));
+                    }
+
                     List<TTabletInfo> tabletInfos = Lists.newArrayList();
                     TTabletInfo tabletInfo = new TTabletInfo(req.tablet_id, 
req.schema_hash, req.committed_version,
-                            req.committed_version_hash, 1, 1);
+                            req.committed_version_hash, 1, dataSize);
                     tabletInfo.setStorageMedium(req.storage_medium);
-                    tabletInfo.setPathHash(req.dest_path_hash);
+                    tabletInfo.setPathHash(pathHash);
                     tabletInfo.setUsed(true);
                     tabletInfos.add(tabletInfo);
                     finishTaskRequest.setFinishTabletInfos(tabletInfos);
                 }
+
+                private DiskInfo getDisk(long pathHash) {
+                    DiskInfo diskInfo = null;
+                    for (DiskInfo tmpDiskInfo : 
backendInFe.getDisks().values()) {
+                        diskInfo = tmpDiskInfo;
+                        if (diskInfo.getPathHash() == pathHash
+                                || pathHash == -1L || pathHash == 0) {
+                            break;
+                        }
+                    }
+
+                    return diskInfo;
+                }
             }).start();
         }
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java 
b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index 5edf5917a39..9a86b264a62 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -453,9 +453,10 @@ public abstract class TestWithFeService {
         int beHttpPort = findValidPort();
 
         // start be
+        MockedBackendFactory.BeThriftService beThriftService = new 
DefaultBeThriftServiceImpl();
         MockedBackend backend = MockedBackendFactory.createBackend(beHost, 
beHeartbeatPort, beThriftPort, beBrpcPort,
                 beHttpPort, new DefaultHeartbeatServiceImpl(beThriftPort, 
beHttpPort, beBrpcPort),
-                new DefaultBeThriftServiceImpl(), new 
DefaultPBackendServiceImpl());
+                beThriftService, new DefaultPBackendServiceImpl());
         backend.setFeAddress(new TNetworkAddress("127.0.0.1", feRpcPort));
         backend.start();
 
@@ -466,6 +467,7 @@ public abstract class TestWithFeService {
         diskInfo1.setTotalCapacityB(10L << 30);
         diskInfo1.setAvailableCapacityB(5L << 30);
         diskInfo1.setDataUsedCapacityB(480000);
+        diskInfo1.setPathHash(be.getId());
         Map<String, DiskInfo> disks = Maps.newHashMap();
         disks.put(diskInfo1.getRootPath(), diskInfo1);
         be.setDisks(ImmutableMap.copyOf(disks));
@@ -473,6 +475,7 @@ public abstract class TestWithFeService {
         be.setBePort(beThriftPort);
         be.setHttpPort(beHttpPort);
         be.setBrpcPort(beBrpcPort);
+        beThriftService.setBackendInFe(be);
         Env.getCurrentSystemInfo().addBackend(be);
         return be;
     }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java 
b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
index 9f24461dbfc..a5475a3e5aa 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
@@ -280,9 +280,10 @@ public class UtFrameUtils {
         int beHttpPort = findValidPort();
 
         // start be
+        MockedBackendFactory.BeThriftService beThriftService = new 
DefaultBeThriftServiceImpl();
         MockedBackend backend = MockedBackendFactory.createBackend(beHost, 
beHeartbeatPort, beThriftPort, beBrpcPort,
                 beHttpPort, new DefaultHeartbeatServiceImpl(beThriftPort, 
beHttpPort, beBrpcPort),
-                new DefaultBeThriftServiceImpl(), new 
DefaultPBackendServiceImpl());
+                beThriftService, new DefaultPBackendServiceImpl());
         backend.setFeAddress(new TNetworkAddress("127.0.0.1", feRpcPort));
         backend.start();
 
@@ -293,13 +294,16 @@ public class UtFrameUtils {
         diskInfo1.setTotalCapacityB(10L << 30);
         diskInfo1.setAvailableCapacityB(5L << 30);
         diskInfo1.setDataUsedCapacityB(480000);
+        diskInfo1.setPathHash(be.getId());
         disks.put(diskInfo1.getRootPath(), diskInfo1);
         be.setDisks(ImmutableMap.copyOf(disks));
         be.setAlive(true);
         be.setBePort(beThriftPort);
         be.setHttpPort(beHttpPort);
         be.setBrpcPort(beBrpcPort);
+        beThriftService.setBackendInFe(be);
         Env.getCurrentSystemInfo().addBackend(be);
+
         return be;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org

Reply via email to