This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 27c1edf97c3 [HUDI-6182] Fix the hive sync compatibility and improve timeline check (#8917)
27c1edf97c3 is described below
commit 27c1edf97c34cefbda17d5f7da60f4aa6ad9a824
Author: Danny Chan <[email protected]>
AuthorDate: Sat Jun 10 04:16:53 2023 +0800
[HUDI-6182] Fix the hive sync compatibility and improve timeline check (#8917)
Minor follow-up fixes to #8745.
---
.../table/timeline/HoodieDefaultTimeline.java | 4 +--
.../hudi/common/table/timeline/TimelineUtils.java | 36 ++++++++++------------
.../hudi/common/table/TestTimelineUtils.java | 4 +--
.../apache/hudi/sync/common/HoodieSyncClient.java | 4 +--
4 files changed, 23 insertions(+), 25 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index a0e95571165..c130a19ee53 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -341,12 +341,12 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
@Override
public boolean empty() {
- return getInstants().isEmpty();
+ return instants.isEmpty();
}
@Override
public int countInstants() {
- return getInstants().size();
+ return instants.size();
}
@Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
index 8745b4ad683..d0ca22ae13f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
@@ -223,33 +223,31 @@ public class TimelineUtils {
*
* @param metaClient {@link HoodieTableMetaClient} instance.
* @param exclusiveStartInstantTime Start instant time (exclusive).
- * @param commitCompletionTime Last commit completion time synced
+ * @param lastMaxCompletionTime Last commit max completion time synced
* @return Hudi timeline.
*/
public static HoodieTimeline getCommitsTimelineAfter(
- HoodieTableMetaClient metaClient, String exclusiveStartInstantTime,
String commitCompletionTime) {
+ HoodieTableMetaClient metaClient, String exclusiveStartInstantTime,
Option<String> lastMaxCompletionTime) {
HoodieDefaultTimeline activeTimeline = metaClient.getActiveTimeline();
- HoodieDefaultTimeline timeline =
- activeTimeline.isBeforeTimelineStarts(exclusiveStartInstantTime)
- ? metaClient.getArchivedTimeline(exclusiveStartInstantTime)
- .mergeTimeline(activeTimeline)
- : activeTimeline;
+ HoodieDefaultTimeline timeline =
activeTimeline.isBeforeTimelineStarts(exclusiveStartInstantTime)
+ ?
metaClient.getArchivedTimeline(exclusiveStartInstantTime).mergeTimeline(activeTimeline)
+ : activeTimeline;
- // Get new instants with greater instant time than
exclusiveStartInstantTime
- Stream<HoodieInstant> newInstants = timeline.getCommitsTimeline()
- .findInstantsAfter(exclusiveStartInstantTime, Integer.MAX_VALUE)
- .getInstantsAsStream();
+ HoodieDefaultTimeline timelineSinceLastSync = (HoodieDefaultTimeline)
timeline.getCommitsTimeline()
+ .findInstantsAfter(exclusiveStartInstantTime, Integer.MAX_VALUE);
- // Get 'hollow' instants that have less instant time than
exclusiveStartInstantTime but with greater commit completion time
- Stream<HoodieInstant> hollowInstants = timeline.getCommitsTimeline()
- .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(),
LESSER_THAN, exclusiveStartInstantTime))
- .filter(s ->
HoodieTimeline.compareTimestamps(s.getStateTransitionTime(), GREATER_THAN,
commitCompletionTime))
- .getInstantsAsStream();
-
- Stream<HoodieInstant> mergedInstantStream = Stream.concat(newInstants,
hollowInstants).sorted();
+ if (lastMaxCompletionTime.isPresent()) {
+ // Get 'hollow' instants that have less instant time than
exclusiveStartInstantTime but with greater commit completion time
+ HoodieDefaultTimeline hollowInstantsTimeline = (HoodieDefaultTimeline)
timeline.getCommitsTimeline()
+ .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(),
LESSER_THAN, exclusiveStartInstantTime))
+ .filter(s ->
HoodieTimeline.compareTimestamps(s.getStateTransitionTime(), GREATER_THAN,
lastMaxCompletionTime.get()));
+ if (!hollowInstantsTimeline.empty()) {
+ return timelineSinceLastSync.mergeTimeline(hollowInstantsTimeline);
+ }
+ }
- return new HoodieDefaultTimeline(mergedInstantStream, timeline.details);
+ return timelineSinceLastSync;
}
/**
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
index a3b57b21f84..bac59403aee 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
@@ -290,7 +290,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
new HoodieInstant(COMPLETED, COMMIT_ACTION, "009"),
new HoodieInstant(COMPLETED, COMMIT_ACTION, "011"),
new HoodieInstant(COMPLETED, COMMIT_ACTION, "012")),
- TimelineUtils.getCommitsTimelineAfter(mockMetaClient, startTs,
startTs));
+ TimelineUtils.getCommitsTimelineAfter(mockMetaClient, startTs,
Option.of(startTs)));
verify(mockMetaClient, never()).getArchivedTimeline(any());
// Should load both archived and active timeline
@@ -311,7 +311,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
new HoodieInstant(COMPLETED, COMMIT_ACTION, "010", "010"),
new HoodieInstant(COMPLETED, COMMIT_ACTION, "011", "011"),
new HoodieInstant(COMPLETED, COMMIT_ACTION, "012", "012")),
- TimelineUtils.getCommitsTimelineAfter(mockMetaClient, startTs,
startTs));
+ TimelineUtils.getCommitsTimelineAfter(mockMetaClient, startTs,
Option.of(startTs)));
verify(mockMetaClient, times(1)).getArchivedTimeline(any());
}
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
index d6e57754847..3eeb72f89e0 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
@@ -90,7 +90,7 @@ public abstract class HoodieSyncClient implements HoodieMetaSyncOperations, Auto
*/
public Set<String> getDroppedPartitionsSince(Option<String>
lastCommitTimeSynced, Option<String> lastCommitCompletionTimeSynced) {
HoodieTimeline timeline = lastCommitTimeSynced.isPresent()
- ? TimelineUtils.getCommitsTimelineAfter(metaClient,
lastCommitTimeSynced.get(), lastCommitCompletionTimeSynced.get())
+ ? TimelineUtils.getCommitsTimelineAfter(metaClient,
lastCommitTimeSynced.get(), lastCommitCompletionTimeSynced)
: metaClient.getActiveTimeline();
return new HashSet<>(TimelineUtils.getDroppedPartitions(timeline));
}
@@ -135,7 +135,7 @@ public abstract class HoodieSyncClient implements HoodieMetaSyncOperations, Auto
} else {
LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ",
Getting commits since then");
return TimelineUtils.getWrittenPartitions(
- TimelineUtils.getCommitsTimelineAfter(metaClient,
lastCommitTimeSynced.get(), lastCommitCompletionTimeSynced.get()));
+ TimelineUtils.getCommitsTimelineAfter(metaClient,
lastCommitTimeSynced.get(), lastCommitCompletionTimeSynced));
}
}