This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 95d8c6b9232 [HUDI-5569] Maintain commit timeline even in case of long 
standing inflights (#8783)
95d8c6b9232 is described below

commit 95d8c6b9232b803c17482752ea409422447dc945
Author: Surya Prasanna <[email protected]>
AuthorDate: Tue Jun 13 17:28:50 2023 -0700

    [HUDI-5569] Maintain commit timeline even in case of long standing 
inflights (#8783)
    
    - Maintain commit timeline even in case of long standing inflights
---
 .../apache/hudi/client/HoodieTimelineArchiver.java |  48 ++++----
 .../apache/hudi/io/TestHoodieTimelineArchiver.java | 124 ++++++++++++++++++++-
 .../apache/hudi/common/util/ClusteringUtils.java   |   7 +-
 .../common/testutils/HoodieTestDataGenerator.java  |  33 ++++++
 .../hudi/hadoop/TestHoodieParquetInputFormat.java  |  35 ++++++
 5 files changed, 221 insertions(+), 26 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index c81f2d69779..0a6659cb1e7 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -31,7 +31,6 @@ import org.apache.hudi.common.model.HoodieArchivedLogFile;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
-import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
@@ -413,15 +412,33 @@ public class HoodieTimelineArchiver<T extends 
HoodieAvroPayload, I, K, O> {
     // with logic above to avoid Stream.concat
     HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
 
-    Option<HoodieInstant> oldestPendingCompactionAndReplaceInstant = 
table.getActiveTimeline()
-        
.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMPACTION_ACTION,
 HoodieTimeline.REPLACE_COMMIT_ACTION))
-        .filter(s -> !s.isCompleted())
+    // Get the oldest inflight instant and a completed commit before this 
inflight instant.
+    Option<HoodieInstant> oldestPendingInstant = table.getActiveTimeline()
+        .getWriteTimeline()
+        .filter(instant -> !instant.isCompleted())
         .firstInstant();
 
-    Option<HoodieInstant> oldestInflightCommitInstant =
-        table.getActiveTimeline()
-            
.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, 
HoodieTimeline.DELTA_COMMIT_ACTION))
-            .filterInflights().firstInstant();
+    // Oldest commit to retain is the greatest completed commit that is less than the oldest pending instant.
+    // In some cases when inflight is the lowest commit then oldest commit to 
retain will be equal to oldest
+    // inflight commit.
+    Option<HoodieInstant> oldestCommitToRetain;
+    if (oldestPendingInstant.isPresent()) {
+      Option<HoodieInstant> completedCommitBeforeOldestPendingInstant =
+          Option.fromJavaOptional(commitTimeline.getReverseOrderedInstants()
+              .filter(instant -> 
HoodieTimeline.compareTimestamps(instant.getTimestamp(),
+                  LESSER_THAN, 
oldestPendingInstant.get().getTimestamp())).findFirst());
+      // Check if the completed instant is higher than the oldest inflight 
instant
+      // in that case update the oldestCommitToRetain to oldestInflight commit 
time.
+      if (!completedCommitBeforeOldestPendingInstant.isPresent()
+          || 
HoodieTimeline.compareTimestamps(oldestPendingInstant.get().getTimestamp(),
+          LESSER_THAN, 
completedCommitBeforeOldestPendingInstant.get().getTimestamp())) {
+        oldestCommitToRetain = oldestPendingInstant;
+      } else {
+        oldestCommitToRetain = completedCommitBeforeOldestPendingInstant;
+      }
+    } else {
+      oldestCommitToRetain = Option.empty();
+    }
 
     // NOTE: We cannot have any holes in the commit timeline.
     // We cannot archive any commits which are made after the first savepoint 
present,
@@ -460,19 +477,12 @@ public class HoodieTimelineArchiver<T extends 
HoodieAvroPayload, I, K, O> {
               return !(firstSavepoint.isPresent() && 
compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, 
s.getTimestamp()));
             }
           }).filter(s -> {
-            // Ensure commits >= the oldest pending compaction/replace commit 
is retained
-            return oldestPendingCompactionAndReplaceInstant
+            // oldestCommitToRetain is the highest completed commit instant 
that is less than the oldest inflight instant.
+            // By filtering out any commit >= oldestCommitToRetain, we can 
ensure there are no gaps in the timeline
+            // when inflight commits are present.
+            return oldestCommitToRetain
                 .map(instant -> compareTimestamps(instant.getTimestamp(), 
GREATER_THAN, s.getTimestamp()))
                 .orElse(true);
-          }).filter(s -> {
-            // We need this to ensure that when multiple writers are 
performing conflict resolution, eligible instants don't
-            // get archived, i.e, instants after the oldestInflight are 
retained on the timeline
-            if (config.getFailedWritesCleanPolicy() == 
HoodieFailedWritesCleaningPolicy.LAZY) {
-              return oldestInflightCommitInstant.map(instant ->
-                      compareTimestamps(instant.getTimestamp(), GREATER_THAN, 
s.getTimestamp()))
-                  .orElse(true);
-            }
-            return true;
           }).filter(s ->
               oldestInstantToRetainForCompaction.map(instantToRetain ->
                       compareTimestamps(s.getTimestamp(), LESSER_THAN, 
instantToRetain.getTimestamp()))
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index 73bfacfd9d7..b049c60b9e7 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -90,6 +90,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -913,19 +914,21 @@ public class TestHoodieTimelineArchiver extends 
HoodieClientTestHarness {
         if (i != 7) {
           assertEquals(originalCommits, commitsAfterArchival);
         } else {
-          // on 7th commit, archival will kick in. but will archive only one 
commit since 2nd compaction commit is inflight.
-          assertEquals(originalCommits.size() - commitsAfterArchival.size(), 
1);
+          // on 7th commit, archival will kick in, but cannot archive any 
commit,
+          // since the 1st deltacommit is the greatest completed commit before the oldest inflight commit.
+          assertEquals(originalCommits.size() - commitsAfterArchival.size(), 
0);
         }
       } else {
         if (i != 7) {
           assertEquals(originalCommits, commitsAfterArchival);
         } else {
-          // on 7th commit, archival will kick in. but will archive only one 
commit since 2nd compaction commit is inflight.
-          assertEquals(originalCommits.size() - commitsAfterArchival.size(), 
1);
+          // on 7th commit, archival will kick in, but cannot archive any 
commit,
+          // since the 1st deltacommit is the greatest completed commit before the oldest inflight commit.
+          assertEquals(originalCommits.size() - commitsAfterArchival.size(), 
0);
           for (int j = 1; j <= 7; j++) {
             if (j == 1) {
-              // first commit should be archived
-              assertFalse(commitsAfterArchival.contains(new 
HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + 
j)));
+              // first commit should not be archived
+              assertTrue(commitsAfterArchival.contains(new 
HoodieInstant(State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "0000000" + 
j)));
             } else if (j == 2) {
               // 2nd compaction should not be archived
               assertFalse(commitsAfterArchival.contains(new 
HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "0000000" + 
j)));
@@ -1418,6 +1421,115 @@ public class TestHoodieTimelineArchiver extends 
HoodieClientTestHarness {
     }
   }
 
+  /**
+   * Test archival functionality when there are inflight files.
+   * Archive should hold on to the greatest completed commit that is less than 
the oldest inflight commit.
+   * @throws Exception
+   */
+  @Test
+  public void testGetCommitInstantsToArchiveDuringInflightCommits() throws 
Exception {
+    HoodieWriteConfig cfg = initTestTableAndGetWriteConfig(false, 3, 4, 2);
+
+    Set<String> expectedInstants = new HashSet<>();
+    // Create 3 completed commits.
+    for (int i = 0; i < 3; i++) {
+      String instantTime = "100" + i;
+      HoodieTestDataGenerator.createCommitFile(basePath, instantTime, 
wrapperFs.getConf());
+      expectedInstants.add(instantTime);
+    }
+    // Create an inflight file.
+    String replaceInstant = "1003";
+    HoodieTestDataGenerator.createReplaceCommitRequestedFile(basePath, 
replaceInstant, wrapperFs.getConf());
+    expectedInstants.add(replaceInstant);
+    // Create 3 more instants
+    for (int i = 4; i < 7; i++) {
+      String instantTime = "100" + i;
+      HoodieTestDataGenerator.createCommitFile(basePath, instantTime, 
wrapperFs.getConf());
+      expectedInstants.add(instantTime);
+    }
+    // Create another inflight commit
+    HoodieTestDataGenerator.createRequestedCommitFile(basePath, "1007", 
wrapperFs.getConf());
+    HoodieTestDataGenerator.createPendingCommitFile(basePath, "1007", 
wrapperFs.getConf());
+    expectedInstants.add("1007");
+    // Create 6 more instants
+    for (int i = 0; i < 6; i++) {
+      String instantTime = "101" + i;
+      HoodieTestDataGenerator.createCommitFile(basePath, instantTime, 
wrapperFs.getConf());
+      expectedInstants.add(instantTime);
+    }
+    HoodieTimeline timeline = 
metaClient.reloadActiveTimeline().getWriteTimeline();
+
+    // Check the count of instants.
+    assertEquals(expectedInstants.size(), timeline.countInstants(), "Loaded 14 
commits and the count should match");
+
+    // Run archival
+    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+    HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table);
+    boolean result = archiver.archiveIfRequired(context);
+    expectedInstants.remove("1000");
+    expectedInstants.remove("1001");
+    assertTrue(result);
+    timeline = metaClient.reloadActiveTimeline().getWriteTimeline();
+
+    // Check the count of instants after archive; it should have 2 fewer instants
+    // because the 1003 replacecommit's inflight will block archival.
+    assertEquals(12, timeline.countInstants(), "After archival only first 2 
commits should be archived");
+    assertEquals(expectedInstants.size(), timeline.countInstants(), "After 
archival only first 2 commits should be archived");
+
+    HoodieTimeline finalTimeline = timeline;
+    assertEquals(12, expectedInstants.stream().filter(instant -> 
finalTimeline.containsInstant(instant)).count());
+    assertEquals("1002", 
timeline.getInstantsAsStream().findFirst().get().getTimestamp());
+
+    // Delete replacecommit requested instant.
+    Path replaceCommitRequestedPath = new Path(
+        basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+            + HoodieTimeline.makeRequestedReplaceFileName(replaceInstant));
+    metaClient.getFs().delete(replaceCommitRequestedPath);
+    metaClient.reloadActiveTimeline();
+
+    // Run archival
+    assertTrue(archiver.archiveIfRequired(context));
+    timeline = metaClient.reloadActiveTimeline().getWriteTimeline();
+    expectedInstants.removeAll(Arrays.asList("1002", "1003", "1004", "1005"));
+
+    // Check the count of instants after archive; it should have 4 fewer instants.
+    // This time 1007 inflight commit will block archival.
+    assertEquals(8, timeline.countInstants(), "After archival only first 2 
commits should be archived");
+    assertEquals(expectedInstants.size(), timeline.countInstants(), "After 
archival only first 2 commits should be archived");
+    HoodieTimeline refreshedTimeline = timeline;
+    assertEquals(8, expectedInstants.stream().filter(instant -> 
refreshedTimeline.containsInstant(instant)).count());
+    assertEquals("1006", 
timeline.getInstantsAsStream().findFirst().get().getTimestamp());
+  }
+
+  /**
+   * If replacecommit inflight is the oldest commit in the timeline or for 
that matter any inflight commit is present
+   * then the archival is blocked from there. This method test this scenario.
+   */
+  @Test
+  public void testWithOldestReplaceCommit() throws Exception {
+    HoodieWriteConfig cfg = initTestTableAndGetWriteConfig(false, 2, 3, 2);
+
+    HoodieTestDataGenerator.createReplaceCommitRequestedFile(basePath, "1001", 
wrapperFs.getConf());
+    HoodieTestDataGenerator.createReplaceCommitInflightFile(basePath, "1001", 
wrapperFs.getConf());
+    // Create 8 completed commits.
+    for (int i = 2; i < 10; i++) {
+      String instantTime = "100" + i;
+      HoodieTestDataGenerator.createCommitFile(basePath, instantTime, 
wrapperFs.getConf());
+    }
+
+    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+    HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table);
+
+    HoodieTimeline timeline = metaClient.reloadActiveTimeline();
+    assertEquals(9, timeline.countInstants(), "Loaded 9 commits and the count 
should match");
+    boolean result = archiver.archiveIfRequired(context);
+    assertTrue(result);
+    timeline = metaClient.reloadActiveTimeline();
+    assertEquals(9, timeline.countInstants(),
+        "Since we have a pending replacecommit at 1001, we should never 
archive any commit after 1001");
+    assertEquals("1001", 
timeline.getInstantsAsStream().findFirst().get().getTimestamp());
+  }
+
   @Test
   public void testArchivalAndCompactionInMetadataTable() throws Exception {
     init(HoodieTableType.COPY_ON_WRITE);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index fae8362a744..dc6de394bf3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -256,7 +256,12 @@ public class ClusteringUtils {
           retainLowerBound = earliestInstantToRetain.getTimestamp();
         } else {
           // no earliestInstantToRetain, indicate KEEP_LATEST_FILE_VERSIONS 
clean policy,
-          // retain first instant after clean instant
+          // retain first instant after clean instant.
+          // For KEEP_LATEST_FILE_VERSIONS cleaner policy, file versions are 
only maintained for active file groups
+          // not for replaced file groups. So, last clean instant can be 
considered as a lower bound, since
+          // the cleaner would have removed all the file groups until then. 
But there is a catch to this logic,
+          // while cleaner is running if there is a pending replacecommit then 
those files are not cleaned.
+          // TODO: This case has to be handled. HUDI-6352
           retainLowerBound = cleanInstant.getTimestamp();
         }
 
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 96b22b14990..80fb90f6fe3 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -493,6 +493,18 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
     return rec;
   }
 
+  public static void createRequestedCommitFile(String basePath, String 
instantTime, Configuration configuration) throws IOException {
+    Path pendingRequestedFile = new Path(basePath + "/" + 
HoodieTableMetaClient.METAFOLDER_NAME + "/"
+        + HoodieTimeline.makeRequestedCommitFileName(instantTime));
+    createEmptyFile(basePath, pendingRequestedFile, configuration);
+  }
+
+  public static void createPendingCommitFile(String basePath, String 
instantTime, Configuration configuration) throws IOException {
+    Path pendingCommitFile = new Path(basePath + "/" + 
HoodieTableMetaClient.METAFOLDER_NAME + "/"
+        + HoodieTimeline.makeInflightCommitFileName(instantTime));
+    createEmptyFile(basePath, pendingCommitFile, configuration);
+  }
+
   public static void createCommitFile(String basePath, String instantTime, 
Configuration configuration) {
     HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
     createCommitFile(basePath, instantTime, configuration, commitMetadata);
@@ -534,6 +546,20 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
     }
   }
 
+  public static void createReplaceCommitRequestedFile(String basePath, String 
instantTime, Configuration configuration)
+      throws IOException {
+    Path commitFile = new Path(basePath + "/" + 
HoodieTableMetaClient.METAFOLDER_NAME + "/"
+        + HoodieTimeline.makeRequestedReplaceFileName(instantTime));
+    createEmptyFile(basePath, commitFile, configuration);
+  }
+
+  public static void createReplaceCommitInflightFile(String basePath, String 
instantTime, Configuration configuration)
+      throws IOException {
+    Path commitFile = new Path(basePath + "/" + 
HoodieTableMetaClient.METAFOLDER_NAME + "/"
+        + HoodieTimeline.makeInflightReplaceFileName(instantTime));
+    createEmptyFile(basePath, commitFile, configuration);
+  }
+
   private static void createPendingReplaceFile(String basePath, String 
instantTime, Configuration configuration, HoodieCommitMetadata commitMetadata) {
     Arrays.asList(HoodieTimeline.makeInflightReplaceFileName(instantTime),
         HoodieTimeline.makeRequestedReplaceFileName(instantTime))
@@ -558,6 +584,13 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
     os.close();
   }
 
+  public static void createCompactionRequestedFile(String basePath, String 
instantTime, Configuration configuration)
+      throws IOException {
+    Path commitFile = new Path(basePath + "/" + 
HoodieTableMetaClient.METAFOLDER_NAME + "/"
+        + HoodieTimeline.makeRequestedCompactionFileName(instantTime));
+    createEmptyFile(basePath, commitFile, configuration);
+  }
+
   public static void createCompactionAuxiliaryMetadata(String basePath, 
HoodieInstant instant,
                                                        Configuration 
configuration) throws IOException {
     Path commitFile =
diff --git 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
index 32690f12096..ab1a7a4551c 100644
--- 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
+++ 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
@@ -694,6 +694,41 @@ public class TestHoodieParquetInputFormat {
     ensureFilesInCommit("Pulling 1 commit from 100, should get us the 10 files 
committed at 100", files, "100", 10);
   }
 
+  /**
+   * Test scenario where inflight commit is between completed commits.
+   */
+  @Test
+  public void testSnapshotPreCommitValidateWithInflights() throws IOException {
+    // Create commit and data files with commit 000
+    File partitionDir = InputFormatTestUtil.prepareTable(basePath, 
baseFileFormat, 5, "000");
+    createCommitFile(basePath, "000", "2016/05/01");
+
+    // create inflight commit add more files with same file_id.
+    InputFormatTestUtil.simulateInserts(partitionDir, baseFileExtension, 
"fileId1", 5, "100");
+    FileCreateUtils.createInflightCommit(basePath.toString(), "100");
+
+    // Create another commit without datafiles.
+    createCommitFile(basePath, "200", "2016/05/01");
+
+    // Add the paths
+    FileInputFormat.setInputPaths(jobConf, partitionDir.getPath());
+
+    // Now, the original data files with commit time 000 should be returned.
+    FileStatus[] files = inputFormat.listStatus(jobConf);
+    assertEquals(5, files.length, "Snapshot read must return all files in 
partition");
+    ensureFilesInCommit("Should return base files from commit 000, inflight 
data files with "
+        + "greater timestamp should be filtered", files, "000", 5);
+
+    // Create data files with same file_id for commit 200.
+    InputFormatTestUtil.simulateInserts(partitionDir, baseFileExtension, 
"fileId1", 5, "200");
+
+    // This time data files from commit time 200 will be returned.
+    files = inputFormat.listStatus(jobConf);
+    assertEquals(5, files.length, "Snapshot read must return all files in 
partition");
+    ensureFilesInCommit("Only completed commits files should be returned.",
+        files, "200", 5);
+  }
+
   private void ensureRecordsInCommit(String msg, String commit, int 
expectedNumberOfRecordsInCommit,
       int totalExpected) throws IOException {
     int actualCount = 0;

Reply via email to