This is an automated email from the ASF dual-hosted git repository. forwardxu pushed a commit to branch release-0.12.1 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 78fe5c73a4acaf5c4b8ff69c3c95fbd7d4c2dbaf Author: 苏承祥 <[email protected]> AuthorDate: Tue Jan 3 15:31:01 2023 +0800 [HUDI-3572] support DAY_ROLLING strategy in ClusteringPlanPartitionFilterMode (#4966) (cherry picked from commit 41bea2fec54ae6c2376f5c88bd5a524b60b74a11) --- .../cluster/ClusteringPlanPartitionFilter.java | 23 +++++++++++++++++ .../cluster/ClusteringPlanPartitionFilterMode.java | 3 ++- .../TestSparkClusteringPlanPartitionFilter.java | 29 ++++++++++++++++++++++ 3 files changed, 54 insertions(+), 1 deletion(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilter.java index 3a889de753..ecc3706f67 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilter.java @@ -21,6 +21,9 @@ package org.apache.hudi.table.action.cluster; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieClusteringException; +import org.joda.time.DateTime; + +import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.stream.Collectors; @@ -31,6 +34,11 @@ import java.util.stream.Stream; * NONE: skip filter * RECENT DAYS: output recent partition given skip num and days lookback config * SELECTED_PARTITIONS: output partition falls in the [start, end] condition + * DAY_ROLLING: Clustering all partitions once a day to avoid clustering data of all partitions each time. + * sort partitions asc, choose which partition index % 24 = now_hour. + * tips: If hoodie.clustering.inline=true, try to reach the limit of hoodie.clustering.inline.max.commits every hour. + * If hoodie.clustering.async.enabled=true, try to reach the limit of hoodie.clustering.async.max.commits every hour. + * */ public class ClusteringPlanPartitionFilter { @@ -43,11 +51,26 @@ public class ClusteringPlanPartitionFilter { return recentDaysFilter(partitions, config); case SELECTED_PARTITIONS: return selectedPartitionsFilter(partitions, config); + case DAY_ROLLING: + return dayRollingFilter(partitions, config); default: throw new HoodieClusteringException("Unknown partition filter, filter mode: " + mode); } } + private static List<String> dayRollingFilter(List<String> partitions, HoodieWriteConfig config) { + int hour = DateTime.now().getHourOfDay(); + int len = partitions.size(); + List<String> selectPt = new ArrayList<>(); + partitions.sort(String::compareTo); + for (int i = 0; i < len; i++) { + if (i % 24 == hour) { + selectPt.add(partitions.get(i)); + } + } + return selectPt; + } + private static List<String> recentDaysFilter(List<String> partitions, HoodieWriteConfig config) { int targetPartitionsForClustering = config.getTargetPartitionsForClustering(); int skipPartitionsFromLatestForClustering = config.getSkipPartitionsFromLatestForClustering(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilterMode.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilterMode.java index fbaf79797f..261c1874cc 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilterMode.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilterMode.java @@ -24,5 +24,6 @@ package org.apache.hudi.table.action.cluster; public enum ClusteringPlanPartitionFilterMode { NONE, RECENT_DAYS, - SELECTED_PARTITIONS + SELECTED_PARTITIONS, + DAY_ROLLING } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkClusteringPlanPartitionFilter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkClusteringPlanPartitionFilter.java index a68a9e3360..70643a327d 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkClusteringPlanPartitionFilter.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkClusteringPlanPartitionFilter.java @@ -26,6 +26,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieSparkCopyOnWriteTable; import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode; +import org.joda.time.DateTime; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mock; @@ -104,4 +105,32 @@ public class TestSparkClusteringPlanPartitionFilter { assertEquals(1, list.size()); assertSame("20211222", list.get(0)); } + + @Test + public void testDayRollingPartitionFilter() { + HoodieWriteConfig config = hoodieWriteConfigBuilder.withClusteringConfig(HoodieClusteringConfig.newBuilder() + .withClusteringPlanPartitionFilterMode(ClusteringPlanPartitionFilterMode.DAY_ROLLING) + .build()) + .build(); + PartitionAwareClusteringPlanStrategy sg = new SparkSizeBasedClusteringPlanStrategy(table, context, config); + ArrayList<String> fakeTimeBasedPartitionsPath = new ArrayList<>(); + for (int i = 0; i < 24; i++) { + fakeTimeBasedPartitionsPath.add("20220301" + (i >= 10 ? String.valueOf(i) : "0" + i)); + } + List filterPartitions = sg.filterPartitionPaths(fakeTimeBasedPartitionsPath); + assertEquals(1, filterPartitions.size()); + assertEquals(fakeTimeBasedPartitionsPath.get(DateTime.now().getHourOfDay()), filterPartitions.get(0)); + fakeTimeBasedPartitionsPath = new ArrayList<>(); + for (int i = 0; i < 24; i++) { + fakeTimeBasedPartitionsPath.add("20220301" + (i >= 10 ? String.valueOf(i) : "0" + i)); + fakeTimeBasedPartitionsPath.add("20220302" + (i >= 10 ? String.valueOf(i) : "0" + i)); + } + filterPartitions = sg.filterPartitionPaths(fakeTimeBasedPartitionsPath); + assertEquals(2, filterPartitions.size()); + + int hourOfDay = DateTime.now().getHourOfDay(); + String suffix = hourOfDay >= 10 ? hourOfDay + "" : "0" + hourOfDay; + assertEquals("20220301" + suffix, filterPartitions.get(0)); + assertEquals("20220302" + suffix, filterPartitions.get(1)); + } }
