suryaprasanna commented on code in PR #9007:
URL: https://github.com/apache/hudi/pull/9007#discussion_r1264527544
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java:
##########
@@ -53,6 +53,7 @@ public static Comparator<String> getReverseCommitTimeComparator() {
 /**
  * Timeline, based on which all getter work.
+ * This should be a write timeline that contains either completed instants or pending compaction instants.
Review Comment:
Good idea, I will use this name across all the classes.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -134,14 +134,20 @@ protected void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActi
 * @param visibleActiveTimeline Visible Active Timeline
 */
protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
-  this.visibleCommitsAndCompactionTimeline = visibleActiveTimeline.getWriteTimeline();
+  this.visibleCommitsAndCompactionTimeline = visibleActiveTimeline
+      .getWriteTimeline()
+      .filterCompletedAndCompactionInstants();
}
/**
 * Adds the provided statuses into the file system view, and also caches it inside this object.
 */
public List<HoodieFileGroup> addFilesToView(FileStatus[] statuses) {
-  HoodieTimer timer = HoodieTimer.start();
+  HoodieTimer timer = new HoodieTimer().startTimer();
+  // Timeline determines whether a file slice is committed or not. Timeline object is attached to each HoodieFileGroup.
+  // Using this timeline object HoodieFileGroup determines whether a fileSlice is committed or not.
+  // visibleCommitsAndCompactionTimeline contains entire write timeline with pending commits, so filtering it
Review Comment:
I think this comment was a leftover from the original PR that I raised.
I removed it from here and added it at the appropriate place.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java:
##########
@@ -145,6 +150,17 @@ Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath, String max
 */
Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String partitionPath, String maxInstantTime);
+ /**
+ * Stream all "merged" file-slices before on an instant time If a file-group has a pending compaction request, the
Review Comment:
Fixed it.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -170,15 +176,25 @@ public List<HoodieFileGroup> addFilesToView(FileStatus[] statuses) {
 /**
  * Build FileGroups from passed in file-status.
+ * TODO: Note here timeline is completedAndCompactionTimeline.
Review Comment:
Yeah, makes sense. We can also avoid filtering the activeTimeline multiple
times to get this timeline. So, I renamed it in all the places so that
developers don't even have to perform extra filtering.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java:
##########
@@ -327,11 +357,43 @@ private void addRollbackInstant(HoodieTimeline timeline, HoodieInstant instant)
 LOG.info("Done Syncing rollback instant (" + instant + ")");
 }
+ /**
+ * Add pending replace file groups
+ * @param pendingClusteringInstant pending instant
+ * @throws IOException
+ */
+ private void addPendingClusteringInstant(HoodieInstant pendingClusteringInstant) throws IOException {
+   LOG.info("Syncing pending clustering instant (" + pendingClusteringInstant + ")");
+   Stream<Pair<HoodieFileGroupId, HoodieInstant>> fileGroupStreamToInstant =
+       ClusteringUtils.getFileGroupEntriesFromClusteringInstant(pendingClusteringInstant, metaClient);
+   addFileGroupsInPendingClustering(fileGroupStreamToInstant);
+ }
+
+ /**
+ * Remove Pending compaction instant. This is called when logcompaction is converted to delta commit or the log compaction
Review Comment:
Fixed it.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java:
##########
@@ -181,17 +204,24 @@ private void removePendingCompactionInstant(HoodieInstant instant) throws IOExce
 }
 /**
- * Remove Pending compaction instant. This is called when logcompaction is converted to delta commit,
- * so you no longer need to track them as pending.
+ * Remove Pending compaction instant. This is called when logcompaction is converted to delta commit or the log compaction
+ * operation is failed, so it is no longer need to be tracked as pending.
 *
 * @param instant Log Compaction Instant to be removed
+ * @param isSuccessful true if log compaction operation is successful, false if the operation is failed and rollbacked.
 */
- private void removePendingLogCompactionInstant(HoodieInstant instant) throws IOException {
-   LOG.info("Removing completed log compaction instant (" + instant + ")");
-   HoodieCompactionPlan plan = CompactionUtils.getLogCompactionPlan(metaClient, instant.getTimestamp());
-   removePendingLogCompactionOperations(CompactionUtils.getPendingCompactionOperations(instant, plan)
-       .map(instantPair -> Pair.of(instantPair.getValue().getKey(),
-           CompactionOperation.convertFromAvroRecordInstance(instantPair.getValue().getValue()))));
+ private void removePendingLogCompactionInstant(HoodieInstant instant, boolean isSuccessful) throws IOException {
+   if (isSuccessful) {
+     LOG.info("Removing completed log compaction instant (" + instant + ")");
+     HoodieCompactionPlan plan = CompactionUtils.getLogCompactionPlan(metaClient, instant.getTimestamp());
+     removePendingLogCompactionOperations(CompactionUtils.getPendingCompactionOperations(instant, plan)
+         .map(instantPair -> Pair.of(instantPair.getValue().getKey(),
+             CompactionOperation.convertFromAvroRecordInstance(instantPair.getValue().getValue()))));
+   } else {
+     LOG.info("Removing failed log compaction instant (" + instant + ")");
+     removePendingLogCompactionOperations(instant.getTimestamp());
Review Comment:
If a compaction instant is removed, then it is not safe to do an incremental
file sync, because there could be removal of instants and also removal of file
slices. So we fail the check in TimelineDiffHelper and the job automatically
falls back to doing a full sync.
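The fallback decision described above can be sketched roughly as follows. This is a hypothetical, simplified illustration with made-up types, not Hudi's actual `TimelineDiffHelper` code: if any instant from the previously synced timeline has disappeared from the new timeline, incremental sync is declared unsafe and the caller must do a full refresh instead.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class SyncPlanner {
    // Returns true only when every previously seen instant still exists in the
    // new timeline. A removed instant (e.g. a rolled-back compaction) may mean
    // file slices were deleted, so the caller must fall back to a full sync.
    static boolean canSyncIncrementally(List<String> oldInstants, List<String> newInstants) {
        Set<String> stillPresent = new HashSet<>(newInstants);
        for (String instant : oldInstants) {
            if (!stillPresent.contains(instant)) {
                return false; // instant disappeared -> unsafe, do a full sync
            }
        }
        return true;
    }
}
```

New instants appearing in the new timeline are fine (they are handled incrementally); only removals trigger the full-sync path.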
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java:
##########
@@ -181,17 +204,24 @@ private void removePendingCompactionInstant(HoodieInstant instant) throws IOExce
 }
 /**
- * Remove Pending compaction instant. This is called when logcompaction is converted to delta commit,
- * so you no longer need to track them as pending.
+ * Remove Pending compaction instant. This is called when logcompaction is converted to delta commit or the log compaction
Review Comment:
Fixed it.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java:
##########
@@ -146,17 +189,22 @@ public static class TimelineDiffResult {
 private final List<HoodieInstant> newlySeenInstants;
 private final List<HoodieInstant> finishedCompactionInstants;
- private final List<HoodieInstant> finishedOrRemovedLogCompactionInstants;
+ // Completed instants will have true as the value where as instants removed due to rollback will have false as value.
+ private final List<Pair<HoodieInstant, Boolean>> finishedOrRemovedLogCompactionInstants;
+ // Completed instants will have true as the value where as instants removed due to rollback will have false as value.
+ private final List<Pair<HoodieInstant, Boolean>> finishedOrRemovedReplaceCommitInstants;
 private final boolean canSyncIncrementally;
 public static final TimelineDiffResult UNSAFE_SYNC_RESULT =
-     new TimelineDiffResult(null, null, null, false);
+     new TimelineDiffResult(null, null, null, null, false);
 public TimelineDiffResult(List<HoodieInstant> newlySeenInstants, List<HoodieInstant> finishedCompactionInstants,
-     List<HoodieInstant> finishedOrRemovedLogCompactionInstants, boolean canSyncIncrementally) {
+     List<Pair<HoodieInstant, Boolean>> finishedOrRemovedLogCompactionInstants,
+     List<Pair<HoodieInstant, Boolean>> finishedOrRemovedReplaceCommitInstants, boolean canSyncIncrementally) {
   this.newlySeenInstants = newlySeenInstants;
   this.finishedCompactionInstants = finishedCompactionInstants;
   this.finishedOrRemovedLogCompactionInstants = finishedOrRemovedLogCompactionInstants;
+   this.finishedOrRemovedReplaceCommitInstants = finishedOrRemovedReplaceCommitInstants;
Review Comment:
Yes, renamed all these variables.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java:
##########
@@ -170,7 +170,7 @@ public interface HoodieTimeline extends Serializable {
*
* @return
*/
- HoodieTimeline filterCompletedInstantsOrRewriteTimeline();
+ HoodieTimeline filterCompletedAndRewriteInstants();
Review Comment:
Here we need to filter based on completed instants and rewrite instants. So,
with rewrite you can get pending replacecommits, logcompactions and compactions
with completed you will get all the commits that are completed.
This API is similar to filterCompletedAndCompactionInstants, but instead of
just including compactions it also includes other rewrites instants.
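The semantics described above can be sketched as a simple predicate. This is an illustrative sketch only, using simplified hypothetical types rather than Hudi's actual `HoodieTimeline` API: keep every completed instant, plus any pending instant whose action rewrites file groups (compaction, log compaction, replace commit).

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class RewriteFilter {
    // Simplified stand-in for a timeline instant.
    record Instant(String timestamp, String action, boolean completed) {}

    // Actions that rewrite existing file groups and must stay visible while pending.
    static final Set<String> REWRITE_ACTIONS =
        Set.of("compaction", "logcompaction", "replacecommit");

    // Keep completed instants plus pending rewrite instants; drop other pending commits.
    static List<Instant> filterCompletedAndRewriteInstants(List<Instant> timeline) {
        return timeline.stream()
            .filter(i -> i.completed() || REWRITE_ACTIONS.contains(i.action()))
            .collect(Collectors.toList());
    }
}
```

A plain pending `commit` or `deltacommit` is filtered out, while a pending `compaction` survives the filter, which is the distinction the renamed API is meant to capture.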
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java:
##########
@@ -159,7 +177,12 @@ private void runIncrementalSync(HoodieTimeline timeline, TimelineDiffResult diff
 } else if (instant.getAction().equals(HoodieTimeline.ROLLBACK_ACTION)) {
   addRollbackInstant(timeline, instant);
 } else if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
-   addReplaceInstant(timeline, instant);
+   boolean isClusteringCommit = ClusteringUtils.isClusteringCommit(metaClient, instant);
+   if (!instant.isCompleted() && isClusteringCommit) {
+     addPendingClusteringInstant(instant);
+   } else {
+     addReplaceInstant(timeline, instant);
Review Comment:
Completed insert-overwrite commits can also come here.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java:
##########
@@ -74,19 +74,62 @@ public static TimelineDiffResult getNewInstantsForIncrementalSync(HoodieTimeline
 newTimeline.getInstantsAsStream().filter(instant -> !oldTimelineInstants.contains(instant)).forEach(newInstants::add);
+ // Check for log compaction commits completed or removed.
 List<Pair<HoodieInstant, HoodieInstant>> logCompactionInstants = getPendingLogCompactionTransitions(oldTimeline, newTimeline);
- List<HoodieInstant> finishedOrRemovedLogCompactionInstants = logCompactionInstants.stream()
+ List<Pair<HoodieInstant, Boolean>> finishedOrRemovedLogCompactionInstants = logCompactionInstants.stream()
     .filter(instantPair -> !instantPair.getKey().isCompleted()
         && (instantPair.getValue() == null || instantPair.getValue().isCompleted()))
-     .map(Pair::getKey).collect(Collectors.toList());
- return new TimelineDiffResult(newInstants, finishedCompactionInstants, finishedOrRemovedLogCompactionInstants, true);
+     .map(instantPair -> (instantPair.getValue() == null)
Review Comment:
Refactored it.
##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java:
##########
@@ -325,6 +325,15 @@ private void registerFileSlicesAPI() {
 writeValueAsString(ctx, dtos);
 }, true));
+
+ app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_SLICES_INCLUDE_PENDING_URL, new ViewHandler(ctx -> {
+   metricsRegistry.add("LATEST_PARTITION_SLICES", 1);
Review Comment:
Fixed it.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java:
##########
@@ -74,19 +74,62 @@ public static TimelineDiffResult getNewInstantsForIncrementalSync(HoodieTimeline
 newTimeline.getInstantsAsStream().filter(instant -> !oldTimelineInstants.contains(instant)).forEach(newInstants::add);
+ // Check for log compaction commits completed or removed.
 List<Pair<HoodieInstant, HoodieInstant>> logCompactionInstants = getPendingLogCompactionTransitions(oldTimeline, newTimeline);
- List<HoodieInstant> finishedOrRemovedLogCompactionInstants = logCompactionInstants.stream()
+ List<Pair<HoodieInstant, Boolean>> finishedOrRemovedLogCompactionInstants = logCompactionInstants.stream()
     .filter(instantPair -> !instantPair.getKey().isCompleted()
         && (instantPair.getValue() == null || instantPair.getValue().isCompleted()))
-     .map(Pair::getKey).collect(Collectors.toList());
- return new TimelineDiffResult(newInstants, finishedCompactionInstants, finishedOrRemovedLogCompactionInstants, true);
+     .map(instantPair -> (instantPair.getValue() == null)
+         ? Pair.of(instantPair.getKey(), false) : Pair.of(instantPair.getKey(), true))
+     .collect(Collectors.toList());
+
+ // Check for replace commits completed or removed.
+ List<Pair<HoodieInstant, HoodieInstant>> replaceCommitInstants = getPendingReplaceCommitTransitions(oldTimeline, newTimeline);
+ List<Pair<HoodieInstant, Boolean>> finishedOrRemovedReplaceCommitInstants = replaceCommitInstants.stream()
+     .filter(instantPair -> !instantPair.getKey().isCompleted()
+         && (instantPair.getValue() == null || instantPair.getValue().isCompleted()))
+     .map(instantPair -> (instantPair.getValue() == null)
+         ? Pair.of(instantPair.getKey(), false) : Pair.of(instantPair.getKey(), true))
+     .collect(Collectors.toList());
+
+ // New instants will contain instants that are newly completed commits or newly created pending rewrite commits
+ // (i.e. compaction, logcompaction, replacecommit).
+ // Finished or removed rewrite commits are handled independently.
+ return new TimelineDiffResult(newInstants, finishedCompactionInstants, finishedOrRemovedLogCompactionInstants,
+     finishedOrRemovedReplaceCommitInstants, true);
 } else {
   // One or more timelines is empty
   LOG.warn("One or more timelines is empty");
   return TimelineDiffResult.UNSAFE_SYNC_RESULT;
 }
 }
+ /**
+ * Get pending replacecommit transitions.
+ */
+ private static List<Pair<HoodieInstant, HoodieInstant>> getPendingReplaceCommitTransitions(HoodieTimeline oldTimeline,
Review Comment:
Good catch, I think the filtering was missing in HoodieTimeline's
filterCompletedAndRewriteInstants method. I added it now. Also, I renamed
these as clustering transitions instead of replace-commit transitions.
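The transition pairing that this helper computes can be sketched as follows. This is a hedged, simplified sketch with hypothetical types and helper names, not the actual `TimelineDiffHelper` implementation: each replace commit that was pending in the old timeline is paired with its counterpart in the new timeline, or with `null` if it was removed (e.g. rolled back).

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

class TransitionHelper {
    // Simplified stand-in for a timeline instant.
    record Instant(String timestamp, String action, boolean completed) {}

    // Pair each pending replace commit from the old timeline with the instant
    // carrying the same timestamp in the new timeline (null if it was removed).
    static List<SimpleEntry<Instant, Instant>> pendingTransitions(
            List<Instant> oldTimeline, List<Instant> newTimeline) {
        Map<String, Instant> byTimestamp = newTimeline.stream()
            .collect(Collectors.toMap(Instant::timestamp, Function.identity()));
        return oldTimeline.stream()
            .filter(i -> "replacecommit".equals(i.action()) && !i.completed())
            .map(i -> new SimpleEntry<>(i, byTimestamp.get(i.timestamp())))
            .collect(Collectors.toList());
    }
}
```

Downstream, a pair whose value is a completed instant maps to `true` (finished) and a pair whose value is `null` maps to `false` (removed), which is the `Pair<HoodieInstant, Boolean>` shape introduced in this PR.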
##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java:
##########
@@ -544,10 +574,13 @@ protected Option<HoodieInstant> getReplaceInstant(final HoodieFileGroupId fileGr
 }
 private Stream<HoodieFileGroup> getFileGroups(Stream<FileSlice> sliceStream) {
+ // HoodieFileGroup uses the timeline to determine whether a file slice is committed or not, so pass
+ // the completedAndCompactionTimeline object while creating HoodieFileGroup.
+ HoodieTimeline completedAndCompactionTimeline = getVisibleCommitsAndCompactionTimeline();
Review Comment:
Refactored it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]