This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 27c1edf97c3 [HUDI-6182] Fix the hive sync compatibility and improve timeline check (#8917)
27c1edf97c3 is described below

commit 27c1edf97c34cefbda17d5f7da60f4aa6ad9a824
Author: Danny Chan <[email protected]>
AuthorDate: Sat Jun 10 04:16:53 2023 +0800

    [HUDI-6182] Fix the hive sync compatibility and improve timeline check (#8917)
    
    Minor fixes after #8745
---
 .../table/timeline/HoodieDefaultTimeline.java      |  4 +--
 .../hudi/common/table/timeline/TimelineUtils.java  | 36 ++++++++++------------
 .../hudi/common/table/TestTimelineUtils.java       |  4 +--
 .../apache/hudi/sync/common/HoodieSyncClient.java  |  4 +--
 4 files changed, 23 insertions(+), 25 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index a0e95571165..c130a19ee53 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -341,12 +341,12 @@ public class HoodieDefaultTimeline implements 
HoodieTimeline {
 
   @Override
   public boolean empty() {
-    return getInstants().isEmpty();
+    return instants.isEmpty();
   }
 
   @Override
   public int countInstants() {
-    return getInstants().size();
+    return instants.size();
   }
 
   @Override
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
index 8745b4ad683..d0ca22ae13f 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
@@ -223,33 +223,31 @@ public class TimelineUtils {
    *
    * @param metaClient                {@link HoodieTableMetaClient} instance.
    * @param exclusiveStartInstantTime Start instant time (exclusive).
-   * @param commitCompletionTime Last commit completion time synced
+   * @param lastMaxCompletionTime     Last commit max completion time synced
    * @return Hudi timeline.
    */
   public static HoodieTimeline getCommitsTimelineAfter(
-      HoodieTableMetaClient metaClient, String exclusiveStartInstantTime, 
String commitCompletionTime) {
+      HoodieTableMetaClient metaClient, String exclusiveStartInstantTime, 
Option<String> lastMaxCompletionTime) {
     HoodieDefaultTimeline activeTimeline = metaClient.getActiveTimeline();
 
-    HoodieDefaultTimeline timeline =
-        activeTimeline.isBeforeTimelineStarts(exclusiveStartInstantTime)
-            ? metaClient.getArchivedTimeline(exclusiveStartInstantTime)
-            .mergeTimeline(activeTimeline)
-            : activeTimeline;
+    HoodieDefaultTimeline timeline = 
activeTimeline.isBeforeTimelineStarts(exclusiveStartInstantTime)
+        ? 
metaClient.getArchivedTimeline(exclusiveStartInstantTime).mergeTimeline(activeTimeline)
+        : activeTimeline;
 
-    // Get new instants with greater instant time than 
exclusiveStartInstantTime
-    Stream<HoodieInstant> newInstants = timeline.getCommitsTimeline()
-        .findInstantsAfter(exclusiveStartInstantTime, Integer.MAX_VALUE)
-        .getInstantsAsStream();
+    HoodieDefaultTimeline timelineSinceLastSync = (HoodieDefaultTimeline) 
timeline.getCommitsTimeline()
+        .findInstantsAfter(exclusiveStartInstantTime, Integer.MAX_VALUE);
 
-    // Get 'hollow' instants that have less instant time than 
exclusiveStartInstantTime but with greater commit completion time
-    Stream<HoodieInstant> hollowInstants = timeline.getCommitsTimeline()
-        .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), 
LESSER_THAN, exclusiveStartInstantTime))
-        .filter(s -> 
HoodieTimeline.compareTimestamps(s.getStateTransitionTime(), GREATER_THAN, 
commitCompletionTime))
-        .getInstantsAsStream();
-
-    Stream<HoodieInstant> mergedInstantStream = Stream.concat(newInstants, 
hollowInstants).sorted();
+    if (lastMaxCompletionTime.isPresent()) {
+      // Get 'hollow' instants that have less instant time than 
exclusiveStartInstantTime but with greater commit completion time
+      HoodieDefaultTimeline hollowInstantsTimeline = (HoodieDefaultTimeline) 
timeline.getCommitsTimeline()
+          .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), 
LESSER_THAN, exclusiveStartInstantTime))
+          .filter(s -> 
HoodieTimeline.compareTimestamps(s.getStateTransitionTime(), GREATER_THAN, 
lastMaxCompletionTime.get()));
+      if (!hollowInstantsTimeline.empty()) {
+        return timelineSinceLastSync.mergeTimeline(hollowInstantsTimeline);
+      }
+    }
 
-    return new HoodieDefaultTimeline(mergedInstantStream, timeline.details);
+    return timelineSinceLastSync;
   }
 
   /**
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java 
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
index a3b57b21f84..bac59403aee 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
@@ -290,7 +290,7 @@ public class TestTimelineUtils extends 
HoodieCommonTestHarness {
             new HoodieInstant(COMPLETED, COMMIT_ACTION, "009"),
             new HoodieInstant(COMPLETED, COMMIT_ACTION, "011"),
             new HoodieInstant(COMPLETED, COMMIT_ACTION, "012")),
-        TimelineUtils.getCommitsTimelineAfter(mockMetaClient, startTs, 
startTs));
+        TimelineUtils.getCommitsTimelineAfter(mockMetaClient, startTs, 
Option.of(startTs)));
     verify(mockMetaClient, never()).getArchivedTimeline(any());
 
     // Should load both archived and active timeline
@@ -311,7 +311,7 @@ public class TestTimelineUtils extends 
HoodieCommonTestHarness {
             new HoodieInstant(COMPLETED, COMMIT_ACTION, "010", "010"),
             new HoodieInstant(COMPLETED, COMMIT_ACTION, "011", "011"),
             new HoodieInstant(COMPLETED, COMMIT_ACTION, "012", "012")),
-        TimelineUtils.getCommitsTimelineAfter(mockMetaClient, startTs, 
startTs));
+        TimelineUtils.getCommitsTimelineAfter(mockMetaClient, startTs, 
Option.of(startTs)));
     verify(mockMetaClient, times(1)).getArchivedTimeline(any());
   }
 
diff --git 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
index d6e57754847..3eeb72f89e0 100644
--- 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
+++ 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
@@ -90,7 +90,7 @@ public abstract class HoodieSyncClient implements 
HoodieMetaSyncOperations, Auto
    */
   public Set<String> getDroppedPartitionsSince(Option<String> 
lastCommitTimeSynced, Option<String> lastCommitCompletionTimeSynced) {
     HoodieTimeline timeline = lastCommitTimeSynced.isPresent()
-        ? TimelineUtils.getCommitsTimelineAfter(metaClient, 
lastCommitTimeSynced.get(), lastCommitCompletionTimeSynced.get())
+        ? TimelineUtils.getCommitsTimelineAfter(metaClient, 
lastCommitTimeSynced.get(), lastCommitCompletionTimeSynced)
         : metaClient.getActiveTimeline();
     return new HashSet<>(TimelineUtils.getDroppedPartitions(timeline));
   }
@@ -135,7 +135,7 @@ public abstract class HoodieSyncClient implements 
HoodieMetaSyncOperations, Auto
     } else {
       LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", 
Getting commits since then");
       return TimelineUtils.getWrittenPartitions(
-          TimelineUtils.getCommitsTimelineAfter(metaClient, 
lastCommitTimeSynced.get(), lastCommitCompletionTimeSynced.get()));
+          TimelineUtils.getCommitsTimelineAfter(metaClient, 
lastCommitTimeSynced.get(), lastCommitCompletionTimeSynced));
     }
   }
 

Reply via email to