Zouxxyy commented on code in PR #9416:
URL: https://github.com/apache/hudi/pull/9416#discussion_r1299184229


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java:
##########
@@ -452,107 +431,137 @@ private Stream<HoodieInstant> 
getCommitInstantsToArchive() throws IOException {
               ? CompactionUtils.getOldestInstantToRetainForCompaction(
               table.getActiveTimeline(), 
config.getInlineCompactDeltaCommitMax())
               : Option.empty();
+      oldestInstantToRetainCandidates.add(oldestInstantToRetainForCompaction);
 
-      // The clustering commit instant can not be archived unless we ensure 
that the replaced files have been cleaned,
+      // 3. The clustering commit instant can not be archived unless we ensure 
that the replaced files have been cleaned,
       // without the replaced files metadata on the timeline, the fs view 
would expose duplicates for readers.
       // Meanwhile, when inline or async clustering is enabled, we need to 
ensure that there is a commit in the active timeline
       // to check whether the file slice generated in pending clustering after 
archive isn't committed.
       Option<HoodieInstant> oldestInstantToRetainForClustering =
           
ClusteringUtils.getOldestInstantToRetainForClustering(table.getActiveTimeline(),
 table.getMetaClient());
+      oldestInstantToRetainCandidates.add(oldestInstantToRetainForClustering);
+
+      // 4. If metadata table is enabled, do not archive instants which are 
more recent than the last compaction on the
+      // metadata table.
+      if (table.getMetaClient().getTableConfig().isMetadataTableAvailable()) {
+        try (HoodieTableMetadata tableMetadata = 
HoodieTableMetadata.create(table.getContext(), config.getMetadataConfig(), 
config.getBasePath())) {
+          Option<String> latestCompactionTime = 
tableMetadata.getLatestCompactionTime();
+          if (!latestCompactionTime.isPresent()) {
+            LOG.info("Not archiving as there is no compaction yet on the 
metadata table");
+            return Collections.emptyList();
+          } else {
+            LOG.info("Limiting archiving of instants to latest compaction on 
metadata table at " + latestCompactionTime.get());
+            oldestInstantToRetainCandidates.add(Option.of(new HoodieInstant(
+                HoodieInstant.State.COMPLETED, COMPACTION_ACTION, 
latestCompactionTime.get())));
+          }
+        } catch (Exception e) {
+          throw new HoodieException("Error limiting instant archival based on 
metadata table", e);
+        }
+      }
+
+      // 5. If this is a metadata table, do not archive the commits that live 
in data set
+      // active timeline. This is required by metadata table,
+      // see HoodieTableMetadataUtil#processRollbackMetadata for details.
+      if (table.isMetadataTable()) {
+        HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder()
+            
.setBasePath(HoodieTableMetadata.getDatasetBasePath(config.getBasePath()))
+            .setConf(metaClient.getHadoopConf())
+            .build();
+        Option<HoodieInstant> qualifiedEarliestInstant =
+            TimelineUtils.getEarliestInstantForMetadataArchival(
+                dataMetaClient.getActiveTimeline(), 
config.shouldArchiveBeyondSavepoint());
+
+        // Do not archive the instants after the earliest commit (COMMIT, 
DELTA_COMMIT, and
+        // REPLACE_COMMIT only, considering non-savepoint commit only if 
enabling archive
+        // beyond savepoint) and the earliest inflight instant (all actions).
+        // This is required by metadata table, see 
HoodieTableMetadataUtil#processRollbackMetadata
+        // for details.
+        // Todo: Remove #7580
+        // Note that we cannot blindly use the earliest instant of all 
actions, because CLEAN and
+        // ROLLBACK instants are archived separately apart from commits (check
+        // HoodieTimelineArchiver#getCleanInstantsToArchive).  If we do so, a 
very old completed
+        // CLEAN or ROLLBACK instant can block the archive of metadata table 
timeline and causes
+        // the active timeline of metadata table to be extremely long, leading 
to performance issues
+        // for loading the timeline.
+        oldestInstantToRetainCandidates.add(qualifiedEarliestInstant);
+      }
+
+      // Choose the instant in oldestInstantToRetainCandidates with the 
smallest
+      // timestamp as oldestInstantToRetain.
+      java.util.Optional<HoodieInstant> oldestInstantToRetain = 
oldestInstantToRetainCandidates
+          .stream()
+          .filter(Option::isPresent)
+          .map(Option::get)
+          .min(HoodieInstant.COMPARATOR);
 
-      // Actually do the commits
-      Stream<HoodieInstant> instantToArchiveStream = 
commitTimeline.getInstantsAsStream()
+      // Step2: We cannot archive any commits which are made after the first 
savepoint present,
+      // unless HoodieArchivalConfig#ARCHIVE_BEYOND_SAVEPOINT is enabled.
+      Option<HoodieInstant> firstSavepoint = 
table.getCompletedSavepointTimeline().firstInstant();
+      Set<String> savepointTimestamps = table.getSavepointTimestamps();
+
+      Stream<HoodieInstant> instantToArchiveStream = 
completedCommitsTimeline.getInstantsAsStream()
           .filter(s -> {
             if (config.shouldArchiveBeyondSavepoint()) {
               // skip savepoint commits and proceed further
               return !savepointTimestamps.contains(s.getTimestamp());
             } else {
               // if no savepoint present, then don't filter
               // stop at first savepoint commit
-              return !(firstSavepoint.isPresent() && 
compareTimestamps(firstSavepoint.get().getTimestamp(), LESSER_THAN_OR_EQUALS, 
s.getTimestamp()));
+              return !firstSavepoint.isPresent() || 
compareTimestamps(s.getTimestamp(), LESSER_THAN, 
firstSavepoint.get().getTimestamp());
             }
-          }).filter(s -> {
-            // oldestCommitToRetain is the highest completed commit instant 
that is less than the oldest inflight instant.
-            // By filtering out any commit >= oldestCommitToRetain, we can 
ensure there are no gaps in the timeline
-            // when inflight commits are present.
-            return oldestCommitToRetain
-                .map(instant -> compareTimestamps(instant.getTimestamp(), 
GREATER_THAN, s.getTimestamp()))
-                .orElse(true);
-          }).filter(s ->
-              oldestInstantToRetainForCompaction.map(instantToRetain ->
-                      compareTimestamps(s.getTimestamp(), LESSER_THAN, 
instantToRetain.getTimestamp()))
-                  .orElse(true)
-          ).filter(s ->
-              oldestInstantToRetainForClustering.map(instantToRetain ->
-                      HoodieTimeline.compareTimestamps(s.getTimestamp(), 
LESSER_THAN, instantToRetain.getTimestamp()))
-                  .orElse(true)
-          );
-      return instantToArchiveStream.limit(commitTimeline.countInstants() - 
minInstantsToKeep);
+          }).filter(s -> oldestInstantToRetain
+              .map(instant -> compareTimestamps(s.getTimestamp(), LESSER_THAN, 
instant.getTimestamp()))
+              .orElse(true));
+      return 
instantToArchiveStream.limit(completedCommitsTimeline.countInstants() - 
minInstantsToKeep)
+          .collect(Collectors.toList());
     } else {
-      return Stream.empty();
+      return Collections.emptyList();
     }
   }
 
+  private List<HoodieInstant> 
getCleanAndRollbackInstantsToArchive(HoodieInstant 
newestCommitInstantToArchive) {
+    HoodieTimeline cleanAndRollbackTimeline = table.getActiveTimeline()
+        
.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION, 
HoodieTimeline.ROLLBACK_ACTION))
+        .filterCompletedInstants();
+
+    // Since the commit instants to archive is continuous, we can use the 
newest commit instant to archive as the
+    // right boundary to collect the clean or rollback instants to archive.
+    //
+    //                                                  
newestCommitInstantToArchive
+    //                                                               v
+    //  | commit1 clean1 commit2 commit3 clean2 commit4 rollback1 commit5 | 
commit6 clean3 commit7 ...
+    //  | <------------------  instants to archive  --------------------> |
+    //
+    //  CommitInstantsToArchive: commit1, commit2, commit3, commit4, commit5
+    //  CleanAndRollbackInstantsToArchive: clean1, clean2, rollback1
+
+    return cleanAndRollbackTimeline.getInstantsAsStream()
+        .filter(s -> compareTimestamps(s.getTimestamp(), LESSER_THAN, 
newestCommitInstantToArchive.getTimestamp()))
+        .collect(Collectors.toList());
+  }
+
   private Stream<HoodieInstant> getInstantsToArchive() throws IOException {
-    Stream<HoodieInstant> instants = 
Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive());
     if (config.isMetaserverEnabled()) {
       return Stream.empty();
     }
 
-    // For archiving and cleaning instants, we need to include intermediate 
state files if they exist
+    // First get commit instants to archive.
+    List<HoodieInstant> instantsToArchive = getCommitInstantsToArchive();
+    if (!instantsToArchive.isEmpty()) {
+      HoodieInstant newestCommitInstantToArchive = 
instantsToArchive.get(instantsToArchive.size() - 1);
+      // Then get clean and rollback instants to archive.
+      List<HoodieInstant> cleanAndRollbackInstantsToArchive =
+          getCleanAndRollbackInstantsToArchive(newestCommitInstantToArchive);
+      instantsToArchive.addAll(cleanAndRollbackInstantsToArchive);

Review Comment:
   It was also unsorted before this PR, but I can add a sort here — clearly 
the instants written to the archive should be ordered.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to