This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 7aec6fc42b5 [fix](auto bucket) fix auto buckets calc using the first k 
partition #41675 (#41820)
7aec6fc42b5 is described below

commit 7aec6fc42b5d3d9c5ff4db98cd61cc315d7bea0f
Author: yujun <[email protected]>
AuthorDate: Tue Oct 15 10:26:46 2024 +0800

    [fix](auto bucket) fix auto buckets calc using the first k partition #41675 
(#41820)
    
    cherry pick from #41675
---
 .../org/apache/doris/clone/BeLoadRebalancer.java   | 10 ++++-
 .../doris/clone/DynamicPartitionScheduler.java     | 15 +++----
 .../doris/catalog/DynamicPartitionTableTest.java   | 49 +++++++++++++++++++---
 .../apache/doris/utframe/TestWithFeService.java    |  4 +-
 .../org/apache/doris/utframe/UtFrameUtils.java     |  4 +-
 5 files changed, 63 insertions(+), 19 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
index 78452000ca5..d0daa484913 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
@@ -113,7 +113,8 @@ public class BeLoadRebalancer extends Rebalancer {
                 numOfLowPaths += pathSlot.getTotalAvailBalanceSlotNum();
             }
         }
-        LOG.info("get number of low load paths: {}, with medium: {}", 
numOfLowPaths, medium);
+        LOG.info("get number of low load paths: {}, with medium: {}, tag: {}, 
isUrgent {}",
+                numOfLowPaths, medium, clusterStat.getTag(), isUrgent);
 
         List<String> alternativeTabletInfos = Lists.newArrayList();
         int clusterAvailableBEnum = infoService.getAllBackendIds(true).size();
@@ -121,6 +122,8 @@ public class BeLoadRebalancer extends Rebalancer {
                 .map(beStat -> 
Sets.newHashSet(invertedIndex.getTabletIdsByBackendId(beStat.getBeId())))
                 .collect(Collectors.toList());
 
+        boolean hasCandidateTablet = false;
+
         // choose tablets from high load backends.
         // BackendLoadStatistic is sorted by load score in ascend order,
         // so we need to traverse it from last to first
@@ -222,6 +225,8 @@ public class BeLoadRebalancer extends Rebalancer {
                         continue;
                     }
 
+                    hasCandidateTablet = true;
+
                     // for urgent disk, pick tablets order by size,
                     // then it may always pick tablets that was on the low 
backends.
                     if (!lowBETablets.isEmpty()
@@ -270,6 +275,9 @@ public class BeLoadRebalancer extends Rebalancer {
         if (!alternativeTablets.isEmpty()) {
             LOG.info("select alternative tablets, medium: {}, is urgent: {}, 
num: {}, detail: {}",
                     medium, isUrgent, alternativeTablets.size(), 
alternativeTabletInfos);
+        } else if (isUrgent && !hasCandidateTablet) {
+            LOG.info("urgent balance cann't found candidate tablets. medium: 
{}, tag: {}",
+                    medium, clusterStat.getTag());
         }
         return alternativeTablets;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
index 74f62bddb2d..1b00f041964 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
@@ -165,22 +165,19 @@ public class DynamicPartitionScheduler extends 
MasterDaemon {
             return historyPartitionsSize.get(0);
         }
 
-        int size = historyPartitionsSize.size() > 7 ? 7 : 
historyPartitionsSize.size();
-
         boolean isAscending = true;
-        for (int i = 1; i < size; i++) {
-            if (historyPartitionsSize.get(i) < historyPartitionsSize.get(i - 
1)) {
+        ArrayList<Long> ascendingDeltaSize = new ArrayList<Long>();
+        for (int i = Math.max(1, historyPartitionsSize.size() - 7); i < 
historyPartitionsSize.size(); i++) {
+            long delta = historyPartitionsSize.get(i) - 
historyPartitionsSize.get(i - 1);
+            if (delta < 0) {
                 isAscending = false;
                 break;
             }
+            ascendingDeltaSize.add(delta);
         }
 
         if (isAscending) {
-            ArrayList<Long> historyDeltaSize = Lists.newArrayList();
-            for (int i = 1; i < size; i++) {
-                historyDeltaSize.add(historyPartitionsSize.get(i) - 
historyPartitionsSize.get(i - 1));
-            }
-            return historyPartitionsSize.get(size - 1) + ema(historyDeltaSize, 
7);
+            return historyPartitionsSize.get(historyPartitionsSize.size() - 1) 
+ ema(ascendingDeltaSize, 7);
         } else {
             return ema(historyPartitionsSize, 7);
         }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java
index 419707f7cee..23adb61eee3 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java
@@ -1735,6 +1735,8 @@ public class DynamicPartitionTableTest {
                 + " PROPERTIES (\n"
                 + " \"dynamic_partition.enable\" = \"true\",\n"
                 + " \"dynamic_partition.time_unit\" = \"YEAR\",\n"
+                + " \"dynamic_partition.start\" = \"-50\",\n"
+                + " \"dynamic_partition.create_history_partition\" = 
\"true\",\n"
                 + " \"dynamic_partition.end\" = \"1\",\n"
                 + " \"dynamic_partition.prefix\" = \"p\",\n"
                 + " \"replication_allocation\" = \"tag.location.default: 1\"\n"
@@ -1743,22 +1745,59 @@ public class DynamicPartitionTableTest {
         Database db = 
Env.getCurrentInternalCatalog().getDbOrAnalysisException("test");
         OlapTable table = (OlapTable) 
db.getTableOrAnalysisException("test_autobucket_dynamic_partition");
         List<Partition> partitions = 
Lists.newArrayList(table.getAllPartitions());
-        Assert.assertEquals(2, partitions.size());
+        Assert.assertEquals(52, partitions.size());
         for (Partition partition : partitions) {
             Assert.assertEquals(FeConstants.default_bucket_num, 
partition.getDistributionInfo().getBucketNum());
             partition.setVisibleVersionAndTime(2L, System.currentTimeMillis());
         }
         RebalancerTestUtil.updateReplicaDataSize(1, 1, 1);
 
-        String alterStmt =
+        String alterStmt1 =
                 "alter table test.test_autobucket_dynamic_partition set 
('dynamic_partition.end' = '2')";
-        ExceptionChecker.expectThrowsNoException(() -> alterTable(alterStmt));
+        ExceptionChecker.expectThrowsNoException(() -> alterTable(alterStmt1));
         List<Pair<Long, Long>> tempDynamicPartitionTableInfo = 
Lists.newArrayList(Pair.of(db.getId(), table.getId()));
         
Env.getCurrentEnv().getDynamicPartitionScheduler().executeDynamicPartition(tempDynamicPartitionTableInfo,
 false);
 
         partitions = Lists.newArrayList(table.getAllPartitions());
         partitions.sort(Comparator.comparing(Partition::getId));
-        Assert.assertEquals(3, partitions.size());
-        Assert.assertEquals(1, 
partitions.get(2).getDistributionInfo().getBucketNum());
+        Assert.assertEquals(53, partitions.size());
+        Assert.assertEquals(1, partitions.get(partitions.size() - 
1).getDistributionInfo().getBucketNum());
+
+        table.readLock();
+        try {
+            // first 40 partitions with size 0,  then 13 partitions with size 
100GB(10GB * 10 buckets)
+            for (int i = 0; i < 52; i++) {
+                Partition partition = partitions.get(i);
+                partition.updateVisibleVersion(2L);
+                for (MaterializedIndex idx : partition.getMaterializedIndices(
+                        MaterializedIndex.IndexExtState.VISIBLE)) {
+                    Assert.assertEquals(10, idx.getTablets().size());
+                    for (Tablet tablet : idx.getTablets()) {
+                        for (Replica replica : tablet.getReplicas()) {
+                            replica.updateVersion(2L);
+                            replica.setDataSize(i < 40 ? 0L : 10L << 30);
+                            replica.setRowCount(1000L);
+                        }
+                    }
+                }
+                if (i >= 40) {
+                    // first 52 partitions are 10 
buckets(FeConstants.default_bucket_num)
+                    Assert.assertEquals(10 * (10L << 30), 
partition.getAllDataSize(true));
+                }
+            }
+        } finally {
+            table.readUnlock();
+        }
+
+        String alterStmt2 =
+                "alter table test.test_autobucket_dynamic_partition set 
('dynamic_partition.end' = '3')";
+        ExceptionChecker.expectThrowsNoException(() -> alterTable(alterStmt2));
+        
Env.getCurrentEnv().getDynamicPartitionScheduler().executeDynamicPartition(tempDynamicPartitionTableInfo,
 false);
+
+        partitions = Lists.newArrayList(table.getAllPartitions());
+        partitions.sort(Comparator.comparing(Partition::getId));
+        Assert.assertEquals(54, partitions.size());
+        // 100GB total, 1GB per bucket, should 100 buckets.
+        Assert.assertEquals(100, partitions.get(partitions.size() - 
1).getDistributionInfo().getBucketNum());
     }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java 
b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index 510b2ba00eb..aa5ebf83292 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -517,8 +517,8 @@ public abstract class TestWithFeService {
         Backend be = new Backend(Env.getCurrentEnv().getNextId(), 
backend.getHost(), backend.getHeartbeatPort());
         DiskInfo diskInfo1 = new DiskInfo("/path" + be.getId());
         diskInfo1.setPathHash(be.getId());
-        diskInfo1.setTotalCapacityB(10L << 30);
-        diskInfo1.setAvailableCapacityB(5L << 30);
+        diskInfo1.setTotalCapacityB(10L << 40);
+        diskInfo1.setAvailableCapacityB(5L << 40);
         diskInfo1.setDataUsedCapacityB(480000);
         diskInfo1.setPathHash(be.getId());
         Map<String, DiskInfo> disks = Maps.newHashMap();
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java 
b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
index a253efcc29b..f76ad145e39 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
@@ -296,8 +296,8 @@ public class UtFrameUtils {
         Backend be = new Backend(Env.getCurrentEnv().getNextId(), 
backend.getHost(), backend.getHeartbeatPort());
         Map<String, DiskInfo> disks = Maps.newHashMap();
         DiskInfo diskInfo1 = new DiskInfo("/path" + be.getId());
-        diskInfo1.setTotalCapacityB(10L << 30);
-        diskInfo1.setAvailableCapacityB(5L << 30);
+        diskInfo1.setTotalCapacityB(10L << 40);
+        diskInfo1.setAvailableCapacityB(5L << 40);
         diskInfo1.setDataUsedCapacityB(480000);
         diskInfo1.setPathHash(be.getId());
         disks.put(diskInfo1.getRootPath(), diskInfo1);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to