This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 78094566e4b [HUDI-6182] Hive sync use state transient time to avoid 
losing partit… (#8745)
78094566e4b is described below

commit 78094566e4b6895a62de0800c7baef42c167d3c6
Author: StreamingFlames <[email protected]>
AuthorDate: Fri Jun 9 16:16:55 2023 +0800

    [HUDI-6182] Hive sync use state transient time to avoid losing partit… 
(#8745)
    
    Now Hudi syncs 2 timestamps to HMS for each sync: one is the max commit 
time T1 (actually the max version id), the other is the max completion time T2. 
The latter can be used to trace the instants that have a smaller version id than 
T1 but a greater completion timestamp than T2. Such instants are usually 
generated by multiple writers.
---
 .../hudi/common/table/timeline/TimelineUtils.java  | 27 +++++++--
 .../hudi/common/table/TestTimelineUtils.java       | 39 ++++++------
 .../java/org/apache/hudi/sync/adb/AdbSyncTool.java |  2 +-
 .../java/org/apache/hudi/hive/HiveSyncTool.java    |  9 ++-
 .../org/apache/hudi/hive/HoodieHiveSyncClient.java | 24 +++++++-
 .../org/apache/hudi/hive/TestHiveSyncTool.java     | 70 ++++++++++++++++++++--
 .../hudi/sync/common/HoodieMetaSyncOperations.java |  9 +++
 .../apache/hudi/sync/common/HoodieSyncClient.java  |  8 +--
 8 files changed, 150 insertions(+), 38 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
index 434a93c32bc..8745b4ad683 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
@@ -42,6 +42,8 @@ import java.util.stream.Stream;
 
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.SAVEPOINT_ACTION;
 
@@ -221,20 +223,35 @@ public class TimelineUtils {
    *
    * @param metaClient                {@link HoodieTableMetaClient} instance.
    * @param exclusiveStartInstantTime Start instant time (exclusive).
+   * @param commitCompletionTime Last commit completion time synced
    * @return Hudi timeline.
    */
   public static HoodieTimeline getCommitsTimelineAfter(
-      HoodieTableMetaClient metaClient, String exclusiveStartInstantTime) {
-    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+      HoodieTableMetaClient metaClient, String exclusiveStartInstantTime, 
String commitCompletionTime) {
+    HoodieDefaultTimeline activeTimeline = metaClient.getActiveTimeline();
+
     HoodieDefaultTimeline timeline =
         activeTimeline.isBeforeTimelineStarts(exclusiveStartInstantTime)
             ? metaClient.getArchivedTimeline(exclusiveStartInstantTime)
             .mergeTimeline(activeTimeline)
             : activeTimeline;
-    return timeline.getCommitsTimeline()
-        .findInstantsAfter(exclusiveStartInstantTime, Integer.MAX_VALUE);
+
+    // Get new instants with greater instant time than 
exclusiveStartInstantTime
+    Stream<HoodieInstant> newInstants = timeline.getCommitsTimeline()
+        .findInstantsAfter(exclusiveStartInstantTime, Integer.MAX_VALUE)
+        .getInstantsAsStream();
+
+    // Get 'hollow' instants that have less instant time than 
exclusiveStartInstantTime but with greater commit completion time
+    Stream<HoodieInstant> hollowInstants = timeline.getCommitsTimeline()
+        .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), 
LESSER_THAN, exclusiveStartInstantTime))
+        .filter(s -> 
HoodieTimeline.compareTimestamps(s.getStateTransitionTime(), GREATER_THAN, 
commitCompletionTime))
+        .getInstantsAsStream();
+
+    Stream<HoodieInstant> mergedInstantStream = Stream.concat(newInstants, 
hollowInstants).sorted();
+
+    return new HoodieDefaultTimeline(mergedInstantStream, timeline.details);
   }
-  
+
   /**
    * Returns the commit metadata of the given instant.
    *
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java 
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
index 0c09d91163e..a3b57b21f84 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
@@ -275,40 +275,43 @@ public class TestTimelineUtils extends 
HoodieCommonTestHarness {
     String startTs = "010";
     HoodieTableMetaClient mockMetaClient = prepareMetaClient(
         Arrays.asList(
-            new HoodieInstant(COMPLETED, COMMIT_ACTION, "009"),
-            new HoodieInstant(COMPLETED, COMMIT_ACTION, "010"),
-            new HoodieInstant(COMPLETED, COMMIT_ACTION, "011"),
-            new HoodieInstant(COMPLETED, COMMIT_ACTION, "012")),
-        Arrays.asList(new HoodieInstant(COMPLETED, COMMIT_ACTION, "001"),
-            new HoodieInstant(COMPLETED, COMMIT_ACTION, "002")),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "009", "013"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "010", "010"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "011", "011"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "012", "012")),
+        Arrays.asList(new HoodieInstant(COMPLETED, COMMIT_ACTION, "001", 
"001"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "002", "002")),
         startTs
     );
+
+    // Commit 009 will be included in result because it has greater commit 
completion than 010
     verifyTimeline(
         Arrays.asList(
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "009"),
             new HoodieInstant(COMPLETED, COMMIT_ACTION, "011"),
             new HoodieInstant(COMPLETED, COMMIT_ACTION, "012")),
-        TimelineUtils.getCommitsTimelineAfter(mockMetaClient, startTs));
+        TimelineUtils.getCommitsTimelineAfter(mockMetaClient, startTs, 
startTs));
     verify(mockMetaClient, never()).getArchivedTimeline(any());
 
     // Should load both archived and active timeline
     startTs = "001";
     mockMetaClient = prepareMetaClient(
         Arrays.asList(
-            new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "009"),
-            new HoodieInstant(COMPLETED, COMMIT_ACTION, "010"),
-            new HoodieInstant(COMPLETED, COMMIT_ACTION, "011"),
-            new HoodieInstant(COMPLETED, COMMIT_ACTION, "012")),
-        Arrays.asList(new HoodieInstant(COMPLETED, COMMIT_ACTION, "001"),
-            new HoodieInstant(COMPLETED, COMMIT_ACTION, "002")),
+            new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "009", "009"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "010", "010"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "011", "011"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "012", "012")),
+        Arrays.asList(new HoodieInstant(COMPLETED, COMMIT_ACTION, "001", 
"001"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "002", "002")),
         startTs
     );
     verifyTimeline(
         Arrays.asList(
-            new HoodieInstant(COMPLETED, COMMIT_ACTION, "002"),
-            new HoodieInstant(COMPLETED, COMMIT_ACTION, "010"),
-            new HoodieInstant(COMPLETED, COMMIT_ACTION, "011"),
-            new HoodieInstant(COMPLETED, COMMIT_ACTION, "012")),
-        TimelineUtils.getCommitsTimelineAfter(mockMetaClient, startTs));
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "002", "002"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "010", "010"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "011", "011"),
+            new HoodieInstant(COMPLETED, COMMIT_ACTION, "012", "012")),
+        TimelineUtils.getCommitsTimelineAfter(mockMetaClient, startTs, 
startTs));
     verify(mockMetaClient, times(1)).getArchivedTimeline(any());
   }
 
diff --git 
a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java
 
b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java
index 5f4a36631a2..58a24548d04 100644
--- 
a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java
+++ 
b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java
@@ -194,7 +194,7 @@ public class AdbSyncTool extends HoodieSyncTool {
     if (config.getSplitStrings(META_SYNC_PARTITION_FIELDS).isEmpty()) {
       writtenPartitionsSince = new ArrayList<>();
     } else {
-      writtenPartitionsSince = 
syncClient.getWrittenPartitionsSince(lastCommitTimeSynced);
+      writtenPartitionsSince = 
syncClient.getWrittenPartitionsSince(lastCommitTimeSynced, Option.empty());
     }
     LOG.info("Scan partitions complete, partitionNum:{}", 
writtenPartitionsSince.size());
 
diff --git 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index c69f9c9b1d6..40f79ba19f1 100644
--- 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -267,8 +267,11 @@ public class HiveSyncTool extends HoodieSyncTool 
implements AutoCloseable {
     boolean syncIncremental = config.getBoolean(META_SYNC_INCREMENTAL);
     Option<String> lastCommitTimeSynced = (tableExists && syncIncremental)
         ? syncClient.getLastCommitTimeSynced(tableName) : Option.empty();
+    Option<String> lastCommitCompletionTimeSynced = (tableExists && 
syncIncremental)
+        ? syncClient.getLastCommitCompletionTimeSynced(tableName) : 
Option.empty();
     if (syncIncremental) {
-      LOG.info("Last commit time synced was found to be " + 
lastCommitTimeSynced.orElse("null"));
+      LOG.info(String.format("Last commit time synced was found to be %s, last 
commit completion time is found to be %s",
+          lastCommitTimeSynced.orElse("null"), 
lastCommitCompletionTimeSynced.orElse("null")));
     } else {
       LOG.info(
           "Executing a full partition sync operation since {} is set to 
false.",
@@ -287,12 +290,12 @@ public class HiveSyncTool extends HoodieSyncTool 
implements AutoCloseable {
           + ", file system: " + config.getHadoopFileSystem());
       partitionsChanged = syncAllPartitions(tableName);
     } else {
-      List<String> writtenPartitionsSince = 
syncClient.getWrittenPartitionsSince(lastCommitTimeSynced);
+      List<String> writtenPartitionsSince = 
syncClient.getWrittenPartitionsSince(lastCommitTimeSynced, 
lastCommitCompletionTimeSynced);
       LOG.info("Storage partitions scan complete. Found " + 
writtenPartitionsSince.size());
 
       // Sync the partitions if needed
       // find dropped partitions, if any, in the latest commit
-      Set<String> droppedPartitions = 
syncClient.getDroppedPartitionsSince(lastCommitTimeSynced);
+      Set<String> droppedPartitions = 
syncClient.getDroppedPartitionsSince(lastCommitTimeSynced, 
lastCommitCompletionTimeSynced);
       partitionsChanged = syncPartitions(tableName, writtenPartitionsSince, 
droppedPartitions);
     }
 
diff --git 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
index 231a96e8f47..576abc3a908 100644
--- 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
+++ 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
@@ -21,6 +21,7 @@ package org.apache.hudi.hive;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.MapUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
@@ -281,6 +282,17 @@ public class HoodieHiveSyncClient extends HoodieSyncClient 
{
     }
   }
 
+  @Override
+  public Option<String> getLastCommitCompletionTimeSynced(String tableName) {
+    // Get the last commit completion time from the TBLproperties
+    try {
+      Table table = client.getTable(databaseName, tableName);
+      return 
Option.ofNullable(table.getParameters().getOrDefault(HOODIE_LAST_COMMIT_COMPLETION_TIME_SYNC,
 null));
+    } catch (Exception e) {
+      throw new HoodieHiveSyncException("Failed to get the last commit 
completion time synced from the table " + tableName, e);
+    }
+  }
+
   public Option<String> getLastReplicatedTime(String tableName) {
     // Get the last replicated time from the TBLproperties
     try {
@@ -340,8 +352,15 @@ public class HoodieHiveSyncClient extends HoodieSyncClient 
{
 
   @Override
   public void updateLastCommitTimeSynced(String tableName) {
-    // Set the last commit time from the TBLproperties
-    Option<String> lastCommitSynced = 
getActiveTimeline().lastInstant().map(HoodieInstant::getTimestamp);
+    // Set the last commit time and commit completion from the TBLproperties
+    HoodieTimeline activeTimeline = getActiveTimeline();
+    Option<String> lastCommitSynced = 
activeTimeline.lastInstant().map(HoodieInstant::getTimestamp);
+    Option<String> lastCommitCompletionSynced = activeTimeline
+        .getInstantsOrderedByStateTransitionTime()
+        .skip(activeTimeline.countInstants() - 1)
+        .findFirst()
+        .map(i -> Option.of(i.getStateTransitionTime()))
+        .orElse(Option.empty());
     if (lastCommitSynced.isPresent()) {
       try {
         Table table = client.getTable(databaseName, tableName);
@@ -351,6 +370,7 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
         SerDeInfo serdeInfo = sd.getSerdeInfo();
         serdeInfo.putToParameters(ConfigUtils.TABLE_SERDE_PATH, basePath);
         table.putToParameters(HOODIE_LAST_COMMIT_TIME_SYNC, 
lastCommitSynced.get());
+        table.putToParameters(HOODIE_LAST_COMMIT_COMPLETION_TIME_SYNC, 
lastCommitCompletionSynced.get());
         client.alter_table(databaseName, tableName, table);
       } catch (Exception e) {
         throw new HoodieHiveSyncException("Failed to get update last commit 
time synced to " + lastCommitSynced, e);
diff --git 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index b61c7d8622b..da23d8876c7 100644
--- 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieSyncTableStrategy;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
 import org.apache.hudi.common.testutils.NetworkTestUtils;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
 import org.apache.hudi.common.util.Option;
@@ -323,7 +324,7 @@ public class TestHiveSyncTool {
         + FSUtils.getPartitionPath(basePath, "2050/1/1").toString() + "'");
 
     hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
-    List<String> writtenPartitionsSince = 
hiveClient.getWrittenPartitionsSince(Option.empty());
+    List<String> writtenPartitionsSince = 
hiveClient.getWrittenPartitionsSince(Option.empty(), Option.empty());
     List<PartitionEvent> partitionEvents = 
hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, 
Collections.emptySet());
     assertEquals(1, partitionEvents.size(), "There should be only one 
partition event");
     assertEquals(PartitionEventType.UPDATE, 
partitionEvents.iterator().next().eventType,
@@ -469,6 +470,7 @@ public class TestHiveSyncTool {
     String sparkTableProperties = 
getSparkTableProperties(syncAsDataSourceTable, useSchemaFromCommitMetadata);
     assertEquals(
         "EXTERNAL\tTRUE\n"
+            + "last_commit_completion_time_sync\t" + 
getLastCommitCompletionTimeSynced() + "\n"
             + "last_commit_time_sync\t100\n"
             + sparkTableProperties
             + "tp_0\tp0\n"
@@ -506,8 +508,9 @@ public class TestHiveSyncTool {
     hiveDriver.getResults(results);
 
     assertEquals(
-        String.format("%slast_commit_time_sync\t%s\n%s",
+        
String.format("%slast_commit_completion_time_sync\t%s\nlast_commit_time_sync\t%s\n%s",
             createManagedTable ? StringUtils.EMPTY_STRING : "EXTERNAL\tTRUE\n",
+            getLastCommitCompletionTimeSynced(),
             instantTime,
             getSparkTableProperties(true, true)),
         String.format("%s\n", String.join("\n", results.subList(0, 
results.size() - 1))));
@@ -600,6 +603,7 @@ public class TestHiveSyncTool {
           results.subList(0, results.size() - 1));
       assertEquals(
           "EXTERNAL\tTRUE\n"
+              + "last_commit_completion_time_sync\t" + 
getLastCommitCompletionTimeSynced() + "\n"
               + "last_commit_time_sync\t101\n"
               + sparkTableProperties
               + "tp_0\tp0\n"
@@ -691,7 +695,7 @@ public class TestHiveSyncTool {
 
     // Lets do the sync
     reSyncHiveTable();
-    List<String> writtenPartitionsSince = 
hiveClient.getWrittenPartitionsSince(Option.of(commitTime1));
+    List<String> writtenPartitionsSince = 
hiveClient.getWrittenPartitionsSince(Option.of(commitTime1), 
Option.of(commitTime1));
     assertEquals(1, writtenPartitionsSince.size(), "We should have one 
partition written after 100 commit");
     List<org.apache.hudi.sync.common.model.Partition> hivePartitions = 
hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
     List<PartitionEvent> partitionEvents = 
hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, 
Collections.emptySet());
@@ -1073,7 +1077,7 @@ public class TestHiveSyncTool {
     HiveTestUtil.addCOWPartition("2010/01/02", true, true, commitTime2);
 
     reInitHiveSyncClient();
-    List<String> writtenPartitionsSince = 
hiveClient.getWrittenPartitionsSince(Option.of(instantTime));
+    List<String> writtenPartitionsSince = 
hiveClient.getWrittenPartitionsSince(Option.of(instantTime), 
Option.of(getLastCommitCompletionTimeSynced()));
     assertEquals(1, writtenPartitionsSince.size(), "We should have one 
partition written after 100 commit");
     List<org.apache.hudi.sync.common.model.Partition> hivePartitions = 
hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
     List<PartitionEvent> partitionEvents = 
hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, 
Collections.emptySet());
@@ -1103,7 +1107,7 @@ public class TestHiveSyncTool {
         "Table partitions should match the number of partitions we wrote");
     assertEquals(commitTime3, 
hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(),
         "The last commit that was synced should be updated in the 
TBLPROPERTIES");
-    assertEquals(1, 
hiveClient.getWrittenPartitionsSince(Option.of(commitTime2)).size());
+    assertEquals(1, 
hiveClient.getWrittenPartitionsSince(Option.of(commitTime2), 
Option.of(getLastCommitCompletionTimeSynced())).size());
   }
 
   @ParameterizedTest
@@ -1505,6 +1509,56 @@ public class TestHiveSyncTool {
     assertEquals(commitTime1, 
hiveClient.getLastCommitTimeSynced(tableName).get());
   }
 
+  @ParameterizedTest
+  @MethodSource("syncModeAndEnablePushDown")
+  public void testHiveSyncWithMultiWriter(String syncMode, String 
enablePushDown) throws Exception {
+    String tableName = HiveTestUtil.TABLE_NAME + 
HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
+    hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), syncMode);
+    hiveSyncProps.setProperty(HIVE_SYNC_FILTER_PUSHDOWN_ENABLED.key(), 
enablePushDown);
+    hiveSyncProps.setProperty(META_SYNC_CONDITIONAL_SYNC.key(), "true");
+
+    String commitTime1 = HoodieInstantTimeGenerator.createNewInstantTime(1);
+    String commitTime2 = HoodieInstantTimeGenerator.createNewInstantTime(2);
+    String commitTime3 = HoodieInstantTimeGenerator.createNewInstantTime(3);
+    String commitTime4 = HoodieInstantTimeGenerator.createNewInstantTime(4);
+    String commitTime5 = HoodieInstantTimeGenerator.createNewInstantTime(5);
+
+    // Commit 4 and commit5 will be committed first
+    HiveTestUtil.createMORTable(commitTime4, commitTime5, 2, true, true);
+    reInitHiveSyncClient();
+    reSyncHiveTable();
+
+    assertTrue(hiveClient.tableExists(tableName));
+    assertEquals(commitTime5, 
hiveClient.getLastCommitTimeSynced(tableName).get());
+    assertEquals(getLastCommitCompletionTimeSynced(), 
hiveClient.getLastCommitCompletionTimeSynced(tableName).get());
+    assertEquals(2, hiveClient.getAllPartitions(tableName).size());
+
+    // Commit2 and commit3 committed after commit4 and commit5
+    HiveTestUtil.addMORPartitions(4, true, true, true, 
ZonedDateTime.now().plusDays(2), commitTime2, commitTime3);
+    reInitHiveSyncClient();
+    reSyncHiveTable();
+
+    String lastCommitCompletionTimeSynced = 
getLastCommitCompletionTimeSynced();
+    // LastCommitTimeSynced will still be commit3
+    assertEquals(commitTime5, 
hiveClient.getLastCommitTimeSynced(tableName).get());
+    assertEquals(lastCommitCompletionTimeSynced, 
hiveClient.getLastCommitCompletionTimeSynced(tableName).get());
+    // Partitions included in commit0 and commit1 will be synced to HMS 
correctly
+    assertEquals(4, hiveClient.getAllPartitions(tableName).size());
+
+    List<Partition> partitions = hiveClient.getAllPartitions(tableName);
+    // create two replace commits to delete current partitions, but do not 
sync in between
+    String partitiontoDelete = 
partitions.get(0).getValues().get(0).replace("-", "/");
+    // Add commit1 to the table that replace some partitions
+    HiveTestUtil.createReplaceCommit(commitTime1, partitiontoDelete, 
WriteOperationType.DELETE_PARTITION, true, true);
+
+    reInitHiveSyncClient();
+    reSyncHiveTable();
+
+    assertEquals(getLastCommitCompletionTimeSynced(), 
hiveClient.getLastCommitCompletionTimeSynced(tableName).get());
+    assertEquals(commitTime5, 
hiveClient.getLastCommitTimeSynced(tableName).get());
+    assertEquals(3, hiveClient.getAllPartitions(tableName).size());
+  }
+
   private void reSyncHiveTable() {
     hiveSyncTool.syncHoodieTable();
     // we need renew the hive client after tool.syncHoodieTable(), because it 
will close hive
@@ -1512,6 +1566,12 @@ public class TestHiveSyncTool {
     reInitHiveSyncClient();
   }
 
+  private String getLastCommitCompletionTimeSynced() {
+    return hiveClient.getActiveTimeline()
+        .getInstantsOrderedByStateTransitionTime()
+        .skip(hiveClient.getActiveTimeline().countInstants() - 
1).findFirst().get().getStateTransitionTime();
+  }
+
   private void reInitHiveSyncClient() {
     hiveSyncTool = new HiveSyncTool(hiveSyncProps, HiveTestUtil.getHiveConf());
     hiveClient = (HoodieHiveSyncClient) hiveSyncTool.syncClient;
diff --git 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
index b98ac2b7940..87af1d16d75 100644
--- 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
+++ 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
@@ -33,6 +33,8 @@ public interface HoodieMetaSyncOperations {
 
   String HOODIE_LAST_COMMIT_TIME_SYNC = "last_commit_time_sync";
 
+  String HOODIE_LAST_COMMIT_COMPLETION_TIME_SYNC = 
"last_commit_completion_time_sync";
+
   /**
    * Create the table.
    *
@@ -179,6 +181,13 @@ public interface HoodieMetaSyncOperations {
     return Option.empty();
   }
 
+  /**
+   * Get the commit completion time of last sync
+   */
+  default Option<String> getLastCommitCompletionTimeSynced(String tableName) {
+    return Option.empty();
+  }
+
   /**
    * Update the timestamp of last sync.
    */
diff --git 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
index b6f1d0bdfa2..d6e57754847 100644
--- 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
+++ 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
@@ -88,9 +88,9 @@ public abstract class HoodieSyncClient implements 
HoodieMetaSyncOperations, Auto
    * If last sync time is not known then consider only active timeline.
    * Going through archive timeline is a costly operation, and it should be 
avoided unless some start time is given.
    */
-  public Set<String> getDroppedPartitionsSince(Option<String> 
lastCommitTimeSynced) {
+  public Set<String> getDroppedPartitionsSince(Option<String> 
lastCommitTimeSynced, Option<String> lastCommitCompletionTimeSynced) {
     HoodieTimeline timeline = lastCommitTimeSynced.isPresent()
-        ? TimelineUtils.getCommitsTimelineAfter(metaClient, 
lastCommitTimeSynced.get())
+        ? TimelineUtils.getCommitsTimelineAfter(metaClient, 
lastCommitTimeSynced.get(), lastCommitCompletionTimeSynced.get())
         : metaClient.getActiveTimeline();
     return new HashSet<>(TimelineUtils.getDroppedPartitions(timeline));
   }
@@ -126,7 +126,7 @@ public abstract class HoodieSyncClient implements 
HoodieMetaSyncOperations, Auto
         config.getBoolean(META_SYNC_ASSUME_DATE_PARTITION));
   }
 
-  public List<String> getWrittenPartitionsSince(Option<String> 
lastCommitTimeSynced) {
+  public List<String> getWrittenPartitionsSince(Option<String> 
lastCommitTimeSynced, Option<String> lastCommitCompletionTimeSynced) {
     if (!lastCommitTimeSynced.isPresent()) {
       LOG.info("Last commit time synced is not known, listing all partitions 
in "
           + config.getString(META_SYNC_BASE_PATH)
@@ -135,7 +135,7 @@ public abstract class HoodieSyncClient implements 
HoodieMetaSyncOperations, Auto
     } else {
       LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", 
Getting commits since then");
       return TimelineUtils.getWrittenPartitions(
-          TimelineUtils.getCommitsTimelineAfter(metaClient, 
lastCommitTimeSynced.get()));
+          TimelineUtils.getCommitsTimelineAfter(metaClient, 
lastCommitTimeSynced.get(), lastCommitCompletionTimeSynced.get()));
     }
   }
 

Reply via email to