yihua commented on code in PR #9416:
URL: https://github.com/apache/hudi/pull/9416#discussion_r1325208804
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java:
##########
@@ -452,107 +431,138 @@ private Stream<HoodieInstant> getCommitInstantsToArchive() throws IOException {
? CompactionUtils.getOldestInstantToRetainForCompaction(
table.getActiveTimeline(),
config.getInlineCompactDeltaCommitMax())
: Option.empty();
+ oldestInstantToRetainCandidates.add(oldestInstantToRetainForCompaction);
- // The clustering commit instant can not be archived unless we ensure that the replaced files have been cleaned,
+ // 3. The clustering commit instant can not be archived unless we ensure that the replaced files have been cleaned,
// without the replaced files metadata on the timeline, the fs view would expose duplicates for readers.
// Meanwhile, when inline or async clustering is enabled, we need to ensure that there is a commit in the active timeline
// to check whether the file slice generated in pending clustering after archive isn't committed.
Option<HoodieInstant> oldestInstantToRetainForClustering =
ClusteringUtils.getOldestInstantToRetainForClustering(table.getActiveTimeline(), table.getMetaClient());
+ oldestInstantToRetainCandidates.add(oldestInstantToRetainForClustering);
+
+ // 4. If metadata table is enabled, do not archive instants which are more recent than the last compaction on the
+ // metadata table.
+ if (table.getMetaClient().getTableConfig().isMetadataTableAvailable()) {
+ try (HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(table.getContext(), config.getMetadataConfig(), config.getBasePath())) {
+ Option<String> latestCompactionTime = tableMetadata.getLatestCompactionTime();
+ if (!latestCompactionTime.isPresent()) {
+ LOG.info("Not archiving as there is no compaction yet on the metadata table");
+ return Collections.emptyList();
+ } else {
+ LOG.info("Limiting archiving of instants to latest compaction on metadata table at " + latestCompactionTime.get());
+ oldestInstantToRetainCandidates.add(Option.of(new HoodieInstant(
+ HoodieInstant.State.COMPLETED, COMPACTION_ACTION, latestCompactionTime.get())));
+ }
+ } catch (Exception e) {
+ throw new HoodieException("Error limiting instant archival based on metadata table", e);
+ }
+ }
+
+ // 5. If this is a metadata table, do not archive the commits that live in data set
+ // active timeline. This is required by metadata table,
+ // see HoodieTableMetadataUtil#processRollbackMetadata for details.
+ if (table.isMetadataTable()) {
+ HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder()
+ .setBasePath(HoodieTableMetadata.getDatasetBasePath(config.getBasePath()))
+ .setConf(metaClient.getHadoopConf())
+ .build();
+ Option<HoodieInstant> qualifiedEarliestInstant =
+ TimelineUtils.getEarliestInstantForMetadataArchival(
+ dataMetaClient.getActiveTimeline(), config.shouldArchiveBeyondSavepoint());
+
+ // Do not archive the instants after the earliest commit (COMMIT, DELTA_COMMIT, and
+ // REPLACE_COMMIT only, considering non-savepoint commit only if enabling archive
+ // beyond savepoint) and the earliest inflight instant (all actions).
+ // This is required by metadata table, see HoodieTableMetadataUtil#processRollbackMetadata
+ // for details.
+ // Todo: Remove #7580
Review Comment:
+1 on @danny0405's comment. Before this PR, the logic kept at least one clean instant and one rollback instant on the active timeline. Archiving all rollback instants from the active timeline is fine, but there is a caveat with archiving all clean instants (e.g., when all completed clean instants are before the earliest commit to retain, which can happen if the user turns off cleaning for some time and then turns it on again): incremental cleaning may then be unable to find the latest clean instant.
However, keeping at least one clean instant in the active timeline is not the right approach under the new logic either, because it can block the archival of commits for the sake of keeping a contiguous block of instants.
@Zouxxyy could you double-check whether there are any issues with the different cleaning modes under the new archival logic? For example, incremental cleaning should fall back to full cleaning if there is no clean instant on the active timeline.
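A minimal sketch of the suggested fallback, under a simplified model: instants are plain fixed-width timestamp strings, and the planner only needs to know whether any completed clean instant is still on the active timeline. The names `CleanPlanningSketch`, `CleanMode`, `latestCompletedClean` and `chooseMode` are hypothetical and are not Hudi APIs; the real cleaner planner works on `HoodieTimeline` and may structure this differently.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical model, not Hudi's cleaner: decides whether cleaning can be
// incremental (starting from the last completed clean) or must fall back to
// a full scan because archival removed every clean instant.
public class CleanPlanningSketch {

  enum CleanMode { INCREMENTAL, FULL }

  // Latest completed clean instant still on the active timeline, if any.
  // Timestamps are fixed-width strings, so lexicographic order is time order.
  static Optional<String> latestCompletedClean(List<String> completedCleanTimestamps) {
    return completedCleanTimestamps.stream().max(String::compareTo);
  }

  static CleanMode chooseMode(boolean incrementalEnabled, List<String> completedCleanTimestamps) {
    if (!incrementalEnabled) {
      return CleanMode.FULL;
    }
    // Without a clean instant there is no starting point for the incremental
    // diff, so fall back to a full clean.
    return latestCompletedClean(completedCleanTimestamps).isPresent()
        ? CleanMode.INCREMENTAL
        : CleanMode.FULL;
  }

  public static void main(String[] args) {
    System.out.println(chooseMode(true, List.of("20230901000000", "20230902000000"))); // INCREMENTAL
    System.out.println(chooseMode(true, List.of())); // FULL
  }
}
```

The design choice in the sketch is to treat a missing clean instant as "no starting point" and do a full clean, rather than assume there is nothing to clean.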
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java:
##########
@@ -388,60 +387,40 @@ private Map<String, Boolean> deleteFilesParallelize(HoodieTableMetaClient metaCl
paths);
}
- private Stream<HoodieInstant> getCleanInstantsToArchive() {
- HoodieTimeline cleanAndRollbackTimeline = table.getActiveTimeline()
- .getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION, HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants();
- return cleanAndRollbackTimeline.getInstantsAsStream()
- .collect(Collectors.groupingBy(HoodieInstant::getAction)).values().stream()
- .map(hoodieInstants -> {
- if (hoodieInstants.size() > this.maxInstantsToKeep) {
- return hoodieInstants.subList(0, hoodieInstants.size() - this.minInstantsToKeep);
- } else {
- return new ArrayList<HoodieInstant>();
- }
- }).flatMap(Collection::stream);
- }
-
- private Stream<HoodieInstant> getCommitInstantsToArchive() throws IOException {
- // TODO (na) : Add a way to return actions associated with a timeline and then merge/unify
- // with logic above to avoid Stream.concat
- HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
-
- // Get the oldest inflight instant and a completed commit before this inflight instant.
- Option<HoodieInstant> oldestPendingInstant = table.getActiveTimeline()
- .getWriteTimeline()
- .filter(instant -> !instant.isCompleted())
- .firstInstant();
-
- // Oldest commit to retain is the greatest completed commit, that is less than the oldest pending instant.
- // In some cases when inflight is the lowest commit then oldest commit to retain will be equal to oldest
- // inflight commit.
- Option<HoodieInstant> oldestCommitToRetain;
- if (oldestPendingInstant.isPresent()) {
- Option<HoodieInstant> completedCommitBeforeOldestPendingInstant =
- Option.fromJavaOptional(commitTimeline.getReverseOrderedInstants()
- .filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(),
- LESSER_THAN, oldestPendingInstant.get().getTimestamp())).findFirst());
- // Check if the completed instant is higher than the oldest inflight instant
- // in that case update the oldestCommitToRetain to oldestInflight commit time.
- if (!completedCommitBeforeOldestPendingInstant.isPresent()
- || HoodieTimeline.compareTimestamps(oldestPendingInstant.get().getTimestamp(),
- LESSER_THAN, completedCommitBeforeOldestPendingInstant.get().getTimestamp())) {
- oldestCommitToRetain = oldestPendingInstant;
+ private List<HoodieInstant> getCommitInstantsToArchive() throws IOException {
+ HoodieTimeline completedCommitsTimeline = table.getCompletedCommitsTimeline();
+
+ if (completedCommitsTimeline.countInstants() > maxInstantsToKeep) {
+ // Step1: Get all candidates of oldestInstantToRetain.
+ List<Option<HoodieInstant>> oldestInstantToRetainCandidates = new ArrayList<>();
+
Review Comment:
I think the new logic is easier to understand than the previous logic. We need to make sure no bugs are introduced.
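To make regressions easier to spot, here is a hedged model of the candidate-based boundary as read from the diff: collect the optional oldest-instant-to-retain candidates, take the earliest one that is present, and archive only completed commits strictly before it, and only once there are more than `maxInstantsToKeep` of them. It also encodes step 4 (metadata table available but never compacted means nothing is archived). `ArchivalBoundarySketch` and `commitsToArchive` are hypothetical names, instants are plain timestamp strings rather than `HoodieInstant`, and capping the archived count at `size - minInstantsToKeep` is an assumption borrowed from the removed clean-instant logic, not something the excerpt shows for commits. Requires Java 11+.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical model of the candidate-based archival boundary; not Hudi code.
// Instants are fixed-width timestamp strings, so String order is time order.
public class ArchivalBoundarySketch {

  static List<String> commitsToArchive(
      List<String> completedCommits,           // sorted ascending
      List<Optional<String>> retainCandidates, // e.g. pending, compaction, clustering
      boolean metadataTableAvailable,
      Optional<String> latestMetadataCompaction,
      int maxInstantsToKeep,
      int minInstantsToKeep) {
    if (completedCommits.size() <= maxInstantsToKeep) {
      return List.of();
    }
    List<Optional<String>> candidates = new ArrayList<>(retainCandidates);
    if (metadataTableAvailable) {
      // Step 4 in the diff: no compaction on the metadata table yet -> archive nothing.
      if (latestMetadataCompaction.isEmpty()) {
        return List.of();
      }
      candidates.add(latestMetadataCompaction);
    }
    // Earliest present candidate; absent candidates impose no limit.
    Optional<String> oldestToRetain = candidates.stream()
        .flatMap(Optional::stream)
        .min(Comparator.naturalOrder());
    // Assumption: keep at least minInstantsToKeep commits.
    int limit = completedCommits.size() - minInstantsToKeep;
    List<String> result = new ArrayList<>();
    for (String ts : completedCommits) {
      if (result.size() >= limit
          || (oldestToRetain.isPresent() && ts.compareTo(oldestToRetain.get()) >= 0)) {
        break; // archived instants form a contiguous prefix of the timeline
      }
      result.add(ts);
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> commits = List.of("001", "002", "003", "004", "005", "006");
    // A candidate at 004 (say, from pending clustering) caps the prefix at 001..003.
    System.out.println(commitsToArchive(commits, List.of(Optional.of("004"), Optional.empty()),
        false, Optional.empty(), 4, 2)); // [001, 002, 003]
    // Pinning an old clean instant (002) as an extra candidate blocks later commits.
    System.out.println(commitsToArchive(commits, List.of(Optional.of("004"), Optional.of("002")),
        false, Optional.empty(), 4, 2)); // [001]
    // Metadata table available but never compacted: nothing is archived.
    System.out.println(commitsToArchive(commits, List.of(),
        true, Optional.empty(), 4, 2)); // []
  }
}
```

The second call illustrates the concern in the first review comment: treating an old clean instant (`002`) as an extra candidate shrinks the archived prefix to `[001]`, even though `002` and `003` would otherwise be archivable.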