codope commented on code in PR #6581:
URL: https://github.com/apache/hudi/pull/6581#discussion_r1002393516


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java:
##########
@@ -148,13 +151,26 @@ public class HoodieClusteringConfig extends HoodieConfig {
       .key(PLAN_PARTITION_FILTER_MODE)
       .defaultValue(ClusteringPlanPartitionFilterMode.NONE)
       .sinceVersion("0.11.0")
-      .withDocumentation("Partition filter mode used in the creation of clustering plan. Available values are - "
-          + "NONE: do not filter table partition and thus the clustering plan will include all partitions that have clustering candidate."
-          + "RECENT_DAYS: keep a continuous range of partitions, worked together with configs '" + DAYBASED_LOOKBACK_PARTITIONS.key() + "' and '"
+      .withDocumentation("Partition Filter mode used in the creation of clustering plan. Available values are - "
+          + "NONE: do not filter anything and thus the clustering plan will include all file slices that have clustering candidate."
+          + "RECENT: keep a continuous range of partitions, worked together with configs '" + DAYBASED_LOOKBACK_PARTITIONS.key() + "' and '"

Review Comment:
   This is still `RECENT_DAYS`, right?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java:
##########
@@ -83,12 +100,37 @@ public Option<HoodieClusteringPlan> generateClusteringPlan() {
       return Option.empty();
     }
 
+    Option<HoodieDefaultTimeline> toFilterTimeline = Option.empty();
+    Option<HoodieInstant> latestCompletedInstant = Option.empty();
+    // if filtering based on file slices are enabled, we need to pass the toFilter timeline below to assist in filtering.
+    if (config.getClusteringPlanFilterMode() != ClusteringPlanFilterMode.NONE) {
+      Option<HoodieInstant> latestScheduledReplaceCommit = metaClient.getActiveTimeline()
+          .filter(instant -> instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION))
+          .filter(instant -> instant.isRequested()).lastInstant();
+      if (latestScheduledReplaceCommit.isPresent()) {
+        HoodieClusteringPlan clusteringPlan = ClusteringUtils.getClusteringPlan(
+                metaClient, HoodieTimeline.getReplaceCommitRequestedInstant(latestScheduledReplaceCommit.get().getTimestamp()))
+            .map(Pair::getRight).orElseThrow(() -> new HoodieClusteringException(
+                "Unable to read clustering plan for instant: " + latestScheduledReplaceCommit.get().getTimestamp()));
+        if (!StringUtils.isNullOrEmpty(clusteringPlan.getLatestCompletedInstant())) {
+          toFilterTimeline = Option.of((HoodieDefaultTimeline)
+              metaClient.getActiveTimeline().filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN, clusteringPlan.getLatestCompletedInstant())));
+        }
+      } else {
+        // if last clustering was archived
+        toFilterTimeline = Option.of(metaClient.getActiveTimeline());
+      }
+      latestCompletedInstant = metaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
+    }
+
+    Option<HoodieDefaultTimeline> finalToFilterTimeline = toFilterTimeline;
     List<HoodieClusteringGroup> clusteringGroups = getEngineContext()
         .flatMap(
             partitionPaths,
             partitionPath -> {
               List<FileSlice> fileSlicesEligible = getFileSlicesEligibleForClustering(partitionPath).collect(Collectors.toList());
-              return buildClusteringGroupsForPartition(partitionPath, fileSlicesEligible).limit(getWriteConfig().getClusteringMaxNumGroups());
+              List<FileSlice> filteredFileSlices = filterFileSlices(fileSlicesEligible, finalToFilterTimeline, config.getClusteringPlanFilterMode());

Review Comment:
   This runs after partition filtering. Should we disable one filter when the
other is enabled? Otherwise the combinations get complicated, e.g. when users
want certain partitions to be clustered using the SELECTED_PARTITIONS strategy
regardless of whether they were part of recent commits.
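
   A rough sketch of the kind of guard I have in mind, assuming we go the
mutually exclusive route. The enums below are local stand-ins, not the real
Hudi classes, and `FileSliceFilterMode`, `RECENT_COMMITS` and `validate` are
placeholder names made up for illustration only:

```java
public class FilterModeCheck {
  // Stand-in for the partition-level filter mode (hypothetical, not the Hudi enum).
  enum PartitionFilterMode { NONE, RECENT_DAYS, SELECTED_PARTITIONS }

  // Stand-in for the new file-slice-level filter mode (hypothetical names).
  enum FileSliceFilterMode { NONE, RECENT_COMMITS }

  // Rejects configurations where both filters are active at the same time.
  static void validate(PartitionFilterMode partitionMode, FileSliceFilterMode fileSliceMode) {
    if (partitionMode != PartitionFilterMode.NONE && fileSliceMode != FileSliceFilterMode.NONE) {
      throw new IllegalArgumentException(
          "Partition filter mode " + partitionMode + " cannot be combined with file slice filter mode "
              + fileSliceMode + "; enable only one of them");
    }
  }

  public static void main(String[] args) {
    // Only one filter active: accepted.
    validate(PartitionFilterMode.SELECTED_PARTITIONS, FileSliceFilterMode.NONE);
    validate(PartitionFilterMode.NONE, FileSliceFilterMode.RECENT_COMMITS);

    // Both filters active: rejected up front instead of silently combining them.
    try {
      validate(PartitionFilterMode.SELECTED_PARTITIONS, FileSliceFilterMode.RECENT_COMMITS);
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

   Failing fast at config validation time would keep the SELECTED_PARTITIONS
case unambiguous, but it is only one option; silently skipping the second
filter would be another.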



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java:
##########
@@ -61,6 +62,8 @@ public class HoodieClusteringConfig extends HoodieConfig {
       "org.apache.hudi.client.clustering.run.strategy.JavaSortAndSizeExecutionStrategy";
   public static final String PLAN_PARTITION_FILTER_MODE =
       "hoodie.clustering.plan.partition.filter.mode";
+  public static final String PLAN_FILTER_MODE =

Review Comment:
   Is an additional config really necessary? Can we fold this into the existing
config? I like the name of the new config, but both configs serve the common
purpose of filtering (in different ways), and the fewer configs the better. If
we do have to define a new config, let's rename it to
`hoodie.clustering.plan.filegroup.filter.mode` to be more explicit and to
differentiate it from the existing partition filter config.
   


