This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 78fe5c73a4acaf5c4b8ff69c3c95fbd7d4c2dbaf
Author: 苏承祥 <[email protected]>
AuthorDate: Tue Jan 3 15:31:01 2023 +0800

    [HUDI-3572] support DAY_ROLLING strategy in ClusteringPlanPartitionFilterMode (#4966)
    
    (cherry picked from commit 41bea2fec54ae6c2376f5c88bd5a524b60b74a11)
---
 .../cluster/ClusteringPlanPartitionFilter.java     | 23 +++++++++++++++++
 .../cluster/ClusteringPlanPartitionFilterMode.java |  3 ++-
 .../TestSparkClusteringPlanPartitionFilter.java    | 29 ++++++++++++++++++++++
 3 files changed, 54 insertions(+), 1 deletion(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilter.java
index 3a889de753..ecc3706f67 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilter.java
@@ -21,6 +21,9 @@ package org.apache.hudi.table.action.cluster;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieClusteringException;
 
+import org.joda.time.DateTime;
+
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -31,6 +34,11 @@ import java.util.stream.Stream;
  *  NONE: skip filter
  *  RECENT DAYS: output recent partition given skip num and days lookback config
  *  SELECTED_PARTITIONS: output partition falls in the [start, end] condition
+ *  DAY_ROLLING: cluster each partition once per day, instead of clustering all partitions on every run.
+ *  Partitions are sorted in ascending order; those whose index % 24 equals the current hour are selected.
+ *  Tip: if hoodie.clustering.inline=true, ensure commits reach hoodie.clustering.inline.max.commits every hour.
+ *       If hoodie.clustering.async.enabled=true, ensure commits reach hoodie.clustering.async.max.commits every hour.
+ *
  */
 public class ClusteringPlanPartitionFilter {
 
@@ -43,11 +51,26 @@ public class ClusteringPlanPartitionFilter {
         return recentDaysFilter(partitions, config);
       case SELECTED_PARTITIONS:
         return selectedPartitionsFilter(partitions, config);
+      case DAY_ROLLING:
+        return dayRollingFilter(partitions, config);
       default:
         throw new HoodieClusteringException("Unknown partition filter, filter 
mode: " + mode);
     }
   }
 
+  private static List<String> dayRollingFilter(List<String> partitions, 
HoodieWriteConfig config) {
+    int hour = DateTime.now().getHourOfDay();
+    int len = partitions.size();
+    List<String> selectPt = new ArrayList<>();
+    partitions.sort(String::compareTo);
+    for (int i = 0; i < len; i++) {
+      if (i % 24 == hour) {
+        selectPt.add(partitions.get(i));
+      }
+    }
+    return selectPt;
+  }
+
   private static List<String> recentDaysFilter(List<String> partitions, 
HoodieWriteConfig config) {
     int targetPartitionsForClustering = 
config.getTargetPartitionsForClustering();
     int skipPartitionsFromLatestForClustering = 
config.getSkipPartitionsFromLatestForClustering();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilterMode.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilterMode.java
index fbaf79797f..261c1874cc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilterMode.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilterMode.java
@@ -24,5 +24,6 @@ package org.apache.hudi.table.action.cluster;
 public enum ClusteringPlanPartitionFilterMode {
   NONE,
   RECENT_DAYS,
-  SELECTED_PARTITIONS
+  SELECTED_PARTITIONS,
+  DAY_ROLLING
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkClusteringPlanPartitionFilter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkClusteringPlanPartitionFilter.java
index a68a9e3360..70643a327d 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkClusteringPlanPartitionFilter.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkClusteringPlanPartitionFilter.java
@@ -26,6 +26,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
 import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
 
+import org.joda.time.DateTime;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mock;
@@ -104,4 +105,32 @@ public class TestSparkClusteringPlanPartitionFilter {
     assertEquals(1, list.size());
     assertSame("20211222", list.get(0));
   }
+
+  @Test
+  public void testDayRollingPartitionFilter() {
+    HoodieWriteConfig config = 
hoodieWriteConfigBuilder.withClusteringConfig(HoodieClusteringConfig.newBuilder()
+            
.withClusteringPlanPartitionFilterMode(ClusteringPlanPartitionFilterMode.DAY_ROLLING)
+            .build())
+        .build();
+    PartitionAwareClusteringPlanStrategy sg = new 
SparkSizeBasedClusteringPlanStrategy(table, context, config);
+    ArrayList<String> fakeTimeBasedPartitionsPath = new ArrayList<>();
+    for (int i = 0; i < 24; i++) {
+      fakeTimeBasedPartitionsPath.add("20220301" + (i >= 10 ? 
String.valueOf(i) : "0" + i));
+    }
+    List filterPartitions = 
sg.filterPartitionPaths(fakeTimeBasedPartitionsPath);
+    assertEquals(1, filterPartitions.size());
+    
assertEquals(fakeTimeBasedPartitionsPath.get(DateTime.now().getHourOfDay()), 
filterPartitions.get(0));
+    fakeTimeBasedPartitionsPath = new ArrayList<>();
+    for (int i = 0; i < 24; i++) {
+      fakeTimeBasedPartitionsPath.add("20220301" + (i >= 10 ? 
String.valueOf(i) : "0" + i));
+      fakeTimeBasedPartitionsPath.add("20220302" + (i >= 10 ? 
String.valueOf(i) : "0" + i));
+    }
+    filterPartitions = sg.filterPartitionPaths(fakeTimeBasedPartitionsPath);
+    assertEquals(2, filterPartitions.size());
+
+    int hourOfDay = DateTime.now().getHourOfDay();
+    String suffix = hourOfDay >= 10 ? hourOfDay + "" : "0" + hourOfDay;
+    assertEquals("20220301" + suffix, filterPartitions.get(0));
+    assertEquals("20220302" + suffix, filterPartitions.get(1));
+  }
 }

Reply via email to