This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 474295cf315 [chore](autobucket) add autobucket test and log #36874 (#36907)
474295cf315 is described below
commit 474295cf31513775fc08bde748b07d2f8cef691a
Author: yujun <[email protected]>
AuthorDate: Thu Jun 27 22:30:13 2024 +0800
[chore](autobucket) add autobucket test and log #36874 (#36907)
cherry pick from #36874
---
.../doris/clone/DynamicPartitionScheduler.java | 47 +++++++++++++++-------
.../doris/catalog/DynamicPartitionTableTest.java | 38 +++++++++++++++++
2 files changed, 70 insertions(+), 15 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
index 8c5f4f669c5..38d68c320ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
@@ -71,6 +71,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
/**
* This class is used to periodically add or drop partition on an olapTable
which specify dynamic partition properties
@@ -186,7 +187,7 @@ public class DynamicPartitionScheduler extends MasterDaemon {
}
private static int getBucketsNum(DynamicPartitionProperty property,
OlapTable table,
- String nowPartitionName, boolean executeFirstTime) {
+ String partitionName, String nowPartitionName, boolean
executeFirstTime) {
// if execute first time, all partitions no contain data
if (!table.isAutoBucket() || executeFirstTime) {
return property.getBuckets();
@@ -194,27 +195,41 @@ public class DynamicPartitionScheduler extends MasterDaemon {
// auto bucket
// get all history partitions
- ArrayList<Long> partitionSizeArray = Lists.newArrayList();
RangePartitionInfo info = (RangePartitionInfo)
(table.getPartitionInfo());
List<Map.Entry<Long, PartitionItem>> idToItems = new
ArrayList<>(info.getIdToItem(false).entrySet());
idToItems.sort(Comparator.comparing(o -> ((RangePartitionItem)
o.getValue()).getItems().upperEndpoint()));
- for (Map.Entry<Long, PartitionItem> idToItem : idToItems) {
- Partition partition = table.getPartition(idToItem.getKey());
- // exclude current partition because its data isn't enough one
week/day/hour.
- if (partition != null &&
!partition.getName().equals(nowPartitionName)
- && partition.getVisibleVersion() >= 2) {
- partitionSizeArray.add(partition.getAllDataSize(true));
- }
- }
+ List<Partition> partitions = idToItems.stream()
+ .map(entry -> table.getPartition(entry.getKey()))
+ .filter(partition -> partition != null &&
!partition.getName().equals(nowPartitionName)
+ && partition.getVisibleVersion() >= 2)
+ .collect(Collectors.toList());
// no exist history partition data
- if (partitionSizeArray.isEmpty()) {
+ if (partitions.isEmpty()) {
+ LOG.info("autobucket use property's buckets due to all partitions
no data, table: [{}-{}], "
+ + "partition: {}, buckets num: {}",
+ table.getName(), table.getId(), partitionName,
property.getBuckets());
return property.getBuckets();
}
+ ArrayList<Long> partitionSizeArray = partitions.stream()
+ .map(partition -> partition.getAllDataSize(true))
+ .collect(Collectors.toCollection(ArrayList::new));
+ long estimatePartitionSize = getNextPartitionSize(partitionSizeArray);
// plus 5 for uncompressed data
- long uncompressedPartitionSize =
getNextPartitionSize(partitionSizeArray) * 5;
- return AutoBucketUtils.getBucketsNum(uncompressedPartitionSize,
Config.autobucket_min_buckets);
+ long uncompressedPartitionSize = estimatePartitionSize * 5;
+ int bucketsNum =
AutoBucketUtils.getBucketsNum(uncompressedPartitionSize,
Config.autobucket_min_buckets);
+ LOG.info("autobucket calc with {} history partitions, table: [{}-{}],
partition: {}, buckets num: {}, "
+ + " estimate partition size: {}, last partitions(partition
name, local size, remote size): {}",
+ partitions.size(), table.getName(), table.getId(),
partitionName, bucketsNum,
+ estimatePartitionSize,
+ partitions.stream()
+ .skip(Math.max(0, partitions.size() - 7))
+ .map(partition -> "(" + partition.getName() + ", " +
partition.getDataSize(true)
+ + ", " + partition.getRemoteDataSize() + ")")
+ .collect(Collectors.toList()));
+
+ return bucketsNum;
}
private ArrayList<AddPartitionClause> getAddPartitionClause(Database db,
OlapTable olapTable,
@@ -320,7 +335,8 @@ public class DynamicPartitionScheduler extends MasterDaemon {
DistributionDesc distributionDesc = null;
DistributionInfo distributionInfo =
olapTable.getDefaultDistributionInfo();
- int bucketsNum = getBucketsNum(dynamicPartitionProperty,
olapTable, nowPartitionName, executeFirstTime);
+ int bucketsNum = getBucketsNum(dynamicPartitionProperty,
olapTable, partitionName,
+ nowPartitionName, executeFirstTime);
if (distributionInfo.getType() ==
DistributionInfo.DistributionInfoType.HASH) {
HashDistributionInfo hashDistributionInfo =
(HashDistributionInfo) distributionInfo;
List<String> distColumnNames = new ArrayList<>();
@@ -488,7 +504,8 @@ public class DynamicPartitionScheduler extends MasterDaemon {
return dropPartitionClauses;
}
- private void executeDynamicPartition(Collection<Pair<Long, Long>>
dynamicPartitionTableInfoCol,
+ // make public just for fe ut
+ public void executeDynamicPartition(Collection<Pair<Long, Long>>
dynamicPartitionTableInfoCol,
boolean executeFirstTime) throws DdlException {
Iterator<Pair<Long, Long>> iterator =
dynamicPartitionTableInfoCol.iterator();
while (iterator.hasNext()) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java
index d457be0324f..c2f13837329 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java
@@ -22,11 +22,13 @@ import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.clone.DynamicPartitionScheduler;
+import org.apache.doris.clone.RebalancerTestUtil;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ExceptionChecker;
import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.Pair;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TStorageMedium;
@@ -46,6 +48,7 @@ import java.time.format.DateTimeFormatter;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.Iterator;
@@ -1719,4 +1722,39 @@ public class DynamicPartitionTableTest {
+ ");";
ExceptionChecker.expectThrowsNoException(() ->
createTable(createOlapTblStmt2));
}
+
+ @Test
+ public void testAutoBuckets() throws Exception {
+ String createOlapTblStmt = " CREATE TABLE
test.test_autobucket_dynamic_partition \n"
+ + " (k1 DATETIME)\n"
+ + " PARTITION BY RANGE (k1) () DISTRIBUTED BY HASH (k1)
BUCKETS AUTO\n"
+ + " PROPERTIES (\n"
+ + " \"dynamic_partition.enable\" = \"true\",\n"
+ + " \"dynamic_partition.time_unit\" = \"YEAR\",\n"
+ + " \"dynamic_partition.end\" = \"1\",\n"
+ + " \"dynamic_partition.prefix\" = \"p\",\n"
+ + " \"replication_allocation\" = \"tag.location.default: 1\"\n"
+ + ")";
+ ExceptionChecker.expectThrowsNoException(() ->
createTable(createOlapTblStmt));
+ Database db =
Env.getCurrentInternalCatalog().getDbOrAnalysisException("test");
+ OlapTable table = (OlapTable)
db.getTableOrAnalysisException("test_autobucket_dynamic_partition");
+ List<Partition> partitions =
Lists.newArrayList(table.getAllPartitions());
+ Assert.assertEquals(2, partitions.size());
+ for (Partition partition : partitions) {
+ Assert.assertEquals(FeConstants.default_bucket_num,
partition.getDistributionInfo().getBucketNum());
+ partition.setVisibleVersionAndTime(2L, System.currentTimeMillis());
+ }
+ RebalancerTestUtil.updateReplicaDataSize(1, 1, 1);
+
+ String alterStmt =
+ "alter table test.test_autobucket_dynamic_partition set
('dynamic_partition.end' = '2')";
+ ExceptionChecker.expectThrowsNoException(() -> alterTable(alterStmt));
+ List<Pair<Long, Long>> tempDynamicPartitionTableInfo =
Lists.newArrayList(Pair.of(db.getId(), table.getId()));
+
Env.getCurrentEnv().getDynamicPartitionScheduler().executeDynamicPartition(tempDynamicPartitionTableInfo,
false);
+
+ partitions = Lists.newArrayList(table.getAllPartitions());
+ partitions.sort(Comparator.comparing(Partition::getId));
+ Assert.assertEquals(3, partitions.size());
+ Assert.assertEquals(1,
partitions.get(2).getDistributionInfo().getBucketNum());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]