nsivabalan commented on a change in pull request #3646:
URL: https://github.com/apache/hudi/pull/3646#discussion_r790390998



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##########
@@ -535,6 +540,11 @@ public Builder retainCommits(int commitsRetained) {
       return this;
     }
 
+    public Builder retainNumberOfHours(int cleanerHoursRetained) {

Review comment:
       cleanerNumHoursRetained
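
A minimal, self-contained sketch of the rename being suggested (this `CompactionConfigBuilder` is a hypothetical stand-in for illustration, not Hudi's actual `HoodieCompactionConfig.Builder`, which writes a config property instead of a field):

```java
// Hypothetical stand-in illustrating the reviewer's naming suggestion:
// the parameter is named cleanerNumHoursRetained for consistency.
class CompactionConfigBuilder {
    private int cleanerNumHoursRetained = 24; // default mirrors hoodie.cleaner.hours.retained

    // Fluent builder style, matching retainCommits(...)
    public CompactionConfigBuilder retainNumberOfHours(int cleanerNumHoursRetained) {
        this.cleanerNumHoursRetained = cleanerNumHoursRetained;
        return this;
    }

    public int getCleanerNumHoursRetained() {
        return cleanerNumHoursRetained;
    }
}
```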

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
##########
@@ -330,6 +349,19 @@ public CleanPlanner(HoodieEngineContext context, HoodieTable<T, I, K, O> hoodieT
     }
     return deletePaths;
   }
+
+  /**
+   * This method finds the files to be cleaned based on the number of hours. If {@code config.getCleanerHoursRetained()} is set to 5,
+   * all the files with commit time earlier than 5 hours will be removed. Also the latest file for any file group is retained.
+   * This policy gives much more flexibility to users for retaining data for running incremental queries as compared to
+   * KEEP_LATEST_COMMITS cleaning policy. The default number of hours is 5.
+   * @param partitionPath partition path to check
+   * @return list of files to clean
+   */
+  private List<CleanFileInfo> getFilesToCleanKeepingLatestHours(String partitionPath) {
+    int commitsToRetain = 0;

Review comment:
      Do we need this? If not, can we remove it?
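
The javadoc in this hunk describes retention by wall-clock age. The eligibility rule can be sketched self-contained with `java.time` (the compact timestamp pattern and the helper name here are assumptions for illustration, not Hudi's actual API):

```java
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

// Sketch of the age-based rule from the javadoc above: a file is a cleaning
// candidate when its commit time is older than (now - hoursRetained),
// except the latest slice of each file group, which is always kept.
class HoursRetainedRule {
    // Assumed compact commit-time pattern; Hudi formats instants via HoodieActiveTimeline.
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    static boolean isOlderThanRetentionWindow(String commitTime, ZonedDateTime now, int hoursRetained) {
        String earliestToRetain = now.minusHours(hoursRetained).format(FMT);
        // Lexicographic comparison works because the format is fixed-width, most-significant-first.
        return commitTime.compareTo(earliestToRetain) < 0;
    }
}
```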

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##########
@@ -76,6 +76,11 @@
      .withDocumentation("Number of commits to retain, without cleaning. This will be retained for num_of_commits * time_between_commits "
          + "(scheduled). This also directly translates into how much data retention the table supports for incremental queries.");
 
+  public static final ConfigProperty<String> CLEANER_HOURS_RETAINED = ConfigProperty.key("hoodie.cleaner.hours.retained")
+          .defaultValue("24")
+          .withDocumentation("Number of hours for which commits need to be retained. This config provides a more flexible option as"
+          + "compared to number of commits retained for cleaning service");

Review comment:
      I feel this policy is not standalone in itself. For example, numCommitsRetained and numFileVersionsRetained are self-contained, but here we follow logic similar to what we do for retaining num commits. I mean, we clean up commits and not file versions. If you think about it, in addition to configuring hours retained, users probably also have to choose whether they want to clean up files based on commits or based on file versions.
   
   Let me know wdyt.
   Either way, we need to add more documentation here.
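
On the last point ("we need to add more documentation here"): one possible expanded doc string is sketched below. The wording is a suggestion (ours, not the project's), and it also restores the space lost in the `"...option as" + "compared..."` concatenation in the diff above, which currently renders as "ascompared":

```java
// Suggested (hypothetical) documentation text for CLEANER_HOURS_RETAINED.
// Note the space before "compared", missing in the diff's concatenation.
class CleanerHoursRetainedDoc {
    static final String DOC =
        "Number of hours for which commits need to be retained. This config provides "
        + "a more flexible option, as compared to number of commits retained, for the "
        + "cleaning service. Note: like KEEP_LATEST_COMMITS, this policy cleans by "
        + "commit age rather than by file version count.";
}
```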

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
##########
@@ -1240,6 +1244,154 @@ public void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIn
     assertTrue(testTable.baseFileExists(p0, "00000000000005", file3P0C2));
   }
 
+  /**
+   * Test cleaning policy based on number of hours retained policy. This test case covers the case when files will not be cleaned.
+   */
+  @ParameterizedTest
+  @MethodSource("argumentsForTestKeepLatestCommits")
+  public void testKeepXHoursNoCleaning(boolean simulateFailureRetry, boolean enableIncrementalClean, boolean enableBootstrapSourceClean) throws Exception {

Review comment:
       sg

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
##########
@@ -402,9 +436,16 @@ private String getLatestVersionBeforeCommit(List<FileSlice> fileSliceList, Hoodi
   public Option<HoodieInstant> getEarliestCommitToRetain() {
     Option<HoodieInstant> earliestCommitToRetain = Option.empty();
     int commitsRetained = config.getCleanerCommitsRetained();
+    int hoursRetained = config.getCleanerHoursRetained();
     if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_COMMITS
         && commitTimeline.countInstants() > commitsRetained) {
-      earliestCommitToRetain = commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained);
+      earliestCommitToRetain = commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained); //15 instants total, 10 commits to retain, this gives 6th instant in the list
+    } else if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
+      Instant instant = Instant.now();
+      ZonedDateTime commitDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());
+      String retainInstantsAfter = HoodieActiveTimeline.formatDate(Date.from(commitDateTime.minusHours(hoursRetained).toInstant()));

Review comment:
      retainInstantsAfter: maybe we can rename this to earliestTimeToRetain.
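
With the rename suggested above (and `currentDateTime`, suggested in the later comment on this same hunk), the cutoff computation might look like this. A self-contained sketch using only `java.time`: Hudi's `HoodieActiveTimeline.formatDate` is replaced by a local formatter, and the timestamp pattern is an assumption:

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

class EarliestTimeToRetain {
    // Assumed compact commit-time pattern (stand-in for HoodieActiveTimeline's formatting).
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    // Renamed per the review: currentDateTime instead of commitDateTime,
    // earliestTimeToRetain instead of retainInstantsAfter.
    static String earliestTimeToRetain(Instant now, ZoneId zone, int hoursRetained) {
        ZonedDateTime currentDateTime = ZonedDateTime.ofInstant(now, zone);
        return currentDateTime.minusHours(hoursRetained).format(FMT);
    }
}
```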

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
##########
@@ -299,14 +308,24 @@ public CleanPlanner(HoodieEngineContext context, HoodieTable<T, I, K, O> hoodieT
            // do not clean up a savepoint data file
            continue;
          }
-          // Dont delete the latest commit and also the last commit before the earliest commit we
-          // are retaining
-          // The window of commit retain == max query run time. So a query could be running which
-          // still
-          // uses this file.
-          if (fileCommitTime.equals(lastVersion) || (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) {
-            // move on to the next file
-            continue;
+
+          if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
+            // Dont delete the latest commit and also the last commit before the earliest commit we
+            // are retaining
+            // The window of commit retain == max query run time. So a query could be running which
+            // still
+            // uses this file.
+            if (fileCommitTime.equals(lastVersion) || (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) {
+              // move on to the next file
+              continue;
+            }
+          } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
+            // This block corresponds to KEEP_LATEST_BY_HOURS policy
+            // Do not delete the latest commit.
+            if (fileCommitTime.equals(lastVersion)) {

Review comment:
      Do we know why we have `(fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))` in L318? I would prefer to keep the same logic unless we have strong reasons to remove it. As I mentioned, cleaning based on num hours is just a different way of conveying "retain N commits". So, let's try to keep the logic similar across both. Easier to maintain.
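
The comment above argues for keeping KEEP_LATEST_BY_HOURS symmetric with KEEP_LATEST_COMMITS. A self-contained sketch of the shared retention check that would result (the class and method names are ours, not the PR's):

```java
// Sketch of a retention check common to KEEP_LATEST_COMMITS and
// KEEP_LATEST_BY_HOURS: both keep the latest version of a file group, and
// the last version written before the earliest commit to retain, since a
// long-running query started inside the retention window may still read it.
class RetentionCheck {
    static boolean shouldRetain(String fileCommitTime,
                                String lastVersion,
                                String lastVersionBeforeEarliestCommitToRetain) {
        return fileCommitTime.equals(lastVersion)
            || fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain);
    }
}
```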
   

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
##########
@@ -1267,6 +1271,154 @@ public void testKeepLatestCommits(boolean simulateFailureRetry, boolean enableIn
     assertTrue(testTable.baseFileExists(p0, "00000000000005", file3P0C2));
   }
 
+  /**
+   * Test cleaning policy based on number of hours retained policy. This test case covers the case when files will not be cleaned.
+   */
+  @ParameterizedTest
+  @MethodSource("argumentsForTestKeepLatestCommits")
+  public void testKeepXHoursNoCleaning(boolean simulateFailureRetry, boolean enableIncrementalClean, boolean enableBootstrapSourceClean) throws Exception {
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
+            .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
+            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+                    .withIncrementalCleaningMode(enableIncrementalClean)
+                    .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
+                    .withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean)
+                    .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS).retainNumberOfHours(2).build())
+            .build();
+
+    HoodieTestTable testTable = HoodieTestTable.of(metaClient);
+    String p0 = "2020/01/01";
+    String p1 = "2020/01/02";
+    Map<String, List<BootstrapFileMapping>> bootstrapMapping = enableBootstrapSourceClean ? generateBootstrapIndexAndSourceData(p0, p1) : null;
+
+    String file1P0C0 = enableBootstrapSourceClean ? bootstrapMapping.get(p0).get(0).getFileId()
+            : UUID.randomUUID().toString();
+    String file1P1C0 = enableBootstrapSourceClean ? bootstrapMapping.get(p1).get(0).getFileId()
+            : UUID.randomUUID().toString();
+    Instant instant = Instant.now();
+    ZonedDateTime commitDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());
+    int minutesForFirstCommit = 90;
+    String firstCommitTs = HoodieActiveTimeline.formatDate(Date.from(commitDateTime.minusMinutes(minutesForFirstCommit).toInstant()));
+    testTable.addInflightCommit(firstCommitTs).withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0);
+
+    HoodieCommitMetadata commitMetadata = generateCommitMetadata(
+            Collections.unmodifiableMap(new HashMap<String, List<String>>() {
+              {
+                put(p0, CollectionUtils.createImmutableList(file1P0C0));
+                put(p1, CollectionUtils.createImmutableList(file1P1C0));
+              }
+            })
+    );
+    metaClient.getActiveTimeline().saveAsComplete(
+            new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, firstCommitTs),
+            Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    List<HoodieCleanStat> hoodieCleanStatsOne = runCleaner(config, simulateFailureRetry);
+    assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions and clean any files");
+    assertTrue(testTable.baseFileExists(p0, firstCommitTs, file1P0C0));
+    assertTrue(testTable.baseFileExists(p1, firstCommitTs, file1P1C0));
+
+    // make next commit, with 1 insert & 1 update per partition
+    int minutesForSecondCommit = 40;
+    String secondCommitTs = HoodieActiveTimeline.formatDate(Date.from(commitDateTime.minusMinutes(minutesForSecondCommit).toInstant()));
+    Map<String, String> partitionAndFileId002 = testTable.addInflightCommit(secondCommitTs).getFileIdsWithBaseFilesInPartitions(p0, p1);
+    String file2P0C1 = partitionAndFileId002.get(p0);
+    String file2P1C1 = partitionAndFileId002.get(p1);
+    testTable.forCommit(secondCommitTs).withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0);
+    commitMetadata = generateCommitMetadata(new HashMap<String, List<String>>() {
+      {
+        put(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1));
+        put(p1, CollectionUtils.createImmutableList(file1P1C0, file2P1C1));
+      }
+    });
+    metaClient.getActiveTimeline().saveAsComplete(
+            new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, secondCommitTs),
+            Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+    List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config, simulateFailureRetry);
+    assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any partitions and clean any files");
+    assertTrue(testTable.baseFileExists(p0, secondCommitTs, file2P0C1));
+    assertTrue(testTable.baseFileExists(p1, secondCommitTs, file2P1C1));
+    assertTrue(testTable.baseFileExists(p0, secondCommitTs, file1P0C0));
+    assertTrue(testTable.baseFileExists(p1, secondCommitTs, file1P1C0));
+  }

Review comment:
      Wondering, do we need the next test? Can't we add another commit here so that the first commit is eligible for cleaning, and do some assertions?
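
The suggestion above (add a commit old enough to fall outside the 2-hour window instead of a separate test) can be sketched as a small timeline simulation. The class below is a self-contained stand-in for the Hudi test harness, with an assumed timestamp pattern:

```java
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

// Stand-in simulation of the proposed extended test: with 2 hours retained,
// commits at now-90min and now-40min survive, but a further commit at
// now-3h falls outside the window and becomes eligible for cleaning.
class HoursRetainedTimeline {
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    static List<String> commitsEligibleForCleaning(List<ZonedDateTime> commits,
                                                   ZonedDateTime now,
                                                   int hoursRetained) {
        String cutoff = now.minusHours(hoursRetained).format(FMT);
        List<String> eligible = new ArrayList<>();
        for (ZonedDateTime c : commits) {
            String ts = c.format(FMT);
            if (ts.compareTo(cutoff) < 0) { // strictly older than the window
                eligible.add(ts);
            }
        }
        return eligible;
    }
}
```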

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
##########
@@ -402,9 +436,16 @@ private String getLatestVersionBeforeCommit(List<FileSlice> fileSliceList, Hoodi
   public Option<HoodieInstant> getEarliestCommitToRetain() {
     Option<HoodieInstant> earliestCommitToRetain = Option.empty();
     int commitsRetained = config.getCleanerCommitsRetained();
+    int hoursRetained = config.getCleanerHoursRetained();
     if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_COMMITS
         && commitTimeline.countInstants() > commitsRetained) {
-      earliestCommitToRetain = commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained);
+      earliestCommitToRetain = commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained); //15 instants total, 10 commits to retain, this gives 6th instant in the list
+    } else if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
+      Instant instant = Instant.now();
+      ZonedDateTime commitDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());

Review comment:
      commitDateTime: maybe we can rename it to currentDateTime.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

