yihua commented on code in PR #9416:
URL: https://github.com/apache/hudi/pull/9416#discussion_r1325208804


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java:
##########
@@ -452,107 +431,138 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() throws IOException {
               ? CompactionUtils.getOldestInstantToRetainForCompaction(
               table.getActiveTimeline(), config.getInlineCompactDeltaCommitMax())
               : Option.empty();
+      oldestInstantToRetainCandidates.add(oldestInstantToRetainForCompaction);
 
-      // The clustering commit instant can not be archived unless we ensure that the replaced files have been cleaned,
+      // 3. The clustering commit instant can not be archived unless we ensure that the replaced files have been cleaned,
       // without the replaced files metadata on the timeline, the fs view would expose duplicates for readers.
       // Meanwhile, when inline or async clustering is enabled, we need to ensure that there is a commit in the active timeline
       // to check whether the file slice generated in pending clustering after archive isn't committed.
       Option<HoodieInstant> oldestInstantToRetainForClustering =
           ClusteringUtils.getOldestInstantToRetainForClustering(table.getActiveTimeline(), table.getMetaClient());
+      oldestInstantToRetainCandidates.add(oldestInstantToRetainForClustering);
+
+      // 4. If metadata table is enabled, do not archive instants which are more recent than the last compaction on the
+      // metadata table.
+      if (table.getMetaClient().getTableConfig().isMetadataTableAvailable()) {
+        try (HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(table.getContext(), config.getMetadataConfig(), config.getBasePath())) {
+          Option<String> latestCompactionTime = tableMetadata.getLatestCompactionTime();
+          if (!latestCompactionTime.isPresent()) {
+            LOG.info("Not archiving as there is no compaction yet on the metadata table");
+            return Collections.emptyList();
+          } else {
+            LOG.info("Limiting archiving of instants to latest compaction on metadata table at " + latestCompactionTime.get());
+            oldestInstantToRetainCandidates.add(Option.of(new HoodieInstant(
+                HoodieInstant.State.COMPLETED, COMPACTION_ACTION, latestCompactionTime.get())));
+          }
+        } catch (Exception e) {
+          throw new HoodieException("Error limiting instant archival based on metadata table", e);
+        }
+      }
+
+      // 5. If this is a metadata table, do not archive the commits that live in data set
+      // active timeline. This is required by metadata table,
+      // see HoodieTableMetadataUtil#processRollbackMetadata for details.
+      if (table.isMetadataTable()) {
+        HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder()
+            .setBasePath(HoodieTableMetadata.getDatasetBasePath(config.getBasePath()))
+            .setConf(metaClient.getHadoopConf())
+            .build();
+        Option<HoodieInstant> qualifiedEarliestInstant =
+            TimelineUtils.getEarliestInstantForMetadataArchival(
+                dataMetaClient.getActiveTimeline(), config.shouldArchiveBeyondSavepoint());
+
+        // Do not archive the instants after the earliest commit (COMMIT, DELTA_COMMIT, and
+        // REPLACE_COMMIT only, considering non-savepoint commit only if enabling archive
+        // beyond savepoint) and the earliest inflight instant (all actions).
+        // This is required by metadata table, see HoodieTableMetadataUtil#processRollbackMetadata
+        // for details.
+        // Todo: Remove #7580

Review Comment:
   +1 on @danny0405 's comment.  Before this PR, the logic was to keep at least one clean and one rollback instant on the active timeline.  Archiving all rollback instants in the active timeline is OK, but there is a caveat with archiving all clean instants (e.g., when all completed clean instants are before the earliest commit to retain; one such case is the user turning off cleaning for some time and then turning it back on).  Incremental cleaning may then fail to pick up the latest clean instant.
   
   However, keeping at least one clean instant in the active timeline is not the right fix under the new logic either, because that can block the archival of commits just to preserve a contiguous block of instants.
   
   @Zouxxyy could you double-check whether there is any issue w.r.t. different cleaning modes under the new archival logic?  e.g., incremental cleaning should fall back to full cleaning if there is no clean instant on the active timeline.
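   The fallback asked about above could be sketched roughly as follows. This is a hypothetical standalone illustration, not Hudi's actual clean-planner API; the class, method, and mode names are made up for the sketch:
   
   ```java
   public class CleaningModeFallback {
     // Hypothetical sketch: incremental cleaning diffs against the last completed
     // clean instant on the active timeline. If archival has removed every clean
     // instant, there is nothing to diff against, so fall back to a full scan.
     public static String resolveCleaningMode(boolean incrementalEnabled,
                                              boolean hasCompletedCleanOnActiveTimeline) {
       if (incrementalEnabled && !hasCompletedCleanOnActiveTimeline) {
         return "FULL_SCAN"; // the fallback this review comment asks to verify
       }
       return incrementalEnabled ? "INCREMENTAL" : "FULL_SCAN";
     }
   }
   ```
   
   The key case is the first branch: incremental mode enabled but no clean instant left on the active timeline.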



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java:
##########
@@ -388,60 +387,40 @@ private Map<String, Boolean> deleteFilesParallelize(HoodieTableMetaClient metaCl
         paths);
   }
 
-  private Stream<HoodieInstant> getCleanInstantsToArchive() {
-    HoodieTimeline cleanAndRollbackTimeline = table.getActiveTimeline()
-        .getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION, HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants();
-    return cleanAndRollbackTimeline.getInstantsAsStream()
-        .collect(Collectors.groupingBy(HoodieInstant::getAction)).values().stream()
-        .map(hoodieInstants -> {
-          if (hoodieInstants.size() > this.maxInstantsToKeep) {
-            return hoodieInstants.subList(0, hoodieInstants.size() - this.minInstantsToKeep);
-          } else {
-            return new ArrayList<HoodieInstant>();
-          }
-        }).flatMap(Collection::stream);
-  }
-
-  private Stream<HoodieInstant> getCommitInstantsToArchive() throws IOException {
-    // TODO (na) : Add a way to return actions associated with a timeline and then merge/unify
-    // with logic above to avoid Stream.concat
-    HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
-
-    // Get the oldest inflight instant and a completed commit before this inflight instant.
-    Option<HoodieInstant> oldestPendingInstant = table.getActiveTimeline()
-        .getWriteTimeline()
-        .filter(instant -> !instant.isCompleted())
-        .firstInstant();
-
-    // Oldest commit to retain is the greatest completed commit, that is less than the oldest pending instant.
-    // In some cases when inflight is the lowest commit then oldest commit to retain will be equal to oldest
-    // inflight commit.
-    Option<HoodieInstant> oldestCommitToRetain;
-    if (oldestPendingInstant.isPresent()) {
-      Option<HoodieInstant> completedCommitBeforeOldestPendingInstant =
-          Option.fromJavaOptional(commitTimeline.getReverseOrderedInstants()
-              .filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(),
-                  LESSER_THAN, oldestPendingInstant.get().getTimestamp())).findFirst());
-      // Check if the completed instant is higher than the oldest inflight instant
-      // in that case update the oldestCommitToRetain to oldestInflight commit time.
-      if (!completedCommitBeforeOldestPendingInstant.isPresent()
-          || HoodieTimeline.compareTimestamps(oldestPendingInstant.get().getTimestamp(),
-          LESSER_THAN, completedCommitBeforeOldestPendingInstant.get().getTimestamp())) {
-        oldestCommitToRetain = oldestPendingInstant;
+  private List<HoodieInstant> getCommitInstantsToArchive() throws IOException {
+    HoodieTimeline completedCommitsTimeline = table.getCompletedCommitsTimeline();
+
+    if (completedCommitsTimeline.countInstants() > maxInstantsToKeep) {
+      // Step1: Get all candidates of oldestInstantToRetain.
+      List<Option<HoodieInstant>> oldestInstantToRetainCandidates = new ArrayList<>();
+
Review Comment:
   I think the new logic is easier to understand than the previous logic.  We need to make sure no bugs are introduced.
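   The candidate-list pattern in the new `getCommitInstantsToArchive()` boils down to retaining from the earliest candidate that is present. A minimal standalone sketch of that idea, using `java.util.Optional` and plain `String` timestamps in place of Hudi's `Option`/`HoodieInstant` (an illustration under those substitutions, not the PR's actual code):
   
   ```java
   import java.util.List;
   import java.util.Optional;
   
   public class OldestInstantSelector {
     // Sketch of the "oldestInstantToRetainCandidates" idea: each retention rule
     // (pending commits, compaction, clustering, metadata-table compaction, ...)
     // contributes an optional candidate, and archival must stop before the
     // earliest candidate that is present.
     public static Optional<String> earliestToRetain(List<Optional<String>> candidates) {
       return candidates.stream()
           .filter(Optional::isPresent)
           .map(Optional::get)
           .min(String::compareTo); // Hudi instant times sort lexicographically
     }
   }
   ```
   
   An absent candidate (an empty `Optional`) simply imposes no constraint, which mirrors how a missing compaction or clustering instant does not limit archival in the new logic.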



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
