lokeshj1703 commented on code in PR #11440:
URL: https://github.com/apache/hudi/pull/11440#discussion_r1681475904


##########
hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java:
##########
@@ -376,9 +381,75 @@ public static Option<HoodieInstant> getEarliestInstantToRetainForClustering(
         oldestInstantToRetain = replaceOrClusterTimeline.firstInstant();
       }
     }
+    oldestInstantToRetain = Option.ofNullable(
+        HoodieTimeline.minTimestampInstant(oldestInstantToRetain.orElse(null), earliestSavepointInClean.orElse(null)));
     return oldestInstantToRetain;
   }
 
+  public static Option<HoodieInstant> getEarliestSavepointInClean(HoodieActiveTimeline activeTimeline, HoodieTableMetaClient metaClient, HoodieCleaningPolicy cleanerPolicy,
+                                                                  Option<HoodieInstant> cleanInstantOpt, boolean shouldArchiveBeyondSavepoint) throws IOException {
+    // EarliestSavepoint in clean is required to block archival when a savepoint is deleted.
+    // This ensures that archival is blocked until clean has cleaned up files retained due to the savepoint.
+    // If this guard is not present, the archival of commits can lead to duplicates. Here is a scenario
+    // illustrating the same. This scenario considers a case where the EarliestSavepoint guard is not present:
+    // c1.dc - f1 (c1 deltacommit creates file with id f1)
+    // c2.dc - f2 (c2 deltacommit creates file with id f2)
+    // c2.sp - Savepoint at c2
+    // c3.rc (replacing f2 -> f3) (Replace commit replacing file id f2 with f3)
+    // c4.dc
+    //
+    // Let's say the incremental cleaner moved past c3.rc without cleaning f2, since a savepoint exists at c2.
+    // Archival is blocked at c2 since there is a savepoint at c2.
+    // Let's say the savepoint at c2 is now deleted. Archival would archive c3.rc since it is now unblocked.
+    // Since c3 is archived and f2 has not been cleaned, the table view would consider f2 a valid
+    // file id. This causes duplicates.
+    if (shouldArchiveBeyondSavepoint) {
+      // When archive beyond savepoint is enabled, we do not block the archival based on cleaner earliestSavepoint.
+      return Option.empty();
+    }
+    // Explicitly check the savepoint timeline and guard using its first (earliest) completed savepoint.
+    Option<String> firstSavepointOpt = activeTimeline.getSavePointTimeline().filterCompletedInstants().firstInstant().map(HoodieInstant::getTimestamp);
+    String earliestSavepointTs = firstSavepointOpt.orElse(null);
+    Option<String> cleanerEarliestSavepoint = Option.empty();
+    Option<HoodieActionInstant> cleanerEarliestToRetain = Option.empty();
+
+    if (cleanerPolicy != HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS && cleanInstantOpt.isPresent()) {

Review Comment:
   @nsivabalan Is specific handling for `KEEP_LATEST_FILE_VERSIONS` still required, or can we keep the logic the same for all cleaner policies? It seems like SAVEPOINTED_TIMESTAMPS would always be created.
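For illustration only: the guard added in the diff retains the earlier of the two candidate instants via `HoodieTimeline.minTimestampInstant`. A minimal standalone sketch of that min-timestamp selection follows; it uses plain `String`s and a hypothetical `minTimestamp` helper rather than Hudi's actual `HoodieInstant` types, relying on the fact that Hudi instant timestamps order lexicographically.

```java
import java.util.Optional;

// Sketch (not Hudi's implementation): pick the earlier of two nullable
// instant timestamps, mirroring the null-safe min used by the guard.
public class MinInstantSketch {

    // Returns the lexicographically smaller (i.e. earlier) timestamp; null-safe.
    static String minTimestamp(String a, String b) {
        if (a == null) return b;
        if (b == null) return a;
        return a.compareTo(b) <= 0 ? a : b;
    }

    public static void main(String[] args) {
        // With a replace/cluster bound at c3 but a savepoint-derived bound at c2,
        // archival must stop at c2 so clean can still remove files retained for c2.
        Optional<String> oldestToRetain = Optional.ofNullable(minTimestamp("c3", "c2"));
        System.out.println(oldestToRetain.orElse("none")); // prints "c2"
    }
}
```

This is why, in the scenario above, keeping the savepoint-derived bound in the min prevents c3.rc from being archived before clean has removed f2.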



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
