nsivabalan commented on code in PR #8783:
URL: https://github.com/apache/hudi/pull/8783#discussion_r1212425833


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java:
##########
@@ -460,19 +465,12 @@ private Stream<HoodieInstant> 
getCommitInstantsToArchive() throws IOException {
               return !(firstSavepoint.isPresent() && 
compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, 
s.getTimestamp()));
             }
           }).filter(s -> {
-            // Ensure commits >= the oldest pending compaction/replace commit 
is retained
-            return oldestPendingCompactionAndReplaceInstant
+            // oldestCommitToRetain is the highest completed commit instant 
that is less than the oldest inflight instant.
+            // By filter out any commit >= oldestCommitToRetain, we can ensure 
there are no gaps in the timeline
+            // when inflight commits are present.
+            return oldestCommitToRetain

Review Comment:
   This simplifies things a lot :)



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java:
##########
@@ -1416,6 +1418,91 @@ public void 
testArchivalWithMaxDeltaCommitsGuaranteeForCompaction(boolean enable
     }
   }
 
+  @Test

Review Comment:
   Can we add some Javadocs describing what we are testing here?
   A simple illustration of the timeline might also help.



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java:
##########
@@ -1416,6 +1418,91 @@ public void 
testArchivalWithMaxDeltaCommitsGuaranteeForCompaction(boolean enable
     }
   }
 
+  @Test
+  public void testGetCommitInstantsToArchiveDuringInflightCommits() throws 
Exception {
+    HoodieWriteConfig cfg = initTestTableAndGetWriteConfig(true, 4, 5, 5);
+    HoodieTestDataGenerator.createCommitFile(basePath, "100", 
wrapperFs.getConf());
+
+    HoodieTestDataGenerator.createReplaceCommitRequestedFile(basePath, "101", 
wrapperFs.getConf());
+    HoodieTestDataGenerator.createReplaceCommitInflightFile(basePath, "101", 
wrapperFs.getConf());
+    HoodieTestDataGenerator.createCommitFile(basePath, "102", 
wrapperFs.getConf());
+    HoodieTestDataGenerator.createCommitFile(basePath, "103", 
wrapperFs.getConf());
+    HoodieTestDataGenerator.createCompactionRequestedFile(basePath, "104", 
wrapperFs.getConf());
+    HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
+        new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, 
"104"), wrapperFs.getConf());
+    HoodieTestDataGenerator.createCommitFile(basePath, "105", 
wrapperFs.getConf());
+    HoodieTestDataGenerator.createCommitFile(basePath, "106", 
wrapperFs.getConf());
+    HoodieTestDataGenerator.createCommitFile(basePath, "107", 
wrapperFs.getConf());
+    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+    HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table);
+
+    HoodieTimeline timeline = 
metaClient.getActiveTimeline().getWriteTimeline();//getCommitsAndCompactionTimeline();
+    assertEquals(8, timeline.countInstants(), "Loaded 8 commits and the count 
should match");
+    boolean result = archiver.archiveIfRequired(context);
+    assertTrue(result);
+    timeline = 
metaClient.getActiveTimeline().reload().getWriteTimeline();//getCommitsAndCompactionTimeline();
+    assertTrue(timeline.containsInstant(new HoodieInstant(false, 
HoodieTimeline.COMMIT_ACTION, "100")),
+        "At least one instant before oldest pending replacecommit need to stay 
in the timeline");
+    assertEquals(8, timeline.countInstants(),
+        "Since we have a pending replacecommit at 101, we should never archive 
any commit "
+            + "after 101 and also to maintain timeline have at least one 
completed commit before pending commit");
+    assertTrue(timeline.containsInstant(new HoodieInstant(State.INFLIGHT, 
HoodieTimeline.REPLACE_COMMIT_ACTION, "101")),
+        "Inflight replacecommit must still be present");
+    assertTrue(timeline.containsInstant(new HoodieInstant(false, 
HoodieTimeline.COMMIT_ACTION, "102")),
+        "Instants greater than oldest pending commit must be present");
+    assertTrue(timeline.containsInstant(new HoodieInstant(false, 
HoodieTimeline.COMMIT_ACTION, "103")),
+        "Instants greater than oldest pending commit must be present");
+    assertTrue(timeline.containsInstant(new HoodieInstant(State.REQUESTED, 
HoodieTimeline.COMPACTION_ACTION, "104")),
+        "Instants greater than oldest pending commit must be present");
+    assertTrue(timeline.containsInstant(new HoodieInstant(false, 
HoodieTimeline.COMMIT_ACTION, "105")),
+        "Instants greater than oldest pending commit must be present");
+    assertTrue(timeline.containsInstant(new HoodieInstant(false, 
HoodieTimeline.COMMIT_ACTION, "106")),
+        "Instants greater than oldest pending commit must be present");
+    assertTrue(timeline.containsInstant(new HoodieInstant(false, 
HoodieTimeline.COMMIT_ACTION, "107")),
+        "Instants greater than oldest pending commit must be present");
+  }
+
+  /**
+   * If replacecommit inflight is the oldest commit in the timeline
+   * then this method tests the archival logic to work by ignoring the 
inflight commit.
+   */
+  @Test
+  public void testWithOldestReplaceCommit() throws Exception {
+    HoodieWriteConfig cfg = initTestTableAndGetWriteConfig(false, 2, 3, 2);
+
+    HoodieTestDataGenerator.createReplaceCommitRequestedFile(basePath, "101", 
wrapperFs.getConf());
+    HoodieTestDataGenerator.createReplaceCommitInflightFile(basePath, "101", 
wrapperFs.getConf());
+    HoodieTestDataGenerator.createCommitFile(basePath, "102", 
wrapperFs.getConf());
+    HoodieTestDataGenerator.createCompactionRequestedFile(basePath, "103", 
wrapperFs.getConf());
+    HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
+        new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, 
"103"), wrapperFs.getConf());
+    HoodieTestDataGenerator.createCommitFile(basePath, "104", 
wrapperFs.getConf());
+    HoodieTestDataGenerator.createCommitFile(basePath, "105", 
wrapperFs.getConf());
+    HoodieTestDataGenerator.createCommitFile(basePath, "106", 
wrapperFs.getConf());
+    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+    HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table);
+
+    HoodieTimeline timeline = 
metaClient.getActiveTimeline().getWriteTimeline();//getCommitsAndCompactionTimeline();
+    assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count 
should match");
+    boolean result = archiver.archiveIfRequired(context);
+    assertTrue(result);
+    timeline = 
metaClient.getActiveTimeline().reload().getWriteTimeline();//getCommitsAndCompactionTimeline();
+    assertEquals(6, timeline.countInstants(),
+        "Since we have a pending compaction at 101, we should never archive 
any commit after 101");
+    assertTrue(timeline.containsInstant(new HoodieInstant(State.INFLIGHT, 
HoodieTimeline.REPLACE_COMMIT_ACTION, "101")),
+        "Iiflight replacecommit must still be present");
+    assertTrue(timeline.containsInstant(new HoodieInstant(false, 
HoodieTimeline.COMMIT_ACTION, "102")),
+        "Instants greater than oldest pending commit must be present");
+    assertTrue(timeline.containsInstant(new HoodieInstant(State.REQUESTED, 
HoodieTimeline.COMPACTION_ACTION, "103")),
+        "Instants greater than oldest pending commit must be present");

Review Comment:
   same here



##########
hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java:
##########
@@ -256,7 +256,12 @@ public static Option<HoodieInstant> 
getOldestInstantToRetainForClustering(
           retainLowerBound = earliestInstantToRetain.getTimestamp();
         } else {
           // no earliestInstantToRetain, indicate KEEP_LATEST_FILE_VERSIONS 
clean policy,
-          // retain first instant after clean instant
+          // retain first instant after clean instant.
+          // For KEEP_LATEST_FILE_VERSIONS cleaner policy, file versions are 
only maintained for active file groups
+          // not for replaced file groups. So, last clean instant can be 
considered as a lower bound, since
+          // the cleaner would have removed all the file groups until then. 
But there is a catch to this logic,
+          // while cleaner is running if there is a pending replacecommit then 
those files are not cleaned.
+          // TODO: This case has to be handled.

Review Comment:
   Can you file a tracking ticket for this and link it here?



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java:
##########
@@ -1416,6 +1418,91 @@ public void 
testArchivalWithMaxDeltaCommitsGuaranteeForCompaction(boolean enable
     }
   }
 
+  @Test
+  public void testGetCommitInstantsToArchiveDuringInflightCommits() throws 
Exception {
+    HoodieWriteConfig cfg = initTestTableAndGetWriteConfig(true, 4, 5, 5);
+    HoodieTestDataGenerator.createCommitFile(basePath, "100", 
wrapperFs.getConf());
+
+    HoodieTestDataGenerator.createReplaceCommitRequestedFile(basePath, "101", 
wrapperFs.getConf());
+    HoodieTestDataGenerator.createReplaceCommitInflightFile(basePath, "101", 
wrapperFs.getConf());
+    HoodieTestDataGenerator.createCommitFile(basePath, "102", 
wrapperFs.getConf());
+    HoodieTestDataGenerator.createCommitFile(basePath, "103", 
wrapperFs.getConf());
+    HoodieTestDataGenerator.createCompactionRequestedFile(basePath, "104", 
wrapperFs.getConf());
+    HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
+        new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, 
"104"), wrapperFs.getConf());
+    HoodieTestDataGenerator.createCommitFile(basePath, "105", 
wrapperFs.getConf());
+    HoodieTestDataGenerator.createCommitFile(basePath, "106", 
wrapperFs.getConf());
+    HoodieTestDataGenerator.createCommitFile(basePath, "107", 
wrapperFs.getConf());
+    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+    HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(cfg, table);
+
+    HoodieTimeline timeline = 
metaClient.getActiveTimeline().getWriteTimeline();//getCommitsAndCompactionTimeline();
+    assertEquals(8, timeline.countInstants(), "Loaded 8 commits and the count 
should match");
+    boolean result = archiver.archiveIfRequired(context);
+    assertTrue(result);
+    timeline = 
metaClient.getActiveTimeline().reload().getWriteTimeline();//getCommitsAndCompactionTimeline();
+    assertTrue(timeline.containsInstant(new HoodieInstant(false, 
HoodieTimeline.COMMIT_ACTION, "100")),
+        "At least one instant before oldest pending replacecommit need to stay 
in the timeline");
+    assertEquals(8, timeline.countInstants(),
+        "Since we have a pending replacecommit at 101, we should never archive 
any commit "
+            + "after 101 and also to maintain timeline have at least one 
completed commit before pending commit");
+    assertTrue(timeline.containsInstant(new HoodieInstant(State.INFLIGHT, 
HoodieTimeline.REPLACE_COMMIT_ACTION, "101")),
+        "Inflight replacecommit must still be present");
+    assertTrue(timeline.containsInstant(new HoodieInstant(false, 
HoodieTimeline.COMMIT_ACTION, "102")),

Review Comment:
   Can you declare a list of the instants expected in the active timeline and
check them in a for loop, rather than writing one assertion per instant?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to