This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ee5bb7c2e36 [HUDI-6370] Automatically decide archival boundary based on cleaning configs (#8972)
ee5bb7c2e36 is described below

commit ee5bb7c2e36960cb4a0889b7363cbcb164b4a941
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed Jun 14 19:51:48 2023 -0700

    [HUDI-6370] Automatically decide archival boundary based on cleaning configs (#8972)
---
 .../apache/hudi/client/HoodieTimelineArchiver.java |  93 +++++++++++++-----
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  13 +--
 .../hudi/table/action/clean/CleanPlanner.java      |  42 ++------
 .../apache/hudi/io/TestHoodieTimelineArchiver.java | 107 ++++++++++++++-------
 .../org/apache/hudi/common/util/CleanerUtils.java  |  42 +++++++-
 5 files changed, 199 insertions(+), 98 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index 0a6659cb1e7..74eeeaf22f4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -48,6 +48,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.CompactionUtils;
@@ -75,11 +76,9 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.text.ParseException;
 import java.time.Instant;
-import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -88,10 +87,15 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
-import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps;
+import static org.apache.hudi.config.HoodieArchivalConfig.MAX_COMMITS_TO_KEEP;
+import static org.apache.hudi.config.HoodieArchivalConfig.MIN_COMMITS_TO_KEEP;
+import static org.apache.hudi.config.HoodieCleanConfig.CLEANER_COMMITS_RETAINED;
+import static org.apache.hudi.config.HoodieCleanConfig.CLEANER_FILE_VERSIONS_RETAINED;
+import static org.apache.hudi.config.HoodieCleanConfig.CLEANER_HOURS_RETAINED;
+import static org.apache.hudi.config.HoodieCleanConfig.CLEANER_POLICY;
 
 /**
  * Archiver to bound the growth of files under .hoodie meta path.
@@ -114,9 +118,69 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
     this.table = table;
     this.metaClient = table.getMetaClient();
     this.archiveFilePath = HoodieArchivedTimeline.getArchiveLogPath(metaClient.getArchivePath());
-    this.maxInstantsToKeep = config.getMaxCommitsToKeep();
-    this.minInstantsToKeep = config.getMinCommitsToKeep();
     this.txnManager = new TransactionManager(config, table.getMetaClient().getFs());
+
+    Option<HoodieInstant> earliestCommitToRetain = Option.empty();
+    HoodieTimeline completedCommitsTimeline = table.getCompletedCommitsTimeline();
+    Option<HoodieInstant> latestCommit = completedCommitsTimeline.lastInstant();
+    HoodieCleaningPolicy cleanerPolicy = config.getCleanerPolicy();
+    int cleanerCommitsRetained = config.getCleanerCommitsRetained();
+    int cleanerHoursRetained = config.getCleanerHoursRetained();
+
+    try {
+      earliestCommitToRetain = CleanerUtils.getEarliestCommitToRetain(
+          metaClient.getActiveTimeline().getCommitsTimeline(),
+          cleanerPolicy,
+          cleanerCommitsRetained,
+          latestCommit.isPresent()
+              ? HoodieActiveTimeline.parseDateFromInstantTime(latestCommit.get().getTimestamp()).toInstant()
+              : Instant.now(),
+          cleanerHoursRetained,
+          metaClient.getTableConfig().getTimelineTimezone());
+    } catch (ParseException e) {
+      LOG.warn("Error parsing instant time: " + latestCommit.get().getTimestamp(), e);
+    }
+
+    int configuredMinInstantsToKeep = config.getMinCommitsToKeep();
+    int configuredMaxInstantsToKeep = config.getMaxCommitsToKeep();
+    if (earliestCommitToRetain.isPresent()) {
+      int minInstantsToKeepBasedOnCleaning =
+          completedCommitsTimeline.findInstantsAfter(earliestCommitToRetain.get().getTimestamp())
+              .countInstants() + 2;
+      if (configuredMinInstantsToKeep < minInstantsToKeepBasedOnCleaning) {
+        this.maxInstantsToKeep = minInstantsToKeepBasedOnCleaning
+            + configuredMaxInstantsToKeep - configuredMinInstantsToKeep;
+        this.minInstantsToKeep = minInstantsToKeepBasedOnCleaning;
+        LOG.warn("The configured archival configs {}={} is more aggressive than the cleaning "
+                + "configs as the earliest commit to retain is {}. Adjusted the archival configs "
+                + "to be {}={} and {}={}",
+            MIN_COMMITS_TO_KEEP.key(), configuredMinInstantsToKeep, earliestCommitToRetain.get(),
+            MIN_COMMITS_TO_KEEP.key(), this.minInstantsToKeep,
+            MAX_COMMITS_TO_KEEP.key(), this.maxInstantsToKeep);
+        switch (cleanerPolicy) {
+          case KEEP_LATEST_COMMITS:
+            LOG.warn("Cleaning configs: {}=KEEP_LATEST_COMMITS {}={}", CLEANER_POLICY.key(),
+                CLEANER_COMMITS_RETAINED.key(), cleanerCommitsRetained);
+            break;
+          case KEEP_LATEST_BY_HOURS:
+            LOG.warn("Cleaning configs: {}=KEEP_LATEST_BY_HOURS {}={}", CLEANER_POLICY.key(),
+                CLEANER_HOURS_RETAINED.key(), cleanerHoursRetained);
+            break;
+          case KEEP_LATEST_FILE_VERSIONS:
+            LOG.warn("Cleaning configs: {}=KEEP_LATEST_FILE_VERSIONS {}={}", CLEANER_POLICY.key(),
+                CLEANER_FILE_VERSIONS_RETAINED.key(), config.getCleanerFileVersionsRetained());
+            break;
+          default:
+            break;
+        }
+      } else {
+        this.maxInstantsToKeep = configuredMaxInstantsToKeep;
+        this.minInstantsToKeep = configuredMinInstantsToKeep;
+      }
+    } else {
+      this.maxInstantsToKeep = configuredMaxInstantsToKeep;
+      this.minInstantsToKeep = configuredMinInstantsToKeep;
+    }
   }
 
   private Writer openWriter() {
@@ -492,24 +556,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
                       HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN, instantToRetain.getTimestamp()))
                   .orElse(true)
           );
-      List<HoodieInstant> instantsToArchive = instantToArchiveStream.limit(commitTimeline.countInstants() - minInstantsToKeep).collect(Collectors.toList());
-      // If cleaner is based on hours, lets ensure hudi does not archive commits yet to cleaned by the cleaner.
-      if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS && !instantsToArchive.isEmpty()) {
-        String latestCommitToArchive = instantsToArchive.get(instantsToArchive.size() - 1).getTimestamp();
-        try {
-          Instant latestCommitInstant = HoodieActiveTimeline.parseDateFromInstantTime(commitTimeline.lastInstant().get().getTimestamp()).toInstant();
-          ZonedDateTime currentDateTime = ZonedDateTime.ofInstant(latestCommitInstant, metaClient.getTableConfig().getTimelineTimezone().getZoneId());
-          String earliestTimeToRetain = HoodieActiveTimeline.formatDate(Date.from(currentDateTime.minusHours(config.getCleanerHoursRetained()).toInstant()));
-          if (HoodieTimeline.compareTimestamps(latestCommitToArchive, GREATER_THAN_OR_EQUALS, earliestTimeToRetain)) {
-            throw new HoodieIOException("Please align your archival configs based on cleaner configs. 'hoodie.keep.min.commits' : "
-                + config.getMinCommitsToKeep() + " + should be greater than "
-                + " 'hoodie.cleaner.hours.retained' : " + config.getCleanerHoursRetained());
-          }
-        } catch (ParseException e) {
-          throw new HoodieIOException("Failed to parse latest commit instant time " + commitTimeline.lastInstant().get().getTimestamp() + e.getMessage());
-        }
-      }
-      return instantsToArchive.stream();
+      return instantToArchiveStream.limit(commitTimeline.countInstants() - minInstantsToKeep);
     } else {
       return Stream.empty();
     }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 4307015aed6..cd81c4fda07 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -3103,12 +3103,13 @@ public class HoodieWriteConfig extends HoodieConfig {
                 "Increase %s=%d to be greater than %s=%d.",
                 HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), maxInstantsToKeep,
                 HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), minInstantsToKeep));
-        checkArgument(minInstantsToKeep > cleanerCommitsRetained,
-            String.format(
-                "Increase %s=%d to be greater than %s=%d. Otherwise, there is risk of incremental pull "
-                    + "missing data from few instants.",
-                HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), minInstantsToKeep,
-                HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), cleanerCommitsRetained));
+        if (minInstantsToKeep <= cleanerCommitsRetained) {
+          LOG.warn("Increase {}={} to be greater than {}={} (there is risk of incremental pull "
+                  + "missing data from few instants based on the current configuration). "
+                  + "The Hudi archiver will automatically adjust the configuration regardless.",
+              HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), minInstantsToKeep,
+              HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), cleanerCommitsRetained);
+        }
       }
 
       boolean inlineCompact = writeConfig.getBoolean(HoodieCompactionConfig.INLINE_COMPACT);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 751edfded59..d855edc3569 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -32,7 +32,6 @@ import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -40,6 +39,7 @@ import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV1Migrati
 import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -54,10 +54,8 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.io.Serializable;
 import java.time.Instant;
-import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -510,37 +508,13 @@ public class CleanPlanner<T, I, K, O> implements Serializable {
    * Returns earliest commit to retain based on cleaning policy.
    */
   public Option<HoodieInstant> getEarliestCommitToRetain() {
-    Option<HoodieInstant> earliestCommitToRetain = Option.empty();
-    int commitsRetained = config.getCleanerCommitsRetained();
-    int hoursRetained = config.getCleanerHoursRetained();
-    if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_COMMITS
-        && commitTimeline.countInstants() > commitsRetained) {
-      Option<HoodieInstant> earliestPendingCommits = hoodieTable.getMetaClient()
-          .getActiveTimeline()
-          .getCommitsTimeline()
-          .filter(s -> !s.isCompleted()).firstInstant();
-      if (earliestPendingCommits.isPresent()) {
-        // Earliest commit to retain must not be later than the earliest pending commit
-        earliestCommitToRetain =
-            commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained).map(nthInstant -> {
-              if (nthInstant.compareTo(earliestPendingCommits.get()) <= 0) {
-                return Option.of(nthInstant);
-              } else {
-                return commitTimeline.findInstantsBefore(earliestPendingCommits.get().getTimestamp()).lastInstant();
-              }
-            }).orElse(Option.empty());
-      } else {
-        earliestCommitToRetain = commitTimeline.nthInstant(commitTimeline.countInstants()
-            - commitsRetained); //15 instants total, 10 commits to retain, this gives 6th instant in the list
-      }
-    } else if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
-      Instant instant = Instant.now();
-      ZonedDateTime currentDateTime = ZonedDateTime.ofInstant(instant, hoodieTable.getMetaClient().getTableConfig().getTimelineTimezone().getZoneId());
-      String earliestTimeToRetain = HoodieActiveTimeline.formatDate(Date.from(currentDateTime.minusHours(hoursRetained).toInstant()));
-      earliestCommitToRetain = Option.fromJavaOptional(commitTimeline.getInstantsAsStream().filter(i -> HoodieTimeline.compareTimestamps(i.getTimestamp(),
-              HoodieTimeline.GREATER_THAN_OR_EQUALS, earliestTimeToRetain)).findFirst());
-    }
-    return earliestCommitToRetain;
+    return CleanerUtils.getEarliestCommitToRetain(
+        hoodieTable.getMetaClient().getActiveTimeline().getCommitsTimeline(),
+        config.getCleanerPolicy(),
+        config.getCleanerCommitsRetained(),
+        Instant.now(),
+        config.getCleanerHoursRetained(),
+        hoodieTable.getMetaClient().getTableConfig().getTimelineTimezone());
   }
 
   /**
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index b049c60b9e7..0399bf14242 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -54,7 +54,6 @@ import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
@@ -66,7 +65,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -324,17 +322,24 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
     }
   }
 
-  @Test
-  public void testArchivalWithCleanBasedOnHours() throws Exception {
+  @ParameterizedTest
+  @ValueSource(strings = {"KEEP_LATEST_BY_HOURS", "KEEP_LATEST_COMMITS"})
+  public void testArchivalWithAutoAdjustmentBasedOnCleanConfigs(String cleaningPolicy) throws Exception {
+    // This test verifies that when the archival configs are more aggressive than the cleaning
+    // configs, the archiver adjusts the min and max commits to keep automatically based on the
+    // cleaning configs.
     init();
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
+        .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+            .withRemoteServerPort(timelineServicePort).build())
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
         .withCleanConfig(HoodieCleanConfig.newBuilder()
             .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
-            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS).cleanerNumHoursRetained(1)
-            .retainCommits(2)
+            .withCleanerPolicy(HoodieCleaningPolicy.valueOf(cleaningPolicy))
+            .cleanerNumHoursRetained(1)
+            .retainCommits(5)
             .build())
-        .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(3,4).build())
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(3, 4).build())
         .build();
 
     HoodieTestTable testTable = HoodieTestTable.of(metaClient);
@@ -342,42 +347,56 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
     String p1 = "2020/01/02";
     Instant instant = Instant.now();
     ZonedDateTime commitDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());
-
-    triggerCommit(p0, p1, commitDateTime, 5, testTable);
-    triggerCommit(p0, p1, commitDateTime, 10, testTable);
-    triggerCommit(p0, p1, commitDateTime, 20, testTable);
-    triggerCommit(p0, p1, commitDateTime, 30, testTable);
-    triggerCommit(p0, p1, commitDateTime, 40, testTable);
+    List<HoodieInstant> expectedAllCommits = new ArrayList<>();
+
+    // The following commits should be archived
+    expectedAllCommits.add(triggerCommit(p0, p1, commitDateTime, 90, true, testTable));
+    expectedAllCommits.add(triggerCommit(p0, p1, commitDateTime, 80, true, testTable));
+    // The following commits should not be archived
+    expectedAllCommits.add(triggerCommit(p0, p1, commitDateTime, 70, true, testTable));
+    expectedAllCommits.add(triggerCommit(p0, p1, commitDateTime, 50, true, testTable));
+    expectedAllCommits.add(triggerCommit(p0, p1, commitDateTime, 45, true, testTable));
+    expectedAllCommits.add(triggerCommit(p0, p1, commitDateTime, 40, true, testTable));
+    expectedAllCommits.add(triggerCommit(p0, p1, commitDateTime, 30, false, testTable));
+    expectedAllCommits.add(triggerCommit(p0, p1, commitDateTime, 20, false, testTable));
+    expectedAllCommits.add(triggerCommit(p0, p1, commitDateTime, 10, true, testTable));
+    expectedAllCommits.add(triggerCommit(p0, p1, commitDateTime, 5, true, testTable));
 
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    // lets trigger archival. should fail since archival configs does not align w/ cleaner configs.
-    try {
-      archiveAndGetCommitsList(config);
-      Assertions.fail("Should have failed archival since archival configs are not aligned with cleaner configs. ");
-    } catch (HoodieIOException e) {
-      assertTrue(e.getMessage().contains("Please align your archival configs based on cleaner configs"));
-    }
+    Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList =
+        archiveAndGetCommitsList(config, true);
+    List<HoodieInstant> originalCommits = commitsList.getKey();
+    List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
+    assertInstantListEquals(expectedAllCommits, originalCommits);
+    assertInstantListEquals(
+        expectedAllCommits.subList(2, expectedAllCommits.size()), commitsAfterArchival);
   }
 
-  private void triggerCommit(String p0, String p1, ZonedDateTime curDateTime, int minutesForCommit, HoodieTestTable testTable) throws Exception {
+  private HoodieInstant triggerCommit(
+      String p0, String p1, ZonedDateTime curDateTime, int minutesForCommit,
+      boolean isComplete, HoodieTestTable testTable) throws Exception {
 
     String file1P0C0 = UUID.randomUUID().toString();
     String file1P1C0 = UUID.randomUUID().toString();
     String commitTs = HoodieActiveTimeline.formatDate(Date.from(curDateTime.minusMinutes(minutesForCommit).toInstant()));
     testTable.addInflightCommit(commitTs).withBaseFilesInPartition(p0, file1P0C0).withBaseFilesInPartition(p1, file1P1C0);
 
-    HoodieCommitMetadata commitMetadata = generateCommitMetadata(commitTs,
-        Collections.unmodifiableMap(new HashMap<String, List<String>>() {
-          {
-            put(p0, CollectionUtils.createImmutableList(file1P0C0));
-            put(p1, CollectionUtils.createImmutableList(file1P1C0));
-          }
-        })
-    );
-    metaClient.getActiveTimeline().saveAsComplete(
-        new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, commitTs),
-        Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+    if (isComplete) {
+      HoodieCommitMetadata commitMetadata = generateCommitMetadata(commitTs,
+          Collections.unmodifiableMap(new HashMap<String, List<String>>() {
+            {
+              put(p0, CollectionUtils.createImmutableList(file1P0C0));
+              put(p1, CollectionUtils.createImmutableList(file1P1C0));
+            }
+          })
+      );
 
+      metaClient.getActiveTimeline().saveAsComplete(
+          new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, commitTs),
+          Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+    }
+    return new HoodieInstant(
+        isComplete ? State.COMPLETED : State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, commitTs);
   }
 
   protected static HoodieCommitMetadata generateCommitMetadata(
@@ -1669,13 +1688,22 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
   }
 
   private Pair<List<HoodieInstant>, List<HoodieInstant>> archiveAndGetCommitsList(HoodieWriteConfig writeConfig) throws IOException {
+    return archiveAndGetCommitsList(writeConfig, false);
+  }
+
+  private Pair<List<HoodieInstant>, List<HoodieInstant>> archiveAndGetCommitsList(
+      HoodieWriteConfig writeConfig, boolean includeIncompleteInstants) throws IOException {
     metaClient.reloadActiveTimeline();
-    HoodieTimeline timeline = metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
+    HoodieTimeline timeline = includeIncompleteInstants
+        ? metaClient.getActiveTimeline().reload().getAllCommitsTimeline()
+        : metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
     List<HoodieInstant> originalCommits = timeline.getInstants();
     HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
     HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(writeConfig, table);
     archiver.archiveIfRequired(context);
-    timeline = metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
+    timeline = includeIncompleteInstants
+        ? metaClient.getActiveTimeline().reload().getAllCommitsTimeline()
+        : metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
     List<HoodieInstant> commitsAfterArchival = timeline.getInstants();
     return Pair.of(originalCommits, commitsAfterArchival);
   }
@@ -1762,4 +1790,15 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
     }
     return new HoodieInstant(inflight, "rollback", rollbackTime);
   }
+
+  private void assertInstantListEquals(List<HoodieInstant> expected, List<HoodieInstant> actual) {
+    assertEquals(expected.size(), actual.size());
+    for (int i = 0; i < expected.size(); i++) {
+      HoodieInstant expectedInstant = expected.get(i);
+      HoodieInstant actualInstant = actual.get(i);
+      assertEquals(expectedInstant.getTimestamp(), actualInstant.getTimestamp());
+      assertEquals(expectedInstant.getAction(), actualInstant.getAction());
+      assertEquals(expectedInstant.getState(), actualInstant.getState());
+    }
+  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
index 7d29c176c1b..2f57f5455c3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
@@ -24,8 +24,11 @@ import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.common.HoodieCleanStat;
 import org.apache.hudi.common.model.CleanFileInfo;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.HoodieTimelineTimeZone;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -38,6 +41,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.time.Instant;
+import java.time.ZonedDateTime;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -104,9 +110,43 @@ public class CleanerUtils {
     return metadataMigrator.upgradeToLatest(cleanMetadata, cleanMetadata.getVersion());
   }
 
+  public static Option<HoodieInstant> getEarliestCommitToRetain(
+      HoodieTimeline commitsTimeline, HoodieCleaningPolicy cleaningPolicy, int commitsRetained,
+      Instant latestInstant, int hoursRetained, HoodieTimelineTimeZone timeZone) {
+    HoodieTimeline completedCommitsTimeline = commitsTimeline.filterCompletedInstants();
+    Option<HoodieInstant> earliestCommitToRetain = Option.empty();
+
+    if (cleaningPolicy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS
+        && completedCommitsTimeline.countInstants() > commitsRetained) {
+      Option<HoodieInstant> earliestPendingCommits =
+          commitsTimeline.filter(s -> !s.isCompleted()).firstInstant();
+      if (earliestPendingCommits.isPresent()) {
+        // Earliest commit to retain must not be later than the earliest pending commit
+        earliestCommitToRetain =
+            completedCommitsTimeline.nthInstant(completedCommitsTimeline.countInstants() - commitsRetained).map(nthInstant -> {
+              if (nthInstant.compareTo(earliestPendingCommits.get()) <= 0) {
+                return Option.of(nthInstant);
+              } else {
+                return completedCommitsTimeline.findInstantsBefore(earliestPendingCommits.get().getTimestamp()).lastInstant();
+              }
+            }).orElse(Option.empty());
+      } else {
+        earliestCommitToRetain = completedCommitsTimeline.nthInstant(completedCommitsTimeline.countInstants()
+            - commitsRetained); //15 instants total, 10 commits to retain, this gives 6th instant in the list
+      }
+    } else if (cleaningPolicy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
+      ZonedDateTime latestDateTime = ZonedDateTime.ofInstant(latestInstant, timeZone.getZoneId());
+      String earliestTimeToRetain = HoodieActiveTimeline.formatDate(Date.from(latestDateTime.minusHours(hoursRetained).toInstant()));
+      earliestCommitToRetain = Option.fromJavaOptional(completedCommitsTimeline.getInstantsAsStream().filter(i -> HoodieTimeline.compareTimestamps(i.getTimestamp(),
+          HoodieTimeline.GREATER_THAN_OR_EQUALS, earliestTimeToRetain)).findFirst());
+    }
+    return earliestCommitToRetain;
+  }
+
   /**
    * Get Latest version of cleaner plan corresponding to a clean instant.
-   * @param metaClient  Hoodie Table Meta Client
+   *
+   * @param metaClient   Hoodie Table Meta Client
    * @param cleanInstant Instant referring to clean action
    * @return Cleaner plan corresponding to clean instant
    * @throws IOException

Reply via email to