This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 7aec6fc42b5 [fix](auto bucket) fix auto buckets calc using the first k
partition #41675 (#41820)
7aec6fc42b5 is described below
commit 7aec6fc42b5d3d9c5ff4db98cd61cc315d7bea0f
Author: yujun <[email protected]>
AuthorDate: Tue Oct 15 10:26:46 2024 +0800
[fix](auto bucket) fix auto buckets calc using the first k partition #41675
(#41820)
cherry pick from #41675
---
.../org/apache/doris/clone/BeLoadRebalancer.java | 10 ++++-
.../doris/clone/DynamicPartitionScheduler.java | 15 +++----
.../doris/catalog/DynamicPartitionTableTest.java | 49 +++++++++++++++++++---
.../apache/doris/utframe/TestWithFeService.java | 4 +-
.../org/apache/doris/utframe/UtFrameUtils.java | 4 +-
5 files changed, 63 insertions(+), 19 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
index 78452000ca5..d0daa484913 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BeLoadRebalancer.java
@@ -113,7 +113,8 @@ public class BeLoadRebalancer extends Rebalancer {
numOfLowPaths += pathSlot.getTotalAvailBalanceSlotNum();
}
}
- LOG.info("get number of low load paths: {}, with medium: {}",
numOfLowPaths, medium);
+ LOG.info("get number of low load paths: {}, with medium: {}, tag: {},
isUrgent {}",
+ numOfLowPaths, medium, clusterStat.getTag(), isUrgent);
List<String> alternativeTabletInfos = Lists.newArrayList();
int clusterAvailableBEnum = infoService.getAllBackendIds(true).size();
@@ -121,6 +122,8 @@ public class BeLoadRebalancer extends Rebalancer {
.map(beStat ->
Sets.newHashSet(invertedIndex.getTabletIdsByBackendId(beStat.getBeId())))
.collect(Collectors.toList());
+ boolean hasCandidateTablet = false;
+
// choose tablets from high load backends.
// BackendLoadStatistic is sorted by load score in ascend order,
// so we need to traverse it from last to first
@@ -222,6 +225,8 @@ public class BeLoadRebalancer extends Rebalancer {
continue;
}
+ hasCandidateTablet = true;
+
// for urgent disk, pick tablets order by size,
// then it may always pick tablets that was on the low
backends.
if (!lowBETablets.isEmpty()
@@ -270,6 +275,9 @@ public class BeLoadRebalancer extends Rebalancer {
if (!alternativeTablets.isEmpty()) {
LOG.info("select alternative tablets, medium: {}, is urgent: {},
num: {}, detail: {}",
medium, isUrgent, alternativeTablets.size(),
alternativeTabletInfos);
+ } else if (isUrgent && !hasCandidateTablet) {
+ LOG.info("urgent balance cann't found candidate tablets. medium:
{}, tag: {}",
+ medium, clusterStat.getTag());
}
return alternativeTablets;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
index 74f62bddb2d..1b00f041964 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
@@ -165,22 +165,19 @@ public class DynamicPartitionScheduler extends
MasterDaemon {
return historyPartitionsSize.get(0);
}
- int size = historyPartitionsSize.size() > 7 ? 7 :
historyPartitionsSize.size();
-
boolean isAscending = true;
- for (int i = 1; i < size; i++) {
- if (historyPartitionsSize.get(i) < historyPartitionsSize.get(i -
1)) {
+ ArrayList<Long> ascendingDeltaSize = new ArrayList<Long>();
+ for (int i = Math.max(1, historyPartitionsSize.size() - 7); i <
historyPartitionsSize.size(); i++) {
+ long delta = historyPartitionsSize.get(i) -
historyPartitionsSize.get(i - 1);
+ if (delta < 0) {
isAscending = false;
break;
}
+ ascendingDeltaSize.add(delta);
}
if (isAscending) {
- ArrayList<Long> historyDeltaSize = Lists.newArrayList();
- for (int i = 1; i < size; i++) {
- historyDeltaSize.add(historyPartitionsSize.get(i) -
historyPartitionsSize.get(i - 1));
- }
- return historyPartitionsSize.get(size - 1) + ema(historyDeltaSize,
7);
+ return historyPartitionsSize.get(historyPartitionsSize.size() - 1)
+ ema(ascendingDeltaSize, 7);
} else {
return ema(historyPartitionsSize, 7);
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java
index 419707f7cee..23adb61eee3 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java
@@ -1735,6 +1735,8 @@ public class DynamicPartitionTableTest {
+ " PROPERTIES (\n"
+ " \"dynamic_partition.enable\" = \"true\",\n"
+ " \"dynamic_partition.time_unit\" = \"YEAR\",\n"
+ + " \"dynamic_partition.start\" = \"-50\",\n"
+ + " \"dynamic_partition.create_history_partition\" =
\"true\",\n"
+ " \"dynamic_partition.end\" = \"1\",\n"
+ " \"dynamic_partition.prefix\" = \"p\",\n"
+ " \"replication_allocation\" = \"tag.location.default: 1\"\n"
@@ -1743,22 +1745,59 @@ public class DynamicPartitionTableTest {
Database db =
Env.getCurrentInternalCatalog().getDbOrAnalysisException("test");
OlapTable table = (OlapTable)
db.getTableOrAnalysisException("test_autobucket_dynamic_partition");
List<Partition> partitions =
Lists.newArrayList(table.getAllPartitions());
- Assert.assertEquals(2, partitions.size());
+ Assert.assertEquals(52, partitions.size());
for (Partition partition : partitions) {
Assert.assertEquals(FeConstants.default_bucket_num,
partition.getDistributionInfo().getBucketNum());
partition.setVisibleVersionAndTime(2L, System.currentTimeMillis());
}
RebalancerTestUtil.updateReplicaDataSize(1, 1, 1);
- String alterStmt =
+ String alterStmt1 =
"alter table test.test_autobucket_dynamic_partition set
('dynamic_partition.end' = '2')";
- ExceptionChecker.expectThrowsNoException(() -> alterTable(alterStmt));
+ ExceptionChecker.expectThrowsNoException(() -> alterTable(alterStmt1));
List<Pair<Long, Long>> tempDynamicPartitionTableInfo =
Lists.newArrayList(Pair.of(db.getId(), table.getId()));
Env.getCurrentEnv().getDynamicPartitionScheduler().executeDynamicPartition(tempDynamicPartitionTableInfo,
false);
partitions = Lists.newArrayList(table.getAllPartitions());
partitions.sort(Comparator.comparing(Partition::getId));
- Assert.assertEquals(3, partitions.size());
- Assert.assertEquals(1,
partitions.get(2).getDistributionInfo().getBucketNum());
+ Assert.assertEquals(53, partitions.size());
+ Assert.assertEquals(1, partitions.get(partitions.size() -
1).getDistributionInfo().getBucketNum());
+
+ table.readLock();
+ try {
+ // first 40 partitions with size 0, then 12 partitions (index 40..51) with size
100GB (10GB * 10 buckets)
+ for (int i = 0; i < 52; i++) {
+ Partition partition = partitions.get(i);
+ partition.updateVisibleVersion(2L);
+ for (MaterializedIndex idx : partition.getMaterializedIndices(
+ MaterializedIndex.IndexExtState.VISIBLE)) {
+ Assert.assertEquals(10, idx.getTablets().size());
+ for (Tablet tablet : idx.getTablets()) {
+ for (Replica replica : tablet.getReplicas()) {
+ replica.updateVersion(2L);
+ replica.setDataSize(i < 40 ? 0L : 10L << 30);
+ replica.setRowCount(1000L);
+ }
+ }
+ }
+ if (i >= 40) {
+ // first 52 partitions are 10
buckets(FeConstants.default_bucket_num)
+ Assert.assertEquals(10 * (10L << 30),
partition.getAllDataSize(true));
+ }
+ }
+ } finally {
+ table.readUnlock();
+ }
+
+ String alterStmt2 =
+ "alter table test.test_autobucket_dynamic_partition set
('dynamic_partition.end' = '3')";
+ ExceptionChecker.expectThrowsNoException(() -> alterTable(alterStmt2));
+
Env.getCurrentEnv().getDynamicPartitionScheduler().executeDynamicPartition(tempDynamicPartitionTableInfo,
false);
+
+ partitions = Lists.newArrayList(table.getAllPartitions());
+ partitions.sort(Comparator.comparing(Partition::getId));
+ Assert.assertEquals(54, partitions.size());
+ // 100GB total at 1GB per bucket, so the new partition should get 100 buckets.
+ Assert.assertEquals(100, partitions.get(partitions.size() -
1).getDistributionInfo().getBucketNum());
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index 510b2ba00eb..aa5ebf83292 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -517,8 +517,8 @@ public abstract class TestWithFeService {
Backend be = new Backend(Env.getCurrentEnv().getNextId(),
backend.getHost(), backend.getHeartbeatPort());
DiskInfo diskInfo1 = new DiskInfo("/path" + be.getId());
diskInfo1.setPathHash(be.getId());
- diskInfo1.setTotalCapacityB(10L << 30);
- diskInfo1.setAvailableCapacityB(5L << 30);
+ diskInfo1.setTotalCapacityB(10L << 40);
+ diskInfo1.setAvailableCapacityB(5L << 40);
diskInfo1.setDataUsedCapacityB(480000);
diskInfo1.setPathHash(be.getId());
Map<String, DiskInfo> disks = Maps.newHashMap();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
index a253efcc29b..f76ad145e39 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
@@ -296,8 +296,8 @@ public class UtFrameUtils {
Backend be = new Backend(Env.getCurrentEnv().getNextId(),
backend.getHost(), backend.getHeartbeatPort());
Map<String, DiskInfo> disks = Maps.newHashMap();
DiskInfo diskInfo1 = new DiskInfo("/path" + be.getId());
- diskInfo1.setTotalCapacityB(10L << 30);
- diskInfo1.setAvailableCapacityB(5L << 30);
+ diskInfo1.setTotalCapacityB(10L << 40);
+ diskInfo1.setAvailableCapacityB(5L << 40);
diskInfo1.setDataUsedCapacityB(480000);
diskInfo1.setPathHash(be.getId());
disks.put(diskInfo1.getRootPath(), diskInfo1);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]