This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2444f40 [HUDI-3095] abstract partition filter logic to enable code
reuse (#4454)
2444f40 is described below
commit 2444f40a4be5bbf0bf210dee5690267a9a1e35c8
Author: Yuwei XIAO <[email protected]>
AuthorDate: Fri Dec 31 13:37:52 2021 +0800
[HUDI-3095] abstract partition filter logic to enable code reuse (#4454)
* [HUDI-3095] abstract partition filter logic to enable code reuse
* [HUDI-3095] address reviews
---
.../apache/hudi/config/HoodieClusteringConfig.java | 43 +++++++++
.../org/apache/hudi/config/HoodieWriteConfig.java | 14 +++
.../cluster/BaseClusteringPlanActionExecutor.java | 3 +-
.../cluster/ClusteringPlanPartitionFilter.java | 68 +++++++++++++
.../cluster/ClusteringPlanPartitionFilterMode.java | 28 ++++++
.../cluster/strategy/ClusteringPlanStrategy.java | 34 +++++++
.../PartitionAwareClusteringPlanStrategy.java | 6 +-
...tClusteringPlanStrategyConfigCompatibility.java | 66 +++++++++++++
.../JavaRecentDaysClusteringPlanStrategy.java | 65 -------------
.../JavaSizeBasedClusteringPlanStrategy.java | 5 -
.../SparkRecentDaysClusteringPlanStrategy.java | 63 ------------
...rkSelectedPartitionsClusteringPlanStrategy.java | 66 -------------
.../strategy/SparkSingleFileSortPlanStrategy.java | 1 +
.../SparkSizeBasedClusteringPlanStrategy.java | 5 -
.../TestSparkRecentDaysClusteringPlanStrategy.java | 66 -------------
.../TestSparkClusteringPlanPartitionFilter.java | 107 +++++++++++++++++++++
16 files changed, 368 insertions(+), 272 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index 4f80b66..9486ad4 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.util.TypeUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
import javax.annotation.Nonnull;
import java.io.File;
@@ -54,6 +55,8 @@ public class HoodieClusteringConfig extends HoodieConfig {
"org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy";
public static final String JAVA_SORT_AND_SIZE_EXECUTION_STRATEGY =
"org.apache.hudi.client.clustering.run.strategy.JavaSortAndSizeExecutionStrategy";
+ public static final String PLAN_PARTITION_FILTER_MODE =
+ "hoodie.clustering.plan.partition.filter.mode";
// Any space-filling curve optimization (z-order/hilbert) params can be saved
with this prefix
public static final String LAYOUT_OPTIMIZE_PARAM_PREFIX =
"hoodie.layout.optimize.";
@@ -64,6 +67,20 @@ public class HoodieClusteringConfig extends HoodieConfig {
.sinceVersion("0.7.0")
.withDocumentation("Number of partitions to list to create
ClusteringPlan");
+ public static final ConfigProperty<String> PARTITION_FILTER_BEGIN_PARTITION
= ConfigProperty
+ .key(CLUSTERING_STRATEGY_PARAM_PREFIX + "cluster.begin.partition")
+ .noDefaultValue()
+ .sinceVersion("0.11.0")
+ .withDocumentation("Begin partition used to filter partition
(inclusive), only effective when the filter mode '"
+ + PLAN_PARTITION_FILTER_MODE + "' is " +
ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name());
+
+ public static final ConfigProperty<String> PARTITION_FILTER_END_PARTITION =
ConfigProperty
+ .key(CLUSTERING_STRATEGY_PARAM_PREFIX + "cluster.end.partition")
+ .noDefaultValue()
+ .sinceVersion("0.11.0")
+ .withDocumentation("End partition used to filter partition (inclusive),
only effective when the filter mode '"
+ + PLAN_PARTITION_FILTER_MODE + "' is " +
ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name());
+
public static final ConfigProperty<String> PLAN_STRATEGY_SMALL_FILE_LIMIT =
ConfigProperty
.key(CLUSTERING_STRATEGY_PARAM_PREFIX + "small.file.limit")
.defaultValue(String.valueOf(600 * 1024 * 1024L))
@@ -110,6 +127,17 @@ public class HoodieClusteringConfig extends HoodieConfig {
.sinceVersion("0.9.0")
.withDocumentation("Number of partitions to skip from latest when
choosing partitions to create ClusteringPlan");
+ public static final ConfigProperty<ClusteringPlanPartitionFilterMode>
PLAN_PARTITION_FILTER_MODE_NAME = ConfigProperty
+ .key(PLAN_PARTITION_FILTER_MODE)
+ .defaultValue(ClusteringPlanPartitionFilterMode.NONE)
+ .sinceVersion("0.11.0")
+ .withDocumentation("Partition filter mode used in the creation of
clustering plan. Available values are - "
+ + "NONE: do not filter table partition and thus the clustering plan
will include all partitions that have clustering candidate."
+ + "RECENT_DAYS: keep a continuous range of partitions, working
together with configs '" + DAYBASED_LOOKBACK_PARTITIONS.key() + "' and '"
+ + PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST.key() + "'."
+ + "SELECTED_PARTITIONS: keep partitions that are in the specified
range ['" + PARTITION_FILTER_BEGIN_PARTITION.key() + "', '"
+ + PARTITION_FILTER_END_PARTITION.key() + "'].");
+
public static final ConfigProperty<String>
PLAN_STRATEGY_MAX_BYTES_PER_OUTPUT_FILEGROUP = ConfigProperty
.key(CLUSTERING_STRATEGY_PARAM_PREFIX + "max.bytes.per.group")
.defaultValue(String.valueOf(2 * 1024 * 1024 * 1024L))
@@ -381,6 +409,11 @@ public class HoodieClusteringConfig extends HoodieConfig {
return this;
}
+ public Builder
withClusteringPlanPartitionFilterMode(ClusteringPlanPartitionFilterMode mode) {
+ clusteringConfig.setValue(PLAN_PARTITION_FILTER_MODE_NAME.key(),
mode.toString());
+ return this;
+ }
+
public Builder withClusteringExecutionStrategyClass(String
runClusteringStrategyClass) {
clusteringConfig.setValue(EXECUTION_STRATEGY_CLASS_NAME,
runClusteringStrategyClass);
return this;
@@ -396,6 +429,16 @@ public class HoodieClusteringConfig extends HoodieConfig {
return this;
}
+ public Builder withClusteringPartitionFilterBeginPartition(String begin) {
+ clusteringConfig.setValue(PARTITION_FILTER_BEGIN_PARTITION, begin);
+ return this;
+ }
+
+ public Builder withClusteringPartitionFilterEndPartition(String end) {
+ clusteringConfig.setValue(PARTITION_FILTER_END_PARTITION, end);
+ return this;
+ }
+
public Builder withClusteringPlanSmallFileLimit(long
clusteringSmallFileLimit) {
clusteringConfig.setValue(PLAN_STRATEGY_SMALL_FILE_LIMIT,
String.valueOf(clusteringSmallFileLimit));
return this;
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 74a17bf..d8e4bd3 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -53,6 +53,7 @@ import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.hudi.metrics.MetricsReporterType;
import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
import org.apache.hudi.table.RandomFileIdPrefixProvider;
+import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
@@ -1215,6 +1216,19 @@ public class HoodieWriteConfig extends HoodieConfig {
return getString(HoodieClusteringConfig.PLAN_STRATEGY_CLASS_NAME);
}
+ public ClusteringPlanPartitionFilterMode
getClusteringPlanPartitionFilterMode() {
+ String mode =
getString(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME);
+ return ClusteringPlanPartitionFilterMode.valueOf(mode);
+ }
+
+ public String getBeginPartitionForClustering() {
+ return getString(HoodieClusteringConfig.PARTITION_FILTER_BEGIN_PARTITION);
+ }
+
+ public String getEndPartitionForClustering() {
+ return getString(HoodieClusteringConfig.PARTITION_FILTER_END_PARTITION);
+ }
+
public String getClusteringExecutionStrategyClass() {
return getString(HoodieClusteringConfig.EXECUTION_STRATEGY_CLASS_NAME);
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/BaseClusteringPlanActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/BaseClusteringPlanActionExecutor.java
index 8071bfb..a1820ed 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/BaseClusteringPlanActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/BaseClusteringPlanActionExecutor.java
@@ -79,7 +79,8 @@ public abstract class BaseClusteringPlanActionExecutor<T
extends HoodieRecordPay
LOG.info("Generating clustering plan for table " + config.getBasePath());
ClusteringPlanStrategy strategy = (ClusteringPlanStrategy)
- ReflectionUtils.loadClass(config.getClusteringPlanStrategyClass(),
table, context, config);
+
ReflectionUtils.loadClass(ClusteringPlanStrategy.checkAndGetClusteringPlanStrategy(config),
table, context, config);
+
return strategy.generateClusteringPlan();
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilter.java
new file mode 100644
index 0000000..a63eb3b
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.cluster;
+
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieClusteringException;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Partition filter utilities. Currently, we support three modes:
+ * NONE: skip filtering
+ * RECENT_DAYS: output recent partitions given the skip num and days-lookback
config
+ * SELECTED_PARTITIONS: output partitions that fall within the [begin, end] range
+ */
+public class ClusteringPlanPartitionFilter {
+
+ public static List<String> filter(List<String> partitions, HoodieWriteConfig
config) {
+ ClusteringPlanPartitionFilterMode mode =
config.getClusteringPlanPartitionFilterMode();
+ switch (mode) {
+ case NONE:
+ return partitions;
+ case RECENT_DAYS:
+ return recentDaysFilter(partitions, config);
+ case SELECTED_PARTITIONS:
+ return selectedPartitionsFilter(partitions, config);
+ default:
+ throw new HoodieClusteringException("Unknown partition filter, filter
mode: " + mode);
+ }
+ }
+
+ private static List<String> recentDaysFilter(List<String> partitions,
HoodieWriteConfig config) {
+ int targetPartitionsForClustering =
config.getTargetPartitionsForClustering();
+ int skipPartitionsFromLatestForClustering =
config.getSkipPartitionsFromLatestForClustering();
+ return partitions.stream()
+ .sorted(Comparator.reverseOrder())
+ .skip(Math.max(skipPartitionsFromLatestForClustering, 0))
+ .limit(targetPartitionsForClustering > 0 ?
targetPartitionsForClustering : partitions.size())
+ .collect(Collectors.toList());
+ }
+
+ private static List<String> selectedPartitionsFilter(List<String>
partitions, HoodieWriteConfig config) {
+ String beginPartition = config.getBeginPartitionForClustering();
+ String endPartition = config.getEndPartitionForClustering();
+ List<String> filteredPartitions = partitions.stream()
+ .filter(path -> path.compareTo(beginPartition) >= 0 &&
path.compareTo(endPartition) <= 0)
+ .collect(Collectors.toList());
+ return filteredPartitions;
+ }
+}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilterMode.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilterMode.java
new file mode 100644
index 0000000..fbaf797
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilterMode.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.cluster;
+
+/**
+ * Clustering partition filter mode
+ */
+public enum ClusteringPlanPartitionFilterMode {
+ NONE,
+ RECENT_DAYS,
+ SELECTED_PARTITIONS
+}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
index 273ebce..479f639 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringPlanStrategy.java
@@ -30,8 +30,11 @@ import
org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
+
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -56,6 +59,37 @@ public abstract class ClusteringPlanStrategy<T extends
HoodieRecordPayload,I,K,O
private final transient HoodieEngineContext engineContext;
private final HoodieWriteConfig writeConfig;
+ /**
+ * Check if the given class is deprecated.
+ * If it is, then try to convert it to suitable one and update the write
config accordingly.
+ * @param config write config
+ * @return class name of clustering plan strategy
+ */
+ public static String checkAndGetClusteringPlanStrategy(HoodieWriteConfig
config) {
+ String className = config.getClusteringPlanStrategyClass();
+ String sparkSizeBasedClassName =
HoodieClusteringConfig.SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
+ String sparkSelectedPartitionsClassName =
"org.apache.hudi.client.clustering.plan.strategy.SparkSelectedPartitionsClusteringPlanStrategy";
+ String sparkRecentDaysClassName =
"org.apache.hudi.client.clustering.plan.strategy.SparkRecentDaysClusteringPlanStrategy";
+ String javaSelectedPartitionClassName =
"org.apache.hudi.client.clustering.plan.strategy.JavaRecentDaysClusteringPlanStrategy";
+ String javaSizeBasedClassName =
HoodieClusteringConfig.JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
+
+ String logStr = "The clustering plan '%s' is deprecated. Please set the
plan as '%s' and set '%s' as '%s' to achieve the same behaviour";
+ if (sparkRecentDaysClassName.equals(className)) {
+ config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME,
ClusteringPlanPartitionFilterMode.RECENT_DAYS.name());
+ LOG.warn(String.format(logStr, className, sparkSizeBasedClassName,
HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(),
ClusteringPlanPartitionFilterMode.RECENT_DAYS.name()));
+ return sparkSizeBasedClassName;
+ } else if (sparkSelectedPartitionsClassName.equals(className)) {
+ config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME,
ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name());
+ LOG.warn(String.format(logStr, className, sparkSizeBasedClassName,
HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(),
ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS.name()));
+ return sparkSizeBasedClassName;
+ } else if (javaSelectedPartitionClassName.equals(className)) {
+ config.setValue(HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME,
ClusteringPlanPartitionFilterMode.RECENT_DAYS.name());
+ LOG.warn(String.format(logStr, className, javaSizeBasedClassName,
HoodieClusteringConfig.PLAN_PARTITION_FILTER_MODE_NAME.key(),
ClusteringPlanPartitionFilterMode.RECENT_DAYS.name()));
+ return javaSizeBasedClassName;
+ }
+ return className;
+ }
+
public ClusteringPlanStrategy(HoodieTable table, HoodieEngineContext
engineContext, HoodieWriteConfig writeConfig) {
this.writeConfig = writeConfig;
this.hoodieTable = table;
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
index 4d91636..7246810 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
@@ -29,6 +29,8 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilter;
+
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -56,7 +58,9 @@ public abstract class PartitionAwareClusteringPlanStrategy<T
extends HoodieRecor
* Return list of partition paths to be considered for clustering.
*/
protected List<String> filterPartitionPaths(List<String> partitionPaths) {
- return partitionPaths;
+ List<String> filteredPartitions =
ClusteringPlanPartitionFilter.filter(partitionPaths, getWriteConfig());
+ LOG.debug("Filtered to the following partitions: " + filteredPartitions);
+ return filteredPartitions;
}
@Override
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/cluster/strategy/TestClusteringPlanStrategyConfigCompatibility.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/cluster/strategy/TestClusteringPlanStrategyConfigCompatibility.java
new file mode 100644
index 0000000..34626a8
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/cluster/strategy/TestClusteringPlanStrategyConfigCompatibility.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.cluster.strategy;
+
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
+import org.apache.hudi.table.action.cluster.strategy.ClusteringPlanStrategy;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.stream.Stream;
+
+public class TestClusteringPlanStrategyConfigCompatibility {
+
+ private static Stream<Arguments> configParams() {
+ /**
+ * (user specified class, converted class, filter mode)
+ */
+ Object[][] data = new Object[][] {
+
{"org.apache.hudi.client.clustering.plan.strategy.SparkRecentDaysClusteringPlanStrategy",
+
"org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy",
+ ClusteringPlanPartitionFilterMode.RECENT_DAYS},
+
{"org.apache.hudi.client.clustering.plan.strategy.SparkSelectedPartitionsClusteringPlanStrategy",
+
"org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy",
+ ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS},
+
{"org.apache.hudi.client.clustering.plan.strategy.JavaRecentDaysClusteringPlanStrategy",
+
"org.apache.hudi.client.clustering.plan.strategy.JavaSizeBasedClusteringPlanStrategy",
+ ClusteringPlanPartitionFilterMode.RECENT_DAYS}
+ };
+ return Stream.of(data).map(Arguments::of);
+ }
+
+ @ParameterizedTest()
+ @MethodSource("configParams")
+ public void testCheckAndGetClusteringPlanStrategy(String oldClass, String
newClass, ClusteringPlanPartitionFilterMode mode) {
+ HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+ .withPath("")
+ .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+ .withClusteringPlanStrategyClass(oldClass)
+ .build())
+ .build();
+
+ Assertions.assertEquals(newClass,
ClusteringPlanStrategy.checkAndGetClusteringPlanStrategy(config));
+ Assertions.assertEquals(mode,
config.getClusteringPlanPartitionFilterMode());
+ }
+}
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaRecentDaysClusteringPlanStrategy.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaRecentDaysClusteringPlanStrategy.java
deleted file mode 100644
index 6d9b2ee..0000000
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaRecentDaysClusteringPlanStrategy.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi.client.clustering.plan.strategy;
-
-import org.apache.hudi.client.common.HoodieJavaEngineContext;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieJavaCopyOnWriteTable;
-import org.apache.hudi.table.HoodieJavaMergeOnReadTable;
-
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
-import java.util.Comparator;
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * Clustering Strategy that only looks at latest
'daybased.lookback.partitions' partitions
- * for Java engine.
- */
-public class JavaRecentDaysClusteringPlanStrategy<T extends
HoodieRecordPayload<T>>
- extends JavaSizeBasedClusteringPlanStrategy<T> {
- private static final Logger LOG =
LogManager.getLogger(JavaRecentDaysClusteringPlanStrategy.class);
-
- public JavaRecentDaysClusteringPlanStrategy(HoodieJavaCopyOnWriteTable<T>
table,
- HoodieJavaEngineContext
engineContext,
- HoodieWriteConfig writeConfig) {
- super(table, engineContext, writeConfig);
- }
-
- public JavaRecentDaysClusteringPlanStrategy(HoodieJavaMergeOnReadTable<T>
table,
- HoodieJavaEngineContext
engineContext,
- HoodieWriteConfig writeConfig) {
- super(table, engineContext, writeConfig);
- }
-
- @Override
- protected List<String> filterPartitionPaths(List<String> partitionPaths) {
- int targetPartitionsForClustering =
getWriteConfig().getTargetPartitionsForClustering();
- int skipPartitionsFromLatestForClustering =
getWriteConfig().getSkipPartitionsFromLatestForClustering();
- return partitionPaths.stream()
- .sorted(Comparator.reverseOrder())
- .skip(Math.max(skipPartitionsFromLatestForClustering, 0))
- .limit(targetPartitionsForClustering > 0 ?
targetPartitionsForClustering : partitionPaths.size())
- .collect(Collectors.toList());
- }
-}
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java
index 9052f03..ec7202f 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java
@@ -114,11 +114,6 @@ public class JavaSizeBasedClusteringPlanStrategy<T extends
HoodieRecordPayload<T
}
@Override
- protected List<String> filterPartitionPaths(List<String> partitionPaths) {
- return partitionPaths;
- }
-
- @Override
protected Stream<FileSlice> getFileSlicesEligibleForClustering(final String
partition) {
return super.getFileSlicesEligibleForClustering(partition)
// Only files that have basefile size smaller than small file size are
eligible.
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java
deleted file mode 100644
index ad19824..0000000
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkRecentDaysClusteringPlanStrategy.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.client.clustering.plan.strategy;
-
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
-import org.apache.hudi.table.HoodieSparkMergeOnReadTable;
-
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
-import java.util.Comparator;
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * Clustering Strategy that only looks at latest
'daybased.lookback.partitions' partitions.
- */
-public class SparkRecentDaysClusteringPlanStrategy<T extends
HoodieRecordPayload<T>>
- extends SparkSizeBasedClusteringPlanStrategy<T> {
- private static final Logger LOG =
LogManager.getLogger(SparkRecentDaysClusteringPlanStrategy.class);
-
- public SparkRecentDaysClusteringPlanStrategy(HoodieSparkCopyOnWriteTable<T>
table,
- HoodieSparkEngineContext
engineContext,
- HoodieWriteConfig writeConfig) {
- super(table, engineContext, writeConfig);
- }
-
- public SparkRecentDaysClusteringPlanStrategy(HoodieSparkMergeOnReadTable<T>
table,
- HoodieSparkEngineContext
engineContext,
- HoodieWriteConfig writeConfig) {
- super(table, engineContext, writeConfig);
- }
-
- @Override
- protected List<String> filterPartitionPaths(List<String> partitionPaths) {
- int targetPartitionsForClustering =
getWriteConfig().getTargetPartitionsForClustering();
- int skipPartitionsFromLatestForClustering =
getWriteConfig().getSkipPartitionsFromLatestForClustering();
- return partitionPaths.stream()
- .sorted(Comparator.reverseOrder())
- .skip(Math.max(skipPartitionsFromLatestForClustering, 0))
- .limit(targetPartitionsForClustering > 0 ?
targetPartitionsForClustering : partitionPaths.size())
- .collect(Collectors.toList());
- }
-}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSelectedPartitionsClusteringPlanStrategy.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSelectedPartitionsClusteringPlanStrategy.java
deleted file mode 100644
index 549935d..0000000
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSelectedPartitionsClusteringPlanStrategy.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.client.clustering.plan.strategy;
-
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
-import org.apache.hudi.table.HoodieSparkMergeOnReadTable;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-import static
org.apache.hudi.config.HoodieClusteringConfig.CLUSTERING_STRATEGY_PARAM_PREFIX;
-
-/**
- * Clustering Strategy to filter just specified partitions from [begin, end].
Note both begin and end are inclusive.
- */
-public class SparkSelectedPartitionsClusteringPlanStrategy<T extends
HoodieRecordPayload<T>>
- extends SparkSizeBasedClusteringPlanStrategy<T> {
- private static final Logger LOG =
LogManager.getLogger(SparkSelectedPartitionsClusteringPlanStrategy.class);
-
- public static final String CONF_BEGIN_PARTITION =
CLUSTERING_STRATEGY_PARAM_PREFIX + "cluster.begin.partition";
- public static final String CONF_END_PARTITION =
CLUSTERING_STRATEGY_PARAM_PREFIX + "cluster.end.partition";
-
- public
SparkSelectedPartitionsClusteringPlanStrategy(HoodieSparkCopyOnWriteTable<T>
table,
-
HoodieSparkEngineContext engineContext,
- HoodieWriteConfig
writeConfig) {
- super(table, engineContext, writeConfig);
- }
-
- public
SparkSelectedPartitionsClusteringPlanStrategy(HoodieSparkMergeOnReadTable<T>
table,
-
HoodieSparkEngineContext engineContext,
- HoodieWriteConfig
writeConfig) {
- super(table, engineContext, writeConfig);
- }
-
- @Override
- protected List<String> filterPartitionPaths(List<String> partitionPaths) {
- String beginPartition =
getWriteConfig().getProps().getProperty(CONF_BEGIN_PARTITION);
- String endPartition =
getWriteConfig().getProps().getProperty(CONF_END_PARTITION);
- List<String> filteredPartitions = partitionPaths.stream()
- .filter(path -> path.compareTo(beginPartition) >= 0 &&
path.compareTo(endPartition) <= 0)
- .collect(Collectors.toList());
- LOG.info("Filtered to the following partitions: " + filteredPartitions);
- return filteredPartitions;
- }
-}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSingleFileSortPlanStrategy.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSingleFileSortPlanStrategy.java
index b98dbac..88c3057 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSingleFileSortPlanStrategy.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSingleFileSortPlanStrategy.java
@@ -48,6 +48,7 @@ public class SparkSingleFileSortPlanStrategy<T extends
HoodieRecordPayload<T>>
super(table, engineContext, writeConfig);
}
+ @Override
protected Stream<HoodieClusteringGroup>
buildClusteringGroupsForPartition(String partitionPath, List<FileSlice>
fileSlices) {
List<Pair<List<FileSlice>, Integer>> fileSliceGroups = fileSlices.stream()
.map(fileSlice -> Pair.of(Collections.singletonList(fileSlice),
1)).collect(Collectors.toList());
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java
index 7295118..b38931c 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkSizeBasedClusteringPlanStrategy.java
@@ -116,11 +116,6 @@ public class SparkSizeBasedClusteringPlanStrategy<T
extends HoodieRecordPayload<
}
@Override
- protected List<String> filterPartitionPaths(List<String> partitionPaths) {
- return partitionPaths;
- }
-
- @Override
protected Stream<FileSlice> getFileSlicesEligibleForClustering(final String
partition) {
return super.getFileSlicesEligibleForClustering(partition)
// Only files that have basefile size smaller than small file size are
eligible.
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkRecentDaysClusteringPlanStrategy.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkRecentDaysClusteringPlanStrategy.java
deleted file mode 100644
index a9c7b71..0000000
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkRecentDaysClusteringPlanStrategy.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.client.clustering.plan.strategy;
-
-import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.config.HoodieClusteringConfig;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.mockito.Mock;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertSame;
-
-public class TestSparkRecentDaysClusteringPlanStrategy {
- @Mock
- HoodieSparkCopyOnWriteTable table;
- @Mock
- HoodieSparkEngineContext context;
- HoodieWriteConfig hoodieWriteConfig;
-
- @BeforeEach
- public void setUp() {
- this.hoodieWriteConfig = HoodieWriteConfig
- .newBuilder()
- .withPath("Fake_Table_Path")
- .withClusteringConfig(HoodieClusteringConfig
- .newBuilder()
- .withClusteringSkipPartitionsFromLatest(1)
- .withClusteringTargetPartitions(1)
- .build())
- .build();
- }
-
- @Test
- public void testFilterPartitionPaths() {
- SparkRecentDaysClusteringPlanStrategy sg = new
SparkRecentDaysClusteringPlanStrategy(table, context, hoodieWriteConfig);
- ArrayList<String> fakeTimeBasedPartitionsPath = new ArrayList<>();
- fakeTimeBasedPartitionsPath.add("20210718");
- fakeTimeBasedPartitionsPath.add("20210716");
- fakeTimeBasedPartitionsPath.add("20210719");
- List list = sg.filterPartitionPaths(fakeTimeBasedPartitionsPath);
- assertEquals(1, list.size());
- assertSame("20210718", list.get(0));
- }
-}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkClusteringPlanPartitionFilter.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkClusteringPlanPartitionFilter.java
new file mode 100644
index 0000000..a68a9e3
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkClusteringPlanPartitionFilter.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.cluster.strategy;
+
+import
org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
+import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
+
+public class TestSparkClusteringPlanPartitionFilter {
+ @Mock
+ HoodieSparkCopyOnWriteTable table;
+ @Mock
+ HoodieSparkEngineContext context;
+ HoodieWriteConfig.Builder hoodieWriteConfigBuilder;
+
+ @BeforeEach
+ public void setUp() {
+ this.hoodieWriteConfigBuilder = HoodieWriteConfig
+ .newBuilder()
+ .withPath("Fake_Table_Path");
+ }
+
+ @Test
+ public void testFilterPartitionNoFilter() {
+ HoodieWriteConfig config =
hoodieWriteConfigBuilder.withClusteringConfig(HoodieClusteringConfig.newBuilder()
+
.withClusteringPlanPartitionFilterMode(ClusteringPlanPartitionFilterMode.NONE)
+ .build())
+ .build();
+
+ PartitionAwareClusteringPlanStrategy sg = new
SparkSizeBasedClusteringPlanStrategy(table, context, config);
+ ArrayList<String> fakeTimeBasedPartitionsPath = new ArrayList<>();
+ fakeTimeBasedPartitionsPath.add("20210718");
+ fakeTimeBasedPartitionsPath.add("20210716");
+ fakeTimeBasedPartitionsPath.add("20210719");
+ List list = sg.filterPartitionPaths(fakeTimeBasedPartitionsPath);
+ assertEquals(3, list.size());
+ }
+
+ @Test
+ public void testFilterPartitionRecentDays() {
+ HoodieWriteConfig config =
hoodieWriteConfigBuilder.withClusteringConfig(HoodieClusteringConfig.newBuilder()
+ .withClusteringSkipPartitionsFromLatest(1)
+ .withClusteringTargetPartitions(1)
+
.withClusteringPlanPartitionFilterMode(ClusteringPlanPartitionFilterMode.RECENT_DAYS)
+ .build())
+ .build();
+
+ PartitionAwareClusteringPlanStrategy sg = new
SparkSizeBasedClusteringPlanStrategy(table, context, config);
+ ArrayList<String> fakeTimeBasedPartitionsPath = new ArrayList<>();
+ fakeTimeBasedPartitionsPath.add("20210718");
+ fakeTimeBasedPartitionsPath.add("20210716");
+ fakeTimeBasedPartitionsPath.add("20210719");
+ List list = sg.filterPartitionPaths(fakeTimeBasedPartitionsPath);
+ assertEquals(1, list.size());
+ assertSame("20210718", list.get(0));
+ }
+
+ @Test
+ public void testFilterPartitionSelectedPartitions() {
+ HoodieWriteConfig config =
hoodieWriteConfigBuilder.withClusteringConfig(HoodieClusteringConfig.newBuilder()
+ .withClusteringPartitionFilterBeginPartition("20211222")
+ .withClusteringPartitionFilterEndPartition("20211223")
+
.withClusteringPlanPartitionFilterMode(ClusteringPlanPartitionFilterMode.SELECTED_PARTITIONS)
+ .build())
+ .build();
+
+ PartitionAwareClusteringPlanStrategy sg = new
SparkSizeBasedClusteringPlanStrategy(table, context, config);
+ ArrayList<String> fakeTimeBasedPartitionsPath = new ArrayList<>();
+ fakeTimeBasedPartitionsPath.add("20211220");
+ fakeTimeBasedPartitionsPath.add("20211221");
+ fakeTimeBasedPartitionsPath.add("20211222");
+ fakeTimeBasedPartitionsPath.add("20211224");
+ List list = sg.filterPartitionPaths(fakeTimeBasedPartitionsPath);
+ assertEquals(1, list.size());
+ assertSame("20211222", list.get(0));
+ }
+}