This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2444f40  [HUDI-3095] abstract partition filter logic to enable code 
reuse (#4454)
2444f40 is described below

commit 2444f40a4be5bbf0bf210dee5690267a9a1e35c8
Author: Yuwei XIAO <[email protected]>
AuthorDate: Fri Dec 31 13:37:52 2021 +0800

    [HUDI-3095] abstract partition filter logic to enable code reuse (#4454)
    
    * [HUDI-3095] abstract partition filter logic to enable code reuse
    
    * [HUDI-3095] address reviews
---
 .../apache/hudi/config/HoodieClusteringConfig.java |  43 +++++++++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  14 +++
 .../cluster/BaseClusteringPlanActionExecutor.java  |   3 +-
 .../cluster/ClusteringPlanPartitionFilter.java     |  68 +++++++++++++
 .../cluster/ClusteringPlanPartitionFilterMode.java |  28 ++++++
 .../cluster/strategy/ClusteringPlanStrategy.java   |  34 +++++++
 .../PartitionAwareClusteringPlanStrategy.java      |   6 +-
 ...tClusteringPlanStrategyConfigCompatibility.java |  66 +++++++++++++
 .../JavaRecentDaysClusteringPlanStrategy.java      |  65 -------------
 .../JavaSizeBasedClusteringPlanStrategy.java       |   5 -
 .../SparkRecentDaysClusteringPlanStrategy.java     |  63 ------------
 ...rkSelectedPartitionsClusteringPlanStrategy.java |  66 -------------
 .../strategy/SparkSingleFileSortPlanStrategy.java  |   1 +
 .../SparkSizeBasedClusteringPlanStrategy.java      |   5 -
 .../TestSparkRecentDaysClusteringPlanStrategy.java |  66 -------------
 .../TestSparkClusteringPlanPartitionFilter.java    | 107 +++++++++++++++++++++
 16 files changed, 368 insertions(+), 272 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index 4f80b66..9486ad4 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.util.TypeUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
 
 import javax.annotation.Nonnull;
 import java.io.File;
@@ -54,6 +55,8 @@ public class HoodieClusteringConfig extends HoodieConfig {
       
"org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy";
   public static final String JAVA_SORT_AND_SIZE_EXECUTION_STRATEGY =
       
"org.apache.hudi.client.clustering.run.strategy.JavaSortAndSizeExecutionStrategy";
+  public static final String PLAN_PARTITION_FILTER_MODE =
+      "hoodie.clustering.plan.partition.filter.mode";
 
   // Any Space-filling curves optimize(z-order/hilbert) params can be saved 
with this prefix
   public static final String LAYOUT_OPTIMIZE_PARAM_PREFIX = 
"hoodie.layout.optimize.";
@@ -64,6 +67,20 @@ public class HoodieClusteringConfig extends HoodieConfig {
       .sinceVersion("0.7.0")
       .withDocumentation("Number of partitions to list to create 
ClusteringPlan");
 
+  public static final ConfigProperty<String> PARTITION_FILTER_BEGIN_PARTITION 
= ConfigProperty
+      .key(CLUSTERING_STRATEGY_PARAM_PREFIX + "cluster.begin.partition")
+      .noDefaultValue()
+      .sinceVersion("0.11.0")
+      .withDocumentation("Begin partition used to filter partition 
(inclusive), only effective when the filter mode '"
+          + PLAN_PARTITION_FILTER_MODE + "' is " + 
ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name());
+
+  public static final ConfigProperty<String> PARTITION_FILTER_END_PARTITION = 
ConfigProperty
+      .key(CLUSTERING_STRATEGY_PARAM_PREFIX + "cluster.end.partition")
+      .noDefaultValue()
+      .sinceVersion("0.11.0")
+      .withDocumentation("End partition used to filter partition (inclusive), 
only effective when the filter mode '"
+          + PLAN_PARTITION_FILTER_MODE + "' is " + 
ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name());
+
   public static final ConfigProperty<String> PLAN_STRATEGY_SMALL_FILE_LIMIT = 
ConfigProperty
       .key(CLUSTERING_STRATEGY_PARAM_PREFIX + "small.file.limit")
       .defaultValue(String.valueOf(600 * 1024 * 1024L))
@@ -110,6 +127,17 @@ public class HoodieClusteringConfig extends HoodieConfig {
       .sinceVersion("0.9.0")
       .withDocumentation("Number of partitions to skip from latest when 
choosing partitions to create ClusteringPlan");
 
+  public static final ConfigProperty<ClusteringPlanPartitionFilterMode> 
PLAN_PARTITION_FILTER_MODE_NAME = ConfigProperty
+      .key(PLAN_PARTITION_FILTER_MODE)
+      .defaultValue(ClusteringPlanPartitionFilterMode.NONE)
+      .sinceVersion("0.11.0")
+      .withDocumentation("Partition filter mode used in the creation of 
clustering plan. Available values are - "
+          + "NONE: do not filter table partition and thus the clustering plan 
will include all partitions that have clustering candidate."
+          + "RECENT_DAYS: keep a continuous range of partitions, worked 
together with configs '" + DAYBASED_LOOKBACK_PARTITIONS.key() + "' and '"
+          + PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST.key() + "'."
+          + "SELECTED_PARTITIONS: keep partitions that are in the specified 
range ['" + PARTITION_FILTER_BEGIN_PARTITION.key() + "', '"
+          + PARTITION_FILTER_END_PARTITION.key() + "'].");
+
   public static final ConfigProperty<String> 
PLAN_STRATEGY_MAX_BYTES_PER_OUTPUT_FILEGROUP = ConfigProperty
       .key(CLUSTERING_STRATEGY_PARAM_PREFIX + "max.bytes.per.group")
       .defaultValue(String.valueOf(2 * 1024 * 1024 * 1024L))
@@ -381,6 +409,11 @@ public class HoodieClusteringConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder 
withClusteringPlanPartitionFilterMode(ClusteringPlanPartitionFilterMode mode) {
+      clusteringConfig.setValue(PLAN_PARTITION_FILTER_MODE_NAME.key(), 
mode.toString());
+      return this;
+    }
+
     public Builder withClusteringExecutionStrategyClass(String 
runClusteringStrategyClass) {
       clusteringConfig.setValue(EXECUTION_STRATEGY_CLASS_NAME, 
runClusteringStrategyClass);
       return this;
@@ -396,6 +429,16 @@ public class HoodieClusteringConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withClusteringPartitionFilterBeginPartition(String begin) {
+      clusteringConfig.setValue(PARTITION_FILTER_BEGIN_PARTITION, begin);
+      return this;
+    }
+
+    public Builder withClusteringPartitionFilterEndPartition(String end) {
+      clusteringConfig.setValue(PARTITION_FILTER_END_PARTITION, end);
+      return this;
+    }
+
     public Builder withClusteringPlanSmallFileLimit(long 
clusteringSmallFileLimit) {
       clusteringConfig.setValue(PLAN_STRATEGY_SMALL_FILE_LIMIT, 
String.valueOf(clusteringSmallFileLimit));
       return this;
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 74a17bf..d8e4bd3 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -53,6 +53,7 @@ import org.apache.hudi.keygen.constant.KeyGeneratorType;
 import org.apache.hudi.metrics.MetricsReporterType;
 import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
 import org.apache.hudi.table.RandomFileIdPrefixProvider;
+import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
 import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
 import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
 
@@ -1215,6 +1216,19 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getString(HoodieClusteringConfig.PLAN_STRATEGY_CLASS_NAME);
   }
 
+  public ClusteringPlanPartitionFilterMode 
getClusteringPlanPartitionFilterMode() {
+    String mode = 
getString(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME);
+    return ClusteringPlanPartitionFilterMode.valueOf(mode);
+  }
+
+  public String getBeginPartitionForClustering() {
+    return getString(HoodieClusteringConfig.PARTITION_FILTER_BEGIN_PARTITION);
+  }
+
+  public String getEndPartitionForClustering() {
+    return getString(HoodieClusteringConfig.PARTITION_FILTER_END_PARTITION);
+  }
+
   public String getClusteringExecutionStrategyClass() {
     return getString(HoodieClusteringConfig.EXECUTION_STRATEGY_CLASS_NAME);
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/BaseClusteringPlanActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/BaseClusteringPlanActionExecutor.java
index 8071bfb..a1820ed 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/BaseClusteringPlanActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/BaseClusteringPlanActionExecutor.java
@@ -79,7 +79,8 @@ public abstract class BaseClusteringPlanActionExecutor<T 
extends HoodieRecordPay
 
     LOG.info("Generating clustering plan for table " + config.getBasePath());
     ClusteringPlanStrategy strategy = (ClusteringPlanStrategy)
-        ReflectionUtils.loadClass(config.getClusteringPlanStrategyClass(), 
table, context, config);
+        
ReflectionUtils.loadClass(ClusteringPlanStrategy.checkAndGetClusteringPlanStrategy(config),
 table, context, config);
+
     return strategy.generateClusteringPlan();
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilter.java
new file mode 100644
index 0000000..a63eb3b
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.cluster;
+
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieClusteringException;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Partition filter utilities. Currently, we support three modes:
+ *  NONE: skip the filter
+ *  RECENT_DAYS: output recent partitions given the skip num and days lookback 
config
+ *  SELECTED_PARTITIONS: output partitions that fall in the [start, end] condition
+ */
+public class ClusteringPlanPartitionFilter {
+
+  public static List<String> filter(List<String> partitions, HoodieWriteConfig 
config) {
+    ClusteringPlanPartitionFilterMode mode = 
config.getClusteringPlanPartitionFilterMode();
+    switch (mode) {
+      case NONE:
+        return partitions;
+      case RECENT_DAYS:
+        return recentDaysFilter(partitions, config);
+      case SELECTED_PARTITIONS:
+        return selectedPartitionsFilter(partitions, config);
+      default:
+        throw new HoodieClusteringException("Unknown partition filter, filter 
mode: " + mode);
+    }
+  }
+
+  private static List<String> recentDaysFilter(List<String> partitions, 
HoodieWriteConfig config) {
+    int targetPartitionsForClustering = 
config.getTargetPartitionsForClustering();
+    int skipPartitionsFromLatestForClustering = 
config.getSkipPartitionsFromLatestForClustering();
+    return partitions.stream()
+        .sorted(Comparator.reverseOrder())
+        .skip(Math.max(skipPartitionsFromLatestForClustering, 0))
+        .limit(targetPartitionsForClustering > 0 ? 
targetPartitionsForClustering : partitions.size())
+        .collect(Collectors.toList());
+  }
+
+  private static List<String> selectedPartitionsFilter(List<String> 
partitions, HoodieWriteConfig config) {
+    String beginPartition = config.getBeginPartitionForClustering();
+    String endPartition = config.getEndPartitionForClustering();
+    List<String> filteredPartitions = partitions.stream()
+        .filter(path -> path.compareTo(beginPartition) >= 0 && 
path.compareTo(endPartition) <= 0)
+        .collect(Collectors.toList());
+    return filteredPartitions;
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilterMode.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilterMode.java
new file mode 100644
index 0000000..fbaf797
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilterMode.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.cluster;
+
+/**
+ * Clustering partition filter mode.
+ */
+public enum ClusteringPlanPartitionFilterMode {
+  NONE,
+  RECENT_DAYS,
+  SELECTED_PARTITIONS
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
index 273ebce..479f639 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
@@ -30,8 +30,11 @@ import 
org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -56,6 +59,37 @@ public abstract class ClusteringPlanStrategy<T extends 
HoodieRecordPayload,I,K,O
   private final transient HoodieEngineContext engineContext;
   private final HoodieWriteConfig writeConfig;
 
+  /**
+   * Check if the given class is deprecated.
+   * If it is, then try to convert it to suitable one and update the write 
config accordingly.
+   * @param config write config
+   * @return class name of clustering plan strategy
+   */
+  public static String checkAndGetClusteringPlanStrategy(HoodieWriteConfig 
config) {
+    String className = config.getClusteringPlanStrategyClass();
+    String sparkSizeBasedClassName = 
HoodieClusteringConfig.SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
+    String sparkSelectedPartitionsClassName = 
"org.apache.hudi.client.clustering.plan.strategy.SparkSelectedPartitionsClusteringPlanStrategy";
+    String sparkRecentDaysClassName = 
"org.apache.hudi.client.clustering.plan.strategy.SparkRecentDaysClusteringPlanStrategy";
+    String javaSelectedPartitionClassName = 
"org.apache.hudi.client.clustering.plan.strategy.JavaRecentDaysClusteringPlanStrategy";
+    String javaSizeBasedClassName = 
HoodieClusteringConfig.JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
+
+    String logStr = "The clustering plan '%s' is deprecated. Please set the 
plan as '%s' and set '%s' as '%s' to achieve the same behaviour";
+    if (sparkRecentDaysClassName.equals(className)) {
+      config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, 
ClusteringPlanPartitionFilterMode.RECENT_DAYS.name());
+      LOG.warn(String.format(logStr, className, sparkSizeBasedClassName, 
HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), 
ClusteringPlanPartitionFilterMode.RECENT_DAYS.name()));
+      return sparkSizeBasedClassName;
+    } else if (sparkSelectedPartitionsClassName.equals(className)) {
+      config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, 
ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name());
+      LOG.warn(String.format(logStr, className, sparkSizeBasedClassName, 
HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), 
ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name()));
+      return sparkSizeBasedClassName;
+    } else if (javaSelectedPartitionClassName.equals(className)) {
+      config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME, 
ClusteringPlanPartitionFilterMode.RECENT_DAYS.name());
+      LOG.warn(String.format(logStr, className, javaSizeBasedClassName, 
HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(), 
ClusteringPlanPartitionFilterMode.RECENT_DAYS.name()));
+      return javaSizeBasedClassName;
+    }
+    return className;
+  }
+
   public ClusteringPlanStrategy(HoodieTable table, HoodieEngineContext 
engineContext, HoodieWriteConfig writeConfig) {
     this.writeConfig = writeConfig;
     this.hoodieTable = table;
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
index 4d91636..7246810 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
@@ -29,6 +29,8 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilter;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -56,7 +58,9 @@ public abstract class PartitionAwareClusteringPlanStrategy<T 
extends HoodieRecor
    * Return list of partition paths to be considered for clustering.
    */
   protected List<String> filterPartitionPaths(List<String> partitionPaths) {
-    return partitionPaths;
+    List<String> filteredPartitions = 
ClusteringPlanPartitionFilter.filter(partitionPaths, getWriteConfig());
+    LOG.debug("Filtered to the following partitions: " + filteredPartitions);
+    return filteredPartitions;
   }
 
   @Override
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/cluster/strategy/TestClusteringPlanStrategyConfigCompatibility.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/cluster/strategy/TestClusteringPlanStrategyConfigCompatibility.java
new file mode 100644
index 0000000..34626a8
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/cluster/strategy/TestClusteringPlanStrategyConfigCompatibility.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.cluster.strategy;
+
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
+import org.apache.hudi.table.action.cluster.strategy.ClusteringPlanStrategy;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.stream.Stream;
+
+public class TestClusteringPlanStrategyConfigCompatibility {
+
+  private static Stream<Arguments> configParams() {
+    /**
+     * (user specified class, converted class, filter mode)
+     */
+    Object[][] data = new Object[][] {
+        
{"org.apache.hudi.client.clustering.plan.strategy.SparkRecentDaysClusteringPlanStrategy",
+            
"org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy",
+            ClusteringPlanPartitionFilterMode.RECENT_DAYS},
+        
{"org.apache.hudi.client.clustering.plan.strategy.SparkSelectedPartitionsClusteringPlanStrategy",
+            
"org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy",
+            ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS},
+        
{"org.apache.hudi.client.clustering.plan.strategy.JavaRecentDaysClusteringPlanStrategy",
+            
"org.apache.hudi.client.clustering.plan.strategy.JavaSizeBasedClusteringPlanStrategy",
+            ClusteringPlanPartitionFilterMode.RECENT_DAYS}
+    };
+    return Stream.of(data).map(Arguments::of);
+  }
+
+  @ParameterizedTest()
+  @MethodSource("configParams")
+  public void testCheckAndGetClusteringPlanStrategy(String oldClass, String 
newClass, ClusteringPlanPartitionFilterMode mode) {
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+        .withPath("")
+        .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+            .withClusteringPlanStrategyClass(oldClass)
+            .build())
+        .build();
+
+    Assertions.assertEquals(newClass, 
ClusteringPlanStrategy.checkAndGetClusteringPlanStrategy(config));
+    Assertions.assertEquals(mode, 
config.getClusteringPlanPartitionFilterMode());
+  }
+}
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaRecentDaysClusteringPlanStrategy.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaRecentDaysClusteringPlanStrategy.java
deleted file mode 100644
index 6d9b2ee..0000000
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaRecentDaysClusteringPlanStrategy.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi.client.clustering.plan.strategy;
-
-import org.apache.hudi.client.common.HoodieJavaEngineContext;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieJavaCopyOnWriteTable;
-import org.apache.hudi.table.HoodieJavaMergeOnReadTable;
-
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
-import java.util.Comparator;
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * Clustering Strategy that only looks at latest 
'daybased.lookback.partitions' partitions
- * for Java engine.
- */
-public class JavaRecentDaysClusteringPlanStrategy<T extends 
HoodieRecordPayload<T>>
-    extends JavaSizeBasedClusteringPlanStrategy<T> {
-  private static final Logger LOG = 
LogManager.getLogger(JavaRecentDaysClusteringPlanStrategy.class);
-
-  public JavaRecentDaysClusteringPlanStrategy(HoodieJavaCopyOnWriteTable<T> 
table,
-                                              HoodieJavaEngineContext 
engineContext,
-                                              HoodieWriteConfig writeConfig) {
-    super(table, engineContext, writeConfig);
-  }
-
-  public JavaRecentDaysClusteringPlanStrategy(HoodieJavaMergeOnReadTable<T> 
table,
-                                              HoodieJavaEngineContext 
engineContext,
-                                              HoodieWriteConfig writeConfig) {
-    super(table, engineContext, writeConfig);
-  }
-
-  @Override
-  protected List<String> filterPartitionPaths(List<String> partitionPaths) {
-    int targetPartitionsForClustering = 
getWriteConfig().getTargetPartitionsForClustering();
-    int skipPartitionsFromLatestForClustering = 
getWriteConfig().getSkipPartitionsFromLatestForClustering();
-    return partitionPaths.stream()
-        .sorted(Comparator.reverseOrder())
-        .skip(Math.max(skipPartitionsFromLatestForClustering, 0))
-        .limit(targetPartitionsForClustering > 0 ? 
targetPartitionsForClustering : partitionPaths.size())
-        .collect(Collectors.toList());
-  }
-}
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java
index 9052f03..ec7202f 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java
@@ -114,11 +114,6 @@ public class JavaSizeBasedClusteringPlanStrategy<T extends 
HoodieRecordPayload<T
   }
 
   @Override
-  protected List<String> filterPartitionPaths(List<String> partitionPaths) {
-    return partitionPaths;
-  }
-
-  @Override
   protected Stream<FileSlice> getFileSlicesEligibleForClustering(final String 
partition) {
     return super.getFileSlicesEligibleForClustering(partition)
         // Only files that have basefile size smaller than small file size are 
eligible.
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java
deleted file mode 100644
index ad19824..0000000
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.client.clustering.plan.strategy;
-
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
-import org.apache.hudi.table.HoodieSparkMergeOnReadTable;
-
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
-import java.util.Comparator;
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * Clustering Strategy that only looks at latest 
'daybased.lookback.partitions' partitions.
- */
-public class SparkRecentDaysClusteringPlanStrategy<T extends 
HoodieRecordPayload<T>>
-    extends SparkSizeBasedClusteringPlanStrategy<T> {
-  private static final Logger LOG = 
LogManager.getLogger(SparkRecentDaysClusteringPlanStrategy.class);
-
-  public SparkRecentDaysClusteringPlanStrategy(HoodieSparkCopyOnWriteTable<T> 
table,
-                                               HoodieSparkEngineContext 
engineContext,
-                                               HoodieWriteConfig writeConfig) {
-    super(table, engineContext, writeConfig);
-  }
-
-  public SparkRecentDaysClusteringPlanStrategy(HoodieSparkMergeOnReadTable<T> 
table,
-                                               HoodieSparkEngineContext 
engineContext,
-                                               HoodieWriteConfig writeConfig) {
-    super(table, engineContext, writeConfig);
-  }
-
-  @Override
-  protected List<String> filterPartitionPaths(List<String> partitionPaths) {
-    int targetPartitionsForClustering = 
getWriteConfig().getTargetPartitionsForClustering();
-    int skipPartitionsFromLatestForClustering = 
getWriteConfig().getSkipPartitionsFromLatestForClustering();
-    return partitionPaths.stream()
-        .sorted(Comparator.reverseOrder())
-        .skip(Math.max(skipPartitionsFromLatestForClustering, 0))
-        .limit(targetPartitionsForClustering > 0 ? 
targetPartitionsForClustering : partitionPaths.size())
-        .collect(Collectors.toList());
-  }
-}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSelectedPartitionsClusteringPlanStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSelectedPartitionsClusteringPlanStrategy.java
deleted file mode 100644
index 549935d..0000000
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSelectedPartitionsClusteringPlanStrategy.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.client.clustering.plan.strategy;
-
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
-import org.apache.hudi.table.HoodieSparkMergeOnReadTable;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-import static 
org.apache.hudi.config.HoodieClusteringConfig.CLUSTERING_STRATEGY_PARAM_PREFIX;
-
-/**
- * Clustering Strategy to filter just specified partitions from [begin, end]. 
Note both begin and end are inclusive.
- */
-public class SparkSelectedPartitionsClusteringPlanStrategy<T extends 
HoodieRecordPayload<T>>
-    extends SparkSizeBasedClusteringPlanStrategy<T> {
-  private static final Logger LOG = 
LogManager.getLogger(SparkSelectedPartitionsClusteringPlanStrategy.class);
-
-  public static final String CONF_BEGIN_PARTITION = 
CLUSTERING_STRATEGY_PARAM_PREFIX + "cluster.begin.partition";
-  public static final String CONF_END_PARTITION = 
CLUSTERING_STRATEGY_PARAM_PREFIX + "cluster.end.partition";
-  
-  public 
SparkSelectedPartitionsClusteringPlanStrategy(HoodieSparkCopyOnWriteTable<T> 
table,
-                                                       
HoodieSparkEngineContext engineContext,
-                                                       HoodieWriteConfig 
writeConfig) {
-    super(table, engineContext, writeConfig);
-  }
-
-  public 
SparkSelectedPartitionsClusteringPlanStrategy(HoodieSparkMergeOnReadTable<T> 
table,
-                                                       
HoodieSparkEngineContext engineContext,
-                                                       HoodieWriteConfig 
writeConfig) {
-    super(table, engineContext, writeConfig);
-  }
-
-  @Override
-  protected List<String> filterPartitionPaths(List<String> partitionPaths) {
-    String beginPartition = 
getWriteConfig().getProps().getProperty(CONF_BEGIN_PARTITION);
-    String endPartition = 
getWriteConfig().getProps().getProperty(CONF_END_PARTITION);
-    List<String> filteredPartitions = partitionPaths.stream()
-        .filter(path -> path.compareTo(beginPartition) >= 0 && 
path.compareTo(endPartition) <= 0)
-        .collect(Collectors.toList());
-    LOG.info("Filtered to the following partitions: " + filteredPartitions);
-    return filteredPartitions;
-  }
-}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSingleFileSortPlanStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSingleFileSortPlanStrategy.java
index b98dbac..88c3057 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSingleFileSortPlanStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSingleFileSortPlanStrategy.java
@@ -48,6 +48,7 @@ public class SparkSingleFileSortPlanStrategy<T extends 
HoodieRecordPayload<T>>
     super(table, engineContext, writeConfig);
   }
 
+  @Override
   protected Stream<HoodieClusteringGroup> 
buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> 
fileSlices) {
     List<Pair<List<FileSlice>, Integer>> fileSliceGroups = fileSlices.stream()
         .map(fileSlice -> Pair.of(Collections.singletonList(fileSlice), 
1)).collect(Collectors.toList());
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java
index 7295118..b38931c 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java
@@ -116,11 +116,6 @@ public class SparkSizeBasedClusteringPlanStrategy<T 
extends HoodieRecordPayload<
   }
 
   @Override
-  protected List<String> filterPartitionPaths(List<String> partitionPaths) {
-    return partitionPaths;
-  }
-
-  @Override
   protected Stream<FileSlice> getFileSlicesEligibleForClustering(final String 
partition) {
     return super.getFileSlicesEligibleForClustering(partition)
         // Only files that have basefile size smaller than small file size are 
eligible.
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkRecentDaysClusteringPlanStrategy.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkRecentDaysClusteringPlanStrategy.java
deleted file mode 100644
index a9c7b71..0000000
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkRecentDaysClusteringPlanStrategy.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.client.clustering.plan.strategy;
-
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.config.HoodieClusteringConfig;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.mockito.Mock;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertSame;
-
-public class TestSparkRecentDaysClusteringPlanStrategy {
-  @Mock
-  HoodieSparkCopyOnWriteTable table;
-  @Mock
-  HoodieSparkEngineContext context;
-  HoodieWriteConfig hoodieWriteConfig;
-
-  @BeforeEach
-  public void setUp() {
-    this.hoodieWriteConfig = HoodieWriteConfig
-            .newBuilder()
-            .withPath("Fake_Table_Path")
-            .withClusteringConfig(HoodieClusteringConfig
-                    .newBuilder()
-                    .withClusteringSkipPartitionsFromLatest(1)
-                    .withClusteringTargetPartitions(1)
-                    .build())
-            .build();
-  }
-
-  @Test
-  public void testFilterPartitionPaths() {
-    SparkRecentDaysClusteringPlanStrategy sg = new 
SparkRecentDaysClusteringPlanStrategy(table, context, hoodieWriteConfig);
-    ArrayList<String> fakeTimeBasedPartitionsPath = new ArrayList<>();
-    fakeTimeBasedPartitionsPath.add("20210718");
-    fakeTimeBasedPartitionsPath.add("20210716");
-    fakeTimeBasedPartitionsPath.add("20210719");
-    List list = sg.filterPartitionPaths(fakeTimeBasedPartitionsPath);
-    assertEquals(1, list.size());
-    assertSame("20210718", list.get(0));
-  }
-}
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkClusteringPlanPartitionFilter.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkClusteringPlanPartitionFilter.java
new file mode 100644
index 0000000..a68a9e3
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkClusteringPlanPartitionFilter.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.cluster.strategy;
+
+import 
org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
+import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
+
+public class TestSparkClusteringPlanPartitionFilter {
+  @Mock
+  HoodieSparkCopyOnWriteTable table;
+  @Mock
+  HoodieSparkEngineContext context;
+  HoodieWriteConfig.Builder hoodieWriteConfigBuilder;
+
+  @BeforeEach
+  public void setUp() {
+    this.hoodieWriteConfigBuilder = HoodieWriteConfig
+            .newBuilder()
+            .withPath("Fake_Table_Path");
+  }
+
+  @Test
+  public void testFilterPartitionNoFilter() {
+    HoodieWriteConfig config = 
hoodieWriteConfigBuilder.withClusteringConfig(HoodieClusteringConfig.newBuilder()
+            
.withClusteringPlanPartitionFilterMode(ClusteringPlanPartitionFilterMode.NONE)
+            .build())
+        .build();
+
+    PartitionAwareClusteringPlanStrategy sg = new 
SparkSizeBasedClusteringPlanStrategy(table, context, config);
+    ArrayList<String> fakeTimeBasedPartitionsPath = new ArrayList<>();
+    fakeTimeBasedPartitionsPath.add("20210718");
+    fakeTimeBasedPartitionsPath.add("20210716");
+    fakeTimeBasedPartitionsPath.add("20210719");
+    List list = sg.filterPartitionPaths(fakeTimeBasedPartitionsPath);
+    assertEquals(3, list.size());
+  }
+
+  @Test
+  public void testFilterPartitionRecentDays() {
+    HoodieWriteConfig config = 
hoodieWriteConfigBuilder.withClusteringConfig(HoodieClusteringConfig.newBuilder()
+            .withClusteringSkipPartitionsFromLatest(1)
+            .withClusteringTargetPartitions(1)
+            
.withClusteringPlanPartitionFilterMode(ClusteringPlanPartitionFilterMode.RECENT_DAYS)
+            .build())
+        .build();
+
+    PartitionAwareClusteringPlanStrategy sg = new 
SparkSizeBasedClusteringPlanStrategy(table, context, config);
+    ArrayList<String> fakeTimeBasedPartitionsPath = new ArrayList<>();
+    fakeTimeBasedPartitionsPath.add("20210718");
+    fakeTimeBasedPartitionsPath.add("20210716");
+    fakeTimeBasedPartitionsPath.add("20210719");
+    List list = sg.filterPartitionPaths(fakeTimeBasedPartitionsPath);
+    assertEquals(1, list.size());
+    assertSame("20210718", list.get(0));
+  }
+
+  @Test
+  public void testFilterPartitionSelectedPartitions() {
+    HoodieWriteConfig config = 
hoodieWriteConfigBuilder.withClusteringConfig(HoodieClusteringConfig.newBuilder()
+            .withClusteringPartitionFilterBeginPartition("20211222")
+            .withClusteringPartitionFilterEndPartition("20211223")
+            
.withClusteringPlanPartitionFilterMode(ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS)
+            .build())
+        .build();
+
+    PartitionAwareClusteringPlanStrategy sg = new 
SparkSizeBasedClusteringPlanStrategy(table, context, config);
+    ArrayList<String> fakeTimeBasedPartitionsPath = new ArrayList<>();
+    fakeTimeBasedPartitionsPath.add("20211220");
+    fakeTimeBasedPartitionsPath.add("20211221");
+    fakeTimeBasedPartitionsPath.add("20211222");
+    fakeTimeBasedPartitionsPath.add("20211224");
+    List list = sg.filterPartitionPaths(fakeTimeBasedPartitionsPath);
+    assertEquals(1, list.size());
+    assertSame("20211222", list.get(0));
+  }
+}

Reply via email to