This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 506447f  [HUDI-850] Avoid unnecessary listings in incremental cleaning 
mode (#1576)
506447f is described below

commit 506447fd4fde4cd922f7aa8f4e17a7f06666dc97
Author: Balaji Varadarajan <[email protected]>
AuthorDate: Fri May 1 21:37:21 2020 -0700

    [HUDI-850] Avoid unnecessary listings in incremental cleaning mode (#1576)
---
 .../hudi/table/action/clean/CleanPlanner.java      | 87 ++++++++++++++++------
 .../java/org/apache/hudi/table/TestCleaner.java    | 43 ++++++-----
 .../hudi/utilities/TestHoodieSnapshotExporter.java |  3 -
 3 files changed, 85 insertions(+), 48 deletions(-)

diff --git 
a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
 
b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 9f14e6c..815b41d 100644
--- 
a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ 
b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -39,7 +39,6 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
-
 import org.apache.hudi.exception.HoodieSavepointException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.log4j.LogManager;
@@ -48,6 +47,7 @@ import org.apache.log4j.Logger;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -111,36 +111,77 @@ public class CleanPlanner<T extends 
HoodieRecordPayload<T>> implements Serializa
    * @throws IOException when underlying file-system throws this exception
    */
   public List<String> getPartitionPathsToClean(Option<HoodieInstant> 
newInstantToRetain) throws IOException {
-    if (config.incrementalCleanerModeEnabled() && 
newInstantToRetain.isPresent()
-        && (HoodieCleaningPolicy.KEEP_LATEST_COMMITS == 
config.getCleanerPolicy())) {
-      Option<HoodieInstant> lastClean =
-          
hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
+    switch (config.getCleanerPolicy()) {
+      case KEEP_LATEST_COMMITS:
+        return getPartitionPathsForCleanByCommits(newInstantToRetain);
+      case KEEP_LATEST_FILE_VERSIONS:
+        return getPartitionPathsForFullCleaning();
+      default:
+        throw new IllegalStateException("Unknown Cleaner Policy");
+    }
+  }
+
+  /**
+   * Return partition paths for cleaning by commits mode.
+   * @param instantToRetain Earliest Instant to retain
+   * @return list of partitions
+   * @throws IOException
+   */
+  private List<String> 
getPartitionPathsForCleanByCommits(Option<HoodieInstant> instantToRetain) 
throws IOException {
+    if (!instantToRetain.isPresent()) {
+      LOG.info("No earliest commit to retain. No need to scan partitions !!");
+      return Collections.emptyList();
+    }
+
+    if (config.incrementalCleanerModeEnabled()) {
+      Option<HoodieInstant> lastClean = 
hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
       if (lastClean.isPresent()) {
         HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
             
.deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get());
         if ((cleanMetadata.getEarliestCommitToRetain() != null)
             && (cleanMetadata.getEarliestCommitToRetain().length() > 0)) {
-          LOG.warn("Incremental Cleaning mode is enabled. Looking up 
partition-paths that have since changed "
-              + "since last cleaned at " + 
cleanMetadata.getEarliestCommitToRetain()
-              + ". New Instant to retain : " + newInstantToRetain);
-          return hoodieTable.getCompletedCommitsTimeline().getInstants()
-              .filter(instant ->
-                  HoodieTimeline.compareTimestamps(instant.getTimestamp(), 
HoodieTimeline.GREATER_THAN_OR_EQUALS, 
cleanMetadata.getEarliestCommitToRetain())
-                  && HoodieTimeline.compareTimestamps(instant.getTimestamp(), 
HoodieTimeline.LESSER_THAN, newInstantToRetain.get().getTimestamp())
-              ).flatMap(instant -> {
-                try {
-                  HoodieCommitMetadata commitMetadata = 
HoodieCommitMetadata.fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
 HoodieCommitMetadata.class);
-                  return 
commitMetadata.getPartitionToWriteStats().keySet().stream();
-                } catch (IOException e) {
-                  throw new HoodieIOException(e.getMessage(), e);
-                }
-              }).distinct().collect(Collectors.toList());
+          return getPartitionPathsForIncrementalCleaning(cleanMetadata, 
instantToRetain);
         }
       }
     }
-    // Otherwise go to brute force mode of scanning all partitions
-    return FSUtils.getAllPartitionPaths(hoodieTable.getMetaClient().getFs(),
-        hoodieTable.getMetaClient().getBasePath(), 
config.shouldAssumeDatePartitioning());
+    return getPartitionPathsForFullCleaning();
+  }
+
+  /**
+   * Use Incremental Mode for finding partition paths.
+   * @param cleanMetadata
+   * @param newInstantToRetain
+   * @return
+   */
+  private List<String> 
getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata cleanMetadata,
+      Option<HoodieInstant> newInstantToRetain) {
+    LOG.warn("Incremental Cleaning mode is enabled. Looking up partition-paths 
that have changed "
+        + "since the last clean at " + cleanMetadata.getEarliestCommitToRetain()
+        + ". New Instant to retain : " + newInstantToRetain);
+    return hoodieTable.getCompletedCommitsTimeline().getInstants().filter(
+        instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), 
HoodieTimeline.GREATER_THAN_OR_EQUALS,
+            cleanMetadata.getEarliestCommitToRetain()) && 
HoodieTimeline.compareTimestamps(instant.getTimestamp(),
+            HoodieTimeline.LESSER_THAN, 
newInstantToRetain.get().getTimestamp())).flatMap(instant -> {
+              try {
+                HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+                    
.fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
+                        HoodieCommitMetadata.class);
+                return 
commitMetadata.getPartitionToWriteStats().keySet().stream();
+              } catch (IOException e) {
+                throw new HoodieIOException(e.getMessage(), e);
+              }
+            }).distinct().collect(Collectors.toList());
+  }
+
+  /**
+   * Scan and list all partitions for cleaning.
+   * @return all partition paths for the dataset.
+   * @throws IOException
+   */
+  private List<String> getPartitionPathsForFullCleaning() throws IOException {
+    // Go to brute force mode of scanning all partitions
+    return FSUtils.getAllPartitionPaths(hoodieTable.getMetaClient().getFs(), 
hoodieTable.getMetaClient().getBasePath(),
+        config.shouldAssumeDatePartitioning());
   }
 
   /**
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java 
b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index ad05890..296530b 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -104,7 +104,8 @@ public class TestCleaner extends TestHoodieClientBase {
    */
   private void insertFirstBigBatchForClientCleanerTest(HoodieWriteConfig cfg, 
HoodieWriteClient client,
       Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
-      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, 
JavaRDD<HoodieRecord>, String> insertFn) throws Exception {
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, 
JavaRDD<HoodieRecord>, String> insertFn,
+      HoodieCleaningPolicy cleaningPolicy) throws Exception {
 
     /*
      * do a big insert (this is basically same as insert part of upsert, just 
adding it here so we can catch breakages
@@ -127,11 +128,15 @@ public class TestCleaner extends TestHoodieClientBase {
     HoodieTable table = HoodieTable.create(metaClient, client.getConfig(), 
jsc);
 
     assertFalse(table.getCompletedCommitsTimeline().empty());
-    String instantTime = 
table.getCompletedCommitsTimeline().getInstants().findFirst().get().getTimestamp();
-    assertFalse(table.getCompletedCleanTimeline().empty());
-    assertEquals(instantTime,
-        
table.getCompletedCleanTimeline().getInstants().findFirst().get().getTimestamp(),
-        "The clean instant should be the same as the commit instant");
+    if (cleaningPolicy.equals(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)) {
+      // We no longer write empty cleaner plans when there are not enough 
commits present
+      assertTrue(table.getCompletedCleanTimeline().empty());
+    } else {
+      String instantTime = 
table.getCompletedCommitsTimeline().getInstants().findFirst().get().getTimestamp();
+      assertFalse(table.getCompletedCleanTimeline().empty());
+      assertEquals(instantTime, 
table.getCompletedCleanTimeline().getInstants().findFirst().get().getTimestamp(),
+          "The clean instant should be the same as the commit instant");
+    }
 
     HoodieIndex index = HoodieIndex.createIndex(cfg, jsc);
     List<HoodieRecord> taggedRecords = 
index.tagLocation(jsc.parallelize(records, 1), jsc, table).collect();
@@ -201,7 +206,8 @@ public class TestCleaner extends TestHoodieClientBase {
       final Function2<List<HoodieRecord>, String, Integer> 
recordUpsertGenWrappedFunction =
           generateWrapRecordsFn(isPreppedAPI, cfg, 
dataGen::generateUniqueUpdates);
 
-      insertFirstBigBatchForClientCleanerTest(cfg, client, 
recordInsertGenWrappedFunction, insertFn);
+      insertFirstBigBatchForClientCleanerTest(cfg, client, 
recordInsertGenWrappedFunction, insertFn,
+          HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS);
 
       Map<HoodieFileGroupId, FileSlice> compactionFileIdToLatestFileSlice = 
new HashMap<>();
       metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -349,7 +355,7 @@ public class TestCleaner extends TestHoodieClientBase {
     int maxCommits = 3; // keep upto 3 commits from the past
     HoodieWriteConfig cfg = getConfigBuilder()
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-            
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainCommits(maxCommits).build())
+            
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(maxCommits).build())
         .withParallelism(1, 
1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1)
         
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
         .build();
@@ -361,7 +367,8 @@ public class TestCleaner extends TestHoodieClientBase {
     final Function2<List<HoodieRecord>, String, Integer> 
recordUpsertGenWrappedFunction =
         generateWrapRecordsFn(isPreppedAPI, cfg, 
dataGen::generateUniqueUpdates);
 
-    insertFirstBigBatchForClientCleanerTest(cfg, client, 
recordInsertGenWrappedFunction, insertFn);
+    insertFirstBigBatchForClientCleanerTest(cfg, client, 
recordInsertGenWrappedFunction, insertFn,
+        HoodieCleaningPolicy.KEEP_LATEST_COMMITS);
 
     // Keep doing some writes and clean inline. Make sure we have expected 
number of files remaining.
     HoodieTestUtils.monotonicIncreasingCommitTimestamps(8, 
1).forEach(newCommitTime -> {
@@ -376,7 +383,9 @@ public class TestCleaner extends TestHoodieClientBase {
         metaClient = HoodieTableMetaClient.reload(metaClient);
         HoodieTable table1 = HoodieTable.create(metaClient, cfg, jsc);
         HoodieTimeline activeTimeline = table1.getCompletedCommitsTimeline();
-        Option<HoodieInstant> earliestRetainedCommit = 
activeTimeline.nthFromLastInstant(maxCommits - 1);
+        // NOTE: See CleanPlanner#getFilesToCleanKeepingLatestCommits. We 
explicitly keep one commit before earliest
+        // commit
+        Option<HoodieInstant> earliestRetainedCommit = 
activeTimeline.nthFromLastInstant(maxCommits);
         Set<HoodieInstant> acceptableCommits = 
activeTimeline.getInstants().collect(Collectors.toSet());
         if (earliestRetainedCommit.isPresent()) {
           acceptableCommits
@@ -751,12 +760,7 @@ public class TestCleaner extends TestHoodieClientBase {
     metaClient = HoodieTableMetaClient.reload(metaClient);
 
     List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config, 
simulateFailureRetry);
-    assertEquals(0,
-        getCleanStat(hoodieCleanStatsOne, 
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
-            .size(), "Must not clean any files");
-    assertEquals(0,
-        getCleanStat(hoodieCleanStatsOne, 
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles()
-            .size(), "Must not clean any files");
+    assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions 
and clean any files");
     assertTrue(HoodieTestUtils.doesDataFileExist(basePath, 
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000",
         file1P0C0));
     assertTrue(HoodieTestUtils.doesDataFileExist(basePath, 
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000",
@@ -786,12 +790,7 @@ public class TestCleaner extends TestHoodieClientBase {
         new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "001"),
         
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
     List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config, 
simulateFailureRetry);
-    assertEquals(0,
-        getCleanStat(hoodieCleanStatsTwo, 
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).getSuccessDeleteFiles()
-            .size(), "Must not clean any files");
-    assertEquals(0,
-        getCleanStat(hoodieCleanStatsTwo, 
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH).getSuccessDeleteFiles()
-            .size(), "Must not clean any files");
+    assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any partitions 
and clean any files");
     assertTrue(HoodieTestUtils.doesDataFileExist(basePath, 
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001",
         file2P0C1));
     assertTrue(HoodieTestUtils.doesDataFileExist(basePath, 
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "001",
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java
index fda192f..087a283 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieSnapshotExporter.java
@@ -139,9 +139,6 @@ public class TestHoodieSnapshotExporter extends 
HoodieClientTestHarness {
       new HoodieSnapshotExporter().export(jsc, cfg);
 
       // Check results
-      assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + 
".clean")));
-      assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + 
".clean.inflight")));
-      assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + 
".clean.requested")));
       assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + 
".commit")));
       assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + 
".commit.requested")));
       assertTrue(dfs.exists(new Path(targetPath + "/.hoodie/" + COMMIT_TIME + 
".inflight")));

Reply via email to