This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 44ab6f32bff [HUDI-6538] Refactor methods in TimelineDiffHelper class (#10938)
44ab6f32bff is described below
commit 44ab6f32bffbab8cd250bd0430d9591360f118e7
Author: wombatu-kun <[email protected]>
AuthorDate: Mon Apr 1 12:47:27 2024 +0700
[HUDI-6538] Refactor methods in TimelineDiffHelper class (#10938)
---
.../common/table/timeline/TimelineDiffHelper.java | 66 +++++++---------------
1 file changed, 21 insertions(+), 45 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java
index aa7e2a30754..a98b71aa571 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineDiffHelper.java
@@ -37,8 +37,11 @@ public class TimelineDiffHelper {
private static final Logger LOG =
LoggerFactory.getLogger(TimelineDiffHelper.class);
+ private TimelineDiffHelper() {
+ }
+
public static TimelineDiffResult
getNewInstantsForIncrementalSync(HoodieTimeline oldTimeline,
- HoodieTimeline newTimeline) {
+
HoodieTimeline newTimeline) {
HoodieTimeline oldT = oldTimeline.filterCompletedAndCompactionInstants();
HoodieTimeline newT = newTimeline.filterCompletedAndCompactionInstants();
@@ -57,14 +60,14 @@ public class TimelineDiffHelper {
List<HoodieInstant> newInstants = new ArrayList<>();
// Check If any pending compaction is lost. If so, do not allow
incremental timeline sync
- List<Pair<HoodieInstant, HoodieInstant>> compactionInstants =
getPendingCompactionTransitions(oldT, newT);
+ List<Pair<HoodieInstant, HoodieInstant>> compactionInstants =
getPendingActionTransitions(oldT.filterPendingCompactionTimeline(),
+ newT, HoodieTimeline.COMMIT_ACTION,
HoodieTimeline.COMPACTION_ACTION);
List<HoodieInstant> lostPendingCompactions = compactionInstants.stream()
.filter(instantPair -> instantPair.getValue() ==
null).map(Pair::getKey).collect(Collectors.toList());
if (!lostPendingCompactions.isEmpty()) {
// If a compaction is unscheduled, fall back to complete refresh of fs
view since some log files could have been
// moved. Its unsafe to incrementally sync in that case.
- LOG.warn("Some pending compactions are no longer in new timeline
(unscheduled ?). They are :"
- + lostPendingCompactions);
+ LOG.warn("Some pending compactions are no longer in new timeline
(unscheduled ?). They are: {}", lostPendingCompactions);
return TimelineDiffResult.UNSAFE_SYNC_RESULT;
}
List<HoodieInstant> finishedCompactionInstants =
compactionInstants.stream()
@@ -74,7 +77,8 @@ public class TimelineDiffHelper {
newTimeline.getInstantsAsStream().filter(instant ->
!oldTimelineInstants.contains(instant)).forEach(newInstants::add);
- List<Pair<HoodieInstant, HoodieInstant>> logCompactionInstants =
getPendingLogCompactionTransitions(oldTimeline, newTimeline);
+ List<Pair<HoodieInstant, HoodieInstant>> logCompactionInstants =
getPendingActionTransitions(oldTimeline.filterPendingLogCompactionTimeline(),
+ newTimeline, HoodieTimeline.DELTA_COMMIT_ACTION,
HoodieTimeline.LOG_COMPACTION_ACTION);
List<HoodieInstant> finishedOrRemovedLogCompactionInstants =
logCompactionInstants.stream()
.filter(instantPair -> !instantPair.getKey().isCompleted()
&& (instantPair.getValue() == null ||
instantPair.getValue().isCompleted()))
@@ -87,52 +91,24 @@ public class TimelineDiffHelper {
}
}
- /**
- * Getting pending log compaction transitions.
- */
- private static List<Pair<HoodieInstant, HoodieInstant>>
getPendingLogCompactionTransitions(HoodieTimeline oldTimeline,
-
HoodieTimeline newTimeline) {
- Set<HoodieInstant> newTimelineInstants =
newTimeline.getInstantsAsStream().collect(Collectors.toSet());
-
- return
oldTimeline.filterPendingLogCompactionTimeline().getInstantsAsStream().map(instant
-> {
- if (newTimelineInstants.contains(instant)) {
- return Pair.of(instant, instant);
- } else {
- HoodieInstant logCompacted =
- new HoodieInstant(State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, instant.getTimestamp());
- if (newTimelineInstants.contains(logCompacted)) {
- return Pair.of(instant, logCompacted);
- }
- HoodieInstant inflightLogCompacted =
- new HoodieInstant(State.INFLIGHT,
HoodieTimeline.LOG_COMPACTION_ACTION, instant.getTimestamp());
- if (newTimelineInstants.contains(inflightLogCompacted)) {
- return Pair.of(instant, inflightLogCompacted);
- }
- return Pair.<HoodieInstant, HoodieInstant>of(instant, null);
- }
- }).collect(Collectors.toList());
- }
-
- /**
- * Getting pending compaction transitions.
- */
- private static List<Pair<HoodieInstant, HoodieInstant>>
getPendingCompactionTransitions(HoodieTimeline oldTimeline,
- HoodieTimeline newTimeline) {
+ private static List<Pair<HoodieInstant, HoodieInstant>>
getPendingActionTransitions(HoodieTimeline pendingActionTimelineFromOld,
+
HoodieTimeline newTimeline,
+
String completedAction, String pendingAction) {
Set<HoodieInstant> newTimelineInstants =
newTimeline.getInstantsAsStream().collect(Collectors.toSet());
- return
oldTimeline.filterPendingCompactionTimeline().getInstantsAsStream().map(instant
-> {
+ return pendingActionTimelineFromOld.getInstantsAsStream().map(instant -> {
if (newTimelineInstants.contains(instant)) {
return Pair.of(instant, instant);
} else {
- HoodieInstant compacted =
- new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION,
instant.getTimestamp());
- if (newTimelineInstants.contains(compacted)) {
- return Pair.of(instant, compacted);
+ HoodieInstant completedInstant =
+ new HoodieInstant(State.COMPLETED, completedAction,
instant.getTimestamp());
+ if (newTimelineInstants.contains(completedInstant)) {
+ return Pair.of(instant, completedInstant);
}
- HoodieInstant inflightCompacted =
- new HoodieInstant(State.INFLIGHT,
HoodieTimeline.COMPACTION_ACTION, instant.getTimestamp());
- if (newTimelineInstants.contains(inflightCompacted)) {
- return Pair.of(instant, inflightCompacted);
+ HoodieInstant inflightInstant =
+ new HoodieInstant(State.INFLIGHT, pendingAction,
instant.getTimestamp());
+ if (newTimelineInstants.contains(inflightInstant)) {
+ return Pair.of(instant, inflightInstant);
}
return Pair.<HoodieInstant, HoodieInstant>of(instant, null);
}