danny0405 commented on code in PR #9007:
URL: https://github.com/apache/hudi/pull/9007#discussion_r1233519003
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java:
##########
@@ -40,8 +40,8 @@ public class TimelineDiffHelper {
public static TimelineDiffResult getNewInstantsForIncrementalSync(HoodieTimeline oldTimeline,
HoodieTimeline newTimeline) {
- HoodieTimeline oldT = oldTimeline.filterCompletedAndCompactionInstants();
- HoodieTimeline newT = newTimeline.filterCompletedAndCompactionInstants();
+ HoodieTimeline oldT = oldTimeline.filterCompletedInstantsOrRewriteTimeline();
+ HoodieTimeline newT = newTimeline.filterCompletedInstantsOrRewriteTimeline();
Review Comment:
The name `filterCompletedInstantsOrRewriteTimeline` does not read well. How about renaming it to `filterCompletedAndRewriteInstants`?
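To make the naming question concrete, here is a minimal sketch of what the filter selects, assuming it keeps every completed instant plus any pending instant whose action is a rewrite action (compaction, logcompaction, replacecommit), as the PR's own code comment describes. `SketchInstant` and `RewriteInstantFilterSketch` are hypothetical stand-ins, not Hudi's `HoodieInstant` or `HoodieTimeline` API.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical stand-in for HoodieInstant with just the fields the filter looks at. */
final class SketchInstant {
  final String timestamp;
  final String action;
  final boolean completed;

  SketchInstant(String timestamp, String action, boolean completed) {
    this.timestamp = timestamp;
    this.action = action;
    this.completed = completed;
  }
}

final class RewriteInstantFilterSketch {
  // Assumed set of "rewrite" actions, taken from the PR's code comment.
  static final Set<String> REWRITE_ACTIONS = Collections.unmodifiableSet(
      new HashSet<>(Arrays.asList("compaction", "logcompaction", "replacecommit")));

  /** Keeps completed instants, plus instants of a rewrite action (pending or not). */
  static List<SketchInstant> filterCompletedAndRewriteInstants(List<SketchInstant> instants) {
    return instants.stream()
        .filter(i -> i.completed || REWRITE_ACTIONS.contains(i.action))
        .collect(Collectors.toList());
  }
}
```

The predicate is a union of "completed" and "rewrite action", so a `filterCompletedAnd...Instants` name also stays parallel to the `filterCompletedAndCompactionInstants` method it replaces.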
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java:
##########
@@ -74,19 +74,62 @@ public static TimelineDiffResult getNewInstantsForIncrementalSync(HoodieTimeline
newTimeline.getInstantsAsStream().filter(instant -> !oldTimelineInstants.contains(instant)).forEach(newInstants::add);
+ // Check for log compaction commits completed or removed.
List<Pair<HoodieInstant, HoodieInstant>> logCompactionInstants = getPendingLogCompactionTransitions(oldTimeline, newTimeline);
- List<HoodieInstant> finishedOrRemovedLogCompactionInstants = logCompactionInstants.stream()
+ List<Pair<HoodieInstant, Boolean>> finishedOrRemovedLogCompactionInstants = logCompactionInstants.stream()
.filter(instantPair -> !instantPair.getKey().isCompleted()
&& (instantPair.getValue() == null || instantPair.getValue().isCompleted()))
- .map(Pair::getKey).collect(Collectors.toList());
- return new TimelineDiffResult(newInstants, finishedCompactionInstants, finishedOrRemovedLogCompactionInstants, true);
+ .map(instantPair -> (instantPair.getValue() == null)
+ ? Pair.of(instantPair.getKey(), false) : Pair.of(instantPair.getKey(), true))
+ .collect(Collectors.toList());
+
+ // Check for replace commits completed or removed.
+ List<Pair<HoodieInstant, HoodieInstant>> replaceCommitInstants = getPendingReplaceCommitTransitions(oldTimeline, newTimeline);
+ List<Pair<HoodieInstant, Boolean>> finishedOrRemovedReplaceCommitInstants = replaceCommitInstants.stream()
+ .filter(instantPair -> !instantPair.getKey().isCompleted()
+ && (instantPair.getValue() == null || instantPair.getValue().isCompleted()))
+ .map(instantPair -> (instantPair.getValue() == null)
+ ? Pair.of(instantPair.getKey(), false) : Pair.of(instantPair.getKey(), true))
+ .collect(Collectors.toList());
+
+ // New instants will contain instants that are newly completed commits or newly created pending rewrite commits
+ // (i.e. compaction, logcompaction, replacecommit)
+ // Finished or removed rewrite commits are handled independently.
+ return new TimelineDiffResult(newInstants, finishedCompactionInstants, finishedOrRemovedLogCompactionInstants,
+ finishedOrRemovedReplaceCommitInstants, true);
} else {
// One or more timelines is empty
LOG.warn("One or more timelines is empty");
return TimelineDiffResult.UNSAFE_SYNC_RESULT;
}
}
+ /**
+ * Get pending replacecommit transitions.
+ */
+ private static List<Pair<HoodieInstant, HoodieInstant>> getPendingReplaceCommitTransitions(HoodieTimeline oldTimeline,
+ HoodieTimeline newTimeline) {
+ Set<HoodieInstant> newTimelineInstants = newTimeline.getInstantsAsStream().collect(Collectors.toSet());
+
+ return oldTimeline.filterPendingReplaceTimeline().getInstantsAsStream().map(instant -> {
+ if (newTimelineInstants.contains(instant)) {
Review Comment:
The methods `getPendingReplaceCommitTransitions` and `getPendingLogCompactionTransitions` are almost identical except for the action type. Can we abstract a little and merge them into one method?
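One possible shape for the merged helper, sketched with hypothetical stand-in types (`TInstant`, `Transition`, `PendingTransitionsSketch`) rather than Hudi's `HoodieInstant` and `Pair`. It follows the shape visible in the hunk (collect the new timeline into a set, then map each pending instant of the old timeline); the completed and inflight fallback lookups, and the action pairs in the usage note, are assumptions about how the two existing methods differ, not something this diff shows.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical stand-in for HoodieInstant: timestamp, action and state only. */
final class TInstant {
  enum State { REQUESTED, INFLIGHT, COMPLETED }

  final String timestamp;
  final String action;
  final State state;

  TInstant(String timestamp, String action, State state) {
    this.timestamp = timestamp;
    this.action = action;
    this.state = state;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof TInstant)) {
      return false;
    }
    TInstant other = (TInstant) o;
    return timestamp.equals(other.timestamp) && action.equals(other.action) && state == other.state;
  }

  @Override
  public int hashCode() {
    return Objects.hash(timestamp, action, state);
  }
}

/** Old pending instant paired with its counterpart in the new timeline (null if removed). */
final class Transition {
  final TInstant oldInstant;
  final TInstant newInstant;

  Transition(TInstant oldInstant, TInstant newInstant) {
    this.oldInstant = oldInstant;
    this.newInstant = newInstant;
  }
}

final class PendingTransitionsSketch {
  /** Generic version of the per-action helpers: only the two action names vary. */
  static List<Transition> getPendingTransitions(List<TInstant> oldInstants, List<TInstant> newInstants,
                                                String pendingAction, String completedAction) {
    Set<TInstant> newSet = new HashSet<>(newInstants);
    return oldInstants.stream()
        // Stand-in for filterPending*Timeline(): not completed and of the given action.
        .filter(i -> i.state != TInstant.State.COMPLETED && i.action.equals(pendingAction))
        .map(instant -> {
          if (newSet.contains(instant)) {
            return new Transition(instant, instant); // unchanged
          }
          TInstant completed = new TInstant(instant.timestamp, completedAction, TInstant.State.COMPLETED);
          if (newSet.contains(completed)) {
            return new Transition(instant, completed); // finished
          }
          TInstant inflight = new TInstant(instant.timestamp, pendingAction, TInstant.State.INFLIGHT);
          if (newSet.contains(inflight)) {
            return new Transition(instant, inflight); // requested -> inflight
          }
          return new Transition(instant, null); // removed from the new timeline
        })
        .collect(Collectors.toList());
  }
}
```

Under these assumptions, `getPendingTransitions(old, new, "logcompaction", "deltacommit")` would cover the log compaction case and `("replacecommit", "replacecommit")` the replace commit case.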
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -118,6 +118,8 @@ public class HoodieTableMetaClient implements Serializable {
private ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build();
private FileSystemRetryConfig fileSystemRetryConfig = FileSystemRetryConfig.newBuilder().build();
protected HoodieMetaserverConfig metaserverConfig;
+ // Tracks the status of the last incremental file sync
+ private boolean isLastIncrementalSyncSuccessful;
Review Comment:
I don't think the meta client should hold state like this. We should move this state into `IncrementalTimelineSyncFileSystemView`.
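A minimal sketch of the view owning the flag. `IncrementalSyncViewSketch` and its `runSync` signature are hypothetical; the sketch only assumes the view keeps a "try incremental sync, otherwise fall back to a full sync" shape. With the flag on the view, two views built on the same meta client cannot overwrite each other's sync status.

```java
/**
 * Hypothetical, heavily simplified file-system view. The sync status is a field of the
 * view itself, next to the only code that sets it.
 */
final class IncrementalSyncViewSketch {
  private boolean lastIncrementalSyncSuccessful = false;

  /**
   * Tries the incremental path when it is allowed; if it is not allowed or it throws,
   * falls back to a full sync. Records which path the last sync ended up taking.
   */
  void runSync(boolean canSyncIncrementally, Runnable incrementalSync, Runnable fullSync) {
    if (canSyncIncrementally) {
      try {
        incrementalSync.run();
        lastIncrementalSyncSuccessful = true;
        return;
      } catch (RuntimeException e) {
        // Fall through to the full sync below.
      }
    }
    fullSync.run();
    lastIncrementalSyncSuccessful = false;
  }

  boolean isLastIncrementalSyncSuccessful() {
    return lastIncrementalSyncSuccessful;
  }
}
```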
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java:
##########
@@ -74,19 +74,62 @@ public static TimelineDiffResult getNewInstantsForIncrementalSync(HoodieTimeline
newTimeline.getInstantsAsStream().filter(instant -> !oldTimelineInstants.contains(instant)).forEach(newInstants::add);
+ // Check for log compaction commits completed or removed.
List<Pair<HoodieInstant, HoodieInstant>> logCompactionInstants = getPendingLogCompactionTransitions(oldTimeline, newTimeline);
- List<HoodieInstant> finishedOrRemovedLogCompactionInstants = logCompactionInstants.stream()
+ List<Pair<HoodieInstant, Boolean>> finishedOrRemovedLogCompactionInstants = logCompactionInstants.stream()
.filter(instantPair -> !instantPair.getKey().isCompleted()
&& (instantPair.getValue() == null || instantPair.getValue().isCompleted()))
- .map(Pair::getKey).collect(Collectors.toList());
- return new TimelineDiffResult(newInstants, finishedCompactionInstants, finishedOrRemovedLogCompactionInstants, true);
+ .map(instantPair -> (instantPair.getValue() == null)
+ ? Pair.of(instantPair.getKey(), false) : Pair.of(instantPair.getKey(), true))
Review Comment:
The handling of deleted pending instants differs between compaction and log compaction/clustering: for compaction we return early. Do you think we should unify the handling?
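One way the two strategies could be unified, sketched under assumptions: classify each old pending instant into a three-valued outcome once, then make "a removed pending instant forces a full sync" a per-action policy flag instead of a separate code path. `PendingOutcomeSketch`, `Outcome` and `finishedOrRemoved` are hypothetical names; mapping the `Boolean` in `Pair<HoodieInstant, Boolean>` to `FINISHED`/`REMOVED` follows the `.map(...)` in the hunk, where a `null` counterpart becomes `false`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

final class PendingOutcomeSketch {
  /** What happened to an instant that was pending in the old timeline. */
  enum Outcome { STILL_PENDING, FINISHED, REMOVED }

  /** Hypothetical: timestamp of the old pending instant plus its outcome. */
  static final class Transition {
    final String timestamp;
    final Outcome outcome;

    Transition(String timestamp, Outcome outcome) {
      this.timestamp = timestamp;
      this.outcome = outcome;
    }
  }

  /**
   * Single code path for every rewrite action. If removalForcesFullSync is true, any removed
   * pending instant makes incremental sync unsafe (Optional.empty()), like the current
   * compaction handling; otherwise finished and removed instants are returned so the caller
   * can apply them incrementally, like the new log compaction/clustering handling.
   */
  static Optional<List<Transition>> finishedOrRemoved(List<Transition> transitions,
                                                      boolean removalForcesFullSync) {
    List<Transition> result = new ArrayList<>();
    for (Transition t : transitions) {
      if (t.outcome == Outcome.STILL_PENDING) {
        continue;
      }
      if (t.outcome == Outcome.REMOVED && removalForcesFullSync) {
        return Optional.empty();
      }
      result.add(t);
    }
    return Optional.of(result);
  }
}
```

An enum also makes the finished-versus-removed meaning explicit, which the bare `Boolean` currently conveys only through the ternary.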