bvaradar commented on a change in pull request #1576:
URL: https://github.com/apache/incubator-hudi/pull/1576#discussion_r418875613



##########
File path: 
hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
##########
@@ -111,36 +111,74 @@ public CleanPlanner(HoodieTable<T> hoodieTable, 
HoodieWriteConfig config) {
    * @throws IOException when underlying file-system throws this exception
    */
   public List<String> getPartitionPathsToClean(Option<HoodieInstant> 
newInstantToRetain) throws IOException {
-    if (config.incrementalCleanerModeEnabled() && 
newInstantToRetain.isPresent()
-        && (HoodieCleaningPolicy.KEEP_LATEST_COMMITS == 
config.getCleanerPolicy())) {
-      Option<HoodieInstant> lastClean =
-          
hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
+    switch (config.getCleanerPolicy()) {
+      case KEEP_LATEST_COMMITS: return 
getPartitionPathsForCleanByCommits(newInstantToRetain);
+      case KEEP_LATEST_FILE_VERSIONS: return scanAllPartitionsForCleaning();
+      default: throw new IllegalStateException("Unknown Cleaner Policy");
+    }
+  }
+
+  /**
+   * Return partition paths for cleaning by commits mode.
+   * @param instantToRetain Earliest Instant to retain
+   * @return list of partitions
+   * @throws IOException
+   */
+  private List<String> 
getPartitionPathsForCleanByCommits(Option<HoodieInstant> instantToRetain) 
throws IOException {
+    if (!instantToRetain.isPresent() && 
(HoodieCleaningPolicy.KEEP_LATEST_COMMITS == config.getCleanerPolicy())) {
+      LOG.info("No earliest commit to retain. No need to scan partitions !!");
+      return Collections.emptyList();
+    }
+
+    if (config.incrementalCleanerModeEnabled()) {
+      Option<HoodieInstant> lastClean = 
hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
       if (lastClean.isPresent()) {
         HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
             
.deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get());
         if ((cleanMetadata.getEarliestCommitToRetain() != null)
             && (cleanMetadata.getEarliestCommitToRetain().length() > 0)) {
-          LOG.warn("Incremental Cleaning mode is enabled. Looking up 
partition-paths that have since changed "
-              + "since last cleaned at " + 
cleanMetadata.getEarliestCommitToRetain()
-              + ". New Instant to retain : " + newInstantToRetain);
-          return hoodieTable.getCompletedCommitsTimeline().getInstants()
-              .filter(instant ->
-                  HoodieTimeline.compareTimestamps(instant.getTimestamp(), 
HoodieTimeline.GREATER_THAN_OR_EQUALS, 
cleanMetadata.getEarliestCommitToRetain())
-                  && HoodieTimeline.compareTimestamps(instant.getTimestamp(), 
HoodieTimeline.LESSER_THAN, newInstantToRetain.get().getTimestamp())
-              ).flatMap(instant -> {
-                try {
-                  HoodieCommitMetadata commitMetadata = 
HoodieCommitMetadata.fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
 HoodieCommitMetadata.class);
-                  return 
commitMetadata.getPartitionToWriteStats().keySet().stream();
-                } catch (IOException e) {
-                  throw new HoodieIOException(e.getMessage(), e);
-                }
-              }).distinct().collect(Collectors.toList());
+          return 
getPartitionPathsForCleaningUsingIncrementalMode(cleanMetadata, 
instantToRetain);
         }
       }
     }
-    // Otherwise go to brute force mode of scanning all partitions
-    return FSUtils.getAllPartitionPaths(hoodieTable.getMetaClient().getFs(),
-        hoodieTable.getMetaClient().getBasePath(), 
config.shouldAssumeDatePartitioning());
+    return scanAllPartitionsForCleaning();
+  }
+
+  /**
+   * Use Incremental Mode for finding partition paths.
+   * @param cleanMetadata
+   * @param newInstantToRetain
+   * @return
+   */
+  private List<String> 
getPartitionPathsForCleaningUsingIncrementalMode(HoodieCleanMetadata 
cleanMetadata,
+      Option<HoodieInstant> newInstantToRetain) {
+    LOG.warn("Incremental Cleaning mode is enabled. Looking up partition-paths 
that have since changed "
+        + "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain()
+        + ". New Instant to retain : " + newInstantToRetain);
+    return hoodieTable.getCompletedCommitsTimeline().getInstants().filter(
+        instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), 
HoodieTimeline.GREATER_THAN_OR_EQUALS,
+            cleanMetadata.getEarliestCommitToRetain()) && 
HoodieTimeline.compareTimestamps(instant.getTimestamp(),
+            HoodieTimeline.LESSER_THAN, 
newInstantToRetain.get().getTimestamp())).flatMap(instant -> {
+              try {
+                HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+                    
.fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
+                        HoodieCommitMetadata.class);
+                return 
commitMetadata.getPartitionToWriteStats().keySet().stream();
+              } catch (IOException e) {
+                throw new HoodieIOException(e.getMessage(), e);
+              }
+            }).distinct().collect(Collectors.toList());
+  }
+
+  /**
+   * Scan and list all paritions for cleaning.
+   * @return all partitions paths for the dataset.
+   * @throws IOException
+   */
+  private List<String> scanAllPartitionsForCleaning() throws IOException {

Review comment:
       Done

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
##########
@@ -111,36 +111,74 @@ public CleanPlanner(HoodieTable<T> hoodieTable, 
HoodieWriteConfig config) {
    * @throws IOException when underlying file-system throws this exception
    */
   public List<String> getPartitionPathsToClean(Option<HoodieInstant> 
newInstantToRetain) throws IOException {
-    if (config.incrementalCleanerModeEnabled() && 
newInstantToRetain.isPresent()
-        && (HoodieCleaningPolicy.KEEP_LATEST_COMMITS == 
config.getCleanerPolicy())) {
-      Option<HoodieInstant> lastClean =
-          
hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
+    switch (config.getCleanerPolicy()) {
+      case KEEP_LATEST_COMMITS: return 
getPartitionPathsForCleanByCommits(newInstantToRetain);
+      case KEEP_LATEST_FILE_VERSIONS: return scanAllPartitionsForCleaning();
+      default: throw new IllegalStateException("Unknown Cleaner Policy");
+    }
+  }
+
+  /**
+   * Return partition paths for cleaning by commits mode.
+   * @param instantToRetain Earliest Instant to retain
+   * @return list of partitions
+   * @throws IOException
+   */
+  private List<String> 
getPartitionPathsForCleanByCommits(Option<HoodieInstant> instantToRetain) 
throws IOException {
+    if (!instantToRetain.isPresent() && 
(HoodieCleaningPolicy.KEEP_LATEST_COMMITS == config.getCleanerPolicy())) {
+      LOG.info("No earliest commit to retain. No need to scan partitions !!");
+      return Collections.emptyList();
+    }
+
+    if (config.incrementalCleanerModeEnabled()) {
+      Option<HoodieInstant> lastClean = 
hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
       if (lastClean.isPresent()) {
         HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
             
.deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get());
         if ((cleanMetadata.getEarliestCommitToRetain() != null)
             && (cleanMetadata.getEarliestCommitToRetain().length() > 0)) {
-          LOG.warn("Incremental Cleaning mode is enabled. Looking up 
partition-paths that have since changed "
-              + "since last cleaned at " + 
cleanMetadata.getEarliestCommitToRetain()
-              + ". New Instant to retain : " + newInstantToRetain);
-          return hoodieTable.getCompletedCommitsTimeline().getInstants()
-              .filter(instant ->
-                  HoodieTimeline.compareTimestamps(instant.getTimestamp(), 
HoodieTimeline.GREATER_THAN_OR_EQUALS, 
cleanMetadata.getEarliestCommitToRetain())
-                  && HoodieTimeline.compareTimestamps(instant.getTimestamp(), 
HoodieTimeline.LESSER_THAN, newInstantToRetain.get().getTimestamp())
-              ).flatMap(instant -> {
-                try {
-                  HoodieCommitMetadata commitMetadata = 
HoodieCommitMetadata.fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
 HoodieCommitMetadata.class);
-                  return 
commitMetadata.getPartitionToWriteStats().keySet().stream();
-                } catch (IOException e) {
-                  throw new HoodieIOException(e.getMessage(), e);
-                }
-              }).distinct().collect(Collectors.toList());
+          return 
getPartitionPathsForCleaningUsingIncrementalMode(cleanMetadata, 
instantToRetain);
         }
       }
     }
-    // Otherwise go to brute force mode of scanning all partitions
-    return FSUtils.getAllPartitionPaths(hoodieTable.getMetaClient().getFs(),
-        hoodieTable.getMetaClient().getBasePath(), 
config.shouldAssumeDatePartitioning());
+    return scanAllPartitionsForCleaning();
+  }
+
+  /**
+   * Use Incremental Mode for finding partition paths.
+   * @param cleanMetadata
+   * @param newInstantToRetain
+   * @return
+   */
+  private List<String> 
getPartitionPathsForCleaningUsingIncrementalMode(HoodieCleanMetadata 
cleanMetadata,

Review comment:
       done

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
##########
@@ -111,36 +111,74 @@ public CleanPlanner(HoodieTable<T> hoodieTable, 
HoodieWriteConfig config) {
    * @throws IOException when underlying file-system throws this exception
    */
   public List<String> getPartitionPathsToClean(Option<HoodieInstant> 
newInstantToRetain) throws IOException {
-    if (config.incrementalCleanerModeEnabled() && 
newInstantToRetain.isPresent()
-        && (HoodieCleaningPolicy.KEEP_LATEST_COMMITS == 
config.getCleanerPolicy())) {
-      Option<HoodieInstant> lastClean =
-          
hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
+    switch (config.getCleanerPolicy()) {
+      case KEEP_LATEST_COMMITS: return 
getPartitionPathsForCleanByCommits(newInstantToRetain);

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to