This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6101c5a2e8e [chore](autobucket) add autobucket test and log (#36874)
6101c5a2e8e is described below
commit 6101c5a2e8eef27776fc22ff796def55ea3a220e
Author: yujun <[email protected]>
AuthorDate: Thu Jun 27 13:44:18 2024 +0800
[chore](autobucket) add autobucket test and log (#36874)
We encountered an unexpected auto-bucket result in an online environment. Add logging
to help investigate it.
---
.../doris/clone/DynamicPartitionScheduler.java | 59 +++++++++++++++++-----
.../doris/catalog/DynamicPartitionTableTest.java | 38 ++++++++++++++
2 files changed, 84 insertions(+), 13 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
index 8c5f4f669c5..d17af1836fe 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
@@ -50,6 +50,7 @@ import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.RangeUtils;
import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.rpc.RpcException;
import org.apache.doris.thrift.TStorageMedium;
import com.google.common.base.Strings;
@@ -71,6 +72,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
/**
* This class is used to periodically add or drop partition on an olapTable
which specify dynamic partition properties
@@ -186,7 +188,7 @@ public class DynamicPartitionScheduler extends MasterDaemon
{
}
private static int getBucketsNum(DynamicPartitionProperty property,
OlapTable table,
- String nowPartitionName, boolean executeFirstTime) {
+ String partitionName, String nowPartitionName, boolean
executeFirstTime) {
// if execute first time, all partitions no contain data
if (!table.isAutoBucket() || executeFirstTime) {
return property.getBuckets();
@@ -194,27 +196,56 @@ public class DynamicPartitionScheduler extends
MasterDaemon {
// auto bucket
// get all history partitions
- ArrayList<Long> partitionSizeArray = Lists.newArrayList();
RangePartitionInfo info = (RangePartitionInfo)
(table.getPartitionInfo());
List<Map.Entry<Long, PartitionItem>> idToItems = new
ArrayList<>(info.getIdToItem(false).entrySet());
idToItems.sort(Comparator.comparing(o -> ((RangePartitionItem)
o.getValue()).getItems().upperEndpoint()));
- for (Map.Entry<Long, PartitionItem> idToItem : idToItems) {
- Partition partition = table.getPartition(idToItem.getKey());
- // exclude current partition because its data isn't enough one
week/day/hour.
- if (partition != null &&
!partition.getName().equals(nowPartitionName)
- && partition.getVisibleVersion() >= 2) {
- partitionSizeArray.add(partition.getAllDataSize(true));
+ List<Partition> partitions = idToItems.stream()
+ .map(entry -> table.getPartition(entry.getKey()))
+ .filter(partition -> partition != null &&
!partition.getName().equals(nowPartitionName))
+ .collect(Collectors.toList());
+ List<Long> visibleVersions = null;
+ try {
+ visibleVersions = Partition.getVisibleVersions(partitions);
+ } catch (RpcException e) {
+ LOG.warn("autobucket use property's buckets get visible version
fail, table: [{}-{}], "
+ + "partition: {}, buckets num: {}, exception: ",
+ table.getName(), table.getId(), partitionName,
property.getBuckets(), e);
+ return property.getBuckets();
+ }
+
+ List<Partition> hasDataPartitions = Lists.newArrayList();
+ for (int i = 0; i < partitions.size(); i++) {
+ if (visibleVersions.get(i) >= 2) {
+ hasDataPartitions.add(partitions.get(i));
}
}
// no exist history partition data
- if (partitionSizeArray.isEmpty()) {
+ if (hasDataPartitions.isEmpty()) {
+ LOG.info("autobucket use property's buckets due to all partitions
no data, table: [{}-{}], "
+ + "partition: {}, buckets num: {}",
+ table.getName(), table.getId(), partitionName,
property.getBuckets());
return property.getBuckets();
}
+ ArrayList<Long> partitionSizeArray = hasDataPartitions.stream()
+ .map(partition -> partition.getAllDataSize(true))
+ .collect(Collectors.toCollection(ArrayList::new));
+ long estimatePartitionSize = getNextPartitionSize(partitionSizeArray);
// plus 5 for uncompressed data
- long uncompressedPartitionSize =
getNextPartitionSize(partitionSizeArray) * 5;
- return AutoBucketUtils.getBucketsNum(uncompressedPartitionSize,
Config.autobucket_min_buckets);
+ long uncompressedPartitionSize = estimatePartitionSize * 5;
+ int bucketsNum =
AutoBucketUtils.getBucketsNum(uncompressedPartitionSize,
Config.autobucket_min_buckets);
+ LOG.info("autobucket calc with {} history partitions, table: [{}-{}],
partition: {}, buckets num: {}, "
+ + " estimate partition size: {}, last partitions(partition
name, local size, remote size): {}",
+ hasDataPartitions.size(), table.getName(), table.getId(),
partitionName, bucketsNum,
+ estimatePartitionSize,
+ hasDataPartitions.stream()
+ .skip(Math.max(0, hasDataPartitions.size() - 7))
+ .map(partition -> "(" + partition.getName() + ", " +
partition.getDataSize(true)
+ + ", " + partition.getRemoteDataSize() + ")")
+ .collect(Collectors.toList()));
+
+ return bucketsNum;
}
private ArrayList<AddPartitionClause> getAddPartitionClause(Database db,
OlapTable olapTable,
@@ -320,7 +351,8 @@ public class DynamicPartitionScheduler extends MasterDaemon
{
DistributionDesc distributionDesc = null;
DistributionInfo distributionInfo =
olapTable.getDefaultDistributionInfo();
- int bucketsNum = getBucketsNum(dynamicPartitionProperty,
olapTable, nowPartitionName, executeFirstTime);
+ int bucketsNum = getBucketsNum(dynamicPartitionProperty,
olapTable, partitionName,
+ nowPartitionName, executeFirstTime);
if (distributionInfo.getType() ==
DistributionInfo.DistributionInfoType.HASH) {
HashDistributionInfo hashDistributionInfo =
(HashDistributionInfo) distributionInfo;
List<String> distColumnNames = new ArrayList<>();
@@ -488,7 +520,8 @@ public class DynamicPartitionScheduler extends MasterDaemon
{
return dropPartitionClauses;
}
- private void executeDynamicPartition(Collection<Pair<Long, Long>>
dynamicPartitionTableInfoCol,
+ // make public just for fe ut
+ public void executeDynamicPartition(Collection<Pair<Long, Long>>
dynamicPartitionTableInfoCol,
boolean executeFirstTime) throws DdlException {
Iterator<Pair<Long, Long>> iterator =
dynamicPartitionTableInfoCol.iterator();
while (iterator.hasNext()) {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java
index d457be0324f..c2f13837329 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java
@@ -22,11 +22,13 @@ import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.clone.DynamicPartitionScheduler;
+import org.apache.doris.clone.RebalancerTestUtil;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ExceptionChecker;
import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.Pair;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TStorageMedium;
@@ -46,6 +48,7 @@ import java.time.format.DateTimeFormatter;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.Iterator;
@@ -1719,4 +1722,39 @@ public class DynamicPartitionTableTest {
+ ");";
ExceptionChecker.expectThrowsNoException(() ->
createTable(createOlapTblStmt2));
}
+
+ @Test
+ public void testAutoBuckets() throws Exception {
+ String createOlapTblStmt = " CREATE TABLE
test.test_autobucket_dynamic_partition \n"
+ + " (k1 DATETIME)\n"
+ + " PARTITION BY RANGE (k1) () DISTRIBUTED BY HASH (k1)
BUCKETS AUTO\n"
+ + " PROPERTIES (\n"
+ + " \"dynamic_partition.enable\" = \"true\",\n"
+ + " \"dynamic_partition.time_unit\" = \"YEAR\",\n"
+ + " \"dynamic_partition.end\" = \"1\",\n"
+ + " \"dynamic_partition.prefix\" = \"p\",\n"
+ + " \"replication_allocation\" = \"tag.location.default: 1\"\n"
+ + ")";
+ ExceptionChecker.expectThrowsNoException(() ->
createTable(createOlapTblStmt));
+ Database db =
Env.getCurrentInternalCatalog().getDbOrAnalysisException("test");
+ OlapTable table = (OlapTable)
db.getTableOrAnalysisException("test_autobucket_dynamic_partition");
+ List<Partition> partitions =
Lists.newArrayList(table.getAllPartitions());
+ Assert.assertEquals(2, partitions.size());
+ for (Partition partition : partitions) {
+ Assert.assertEquals(FeConstants.default_bucket_num,
partition.getDistributionInfo().getBucketNum());
+ partition.setVisibleVersionAndTime(2L, System.currentTimeMillis());
+ }
+ RebalancerTestUtil.updateReplicaDataSize(1, 1, 1);
+
+ String alterStmt =
+ "alter table test.test_autobucket_dynamic_partition set
('dynamic_partition.end' = '2')";
+ ExceptionChecker.expectThrowsNoException(() -> alterTable(alterStmt));
+ List<Pair<Long, Long>> tempDynamicPartitionTableInfo =
Lists.newArrayList(Pair.of(db.getId(), table.getId()));
+
Env.getCurrentEnv().getDynamicPartitionScheduler().executeDynamicPartition(tempDynamicPartitionTableInfo,
false);
+
+ partitions = Lists.newArrayList(table.getAllPartitions());
+ partitions.sort(Comparator.comparing(Partition::getId));
+ Assert.assertEquals(3, partitions.size());
+ Assert.assertEquals(1,
partitions.get(2).getDistributionInfo().getBucketNum());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]