This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 78094566e4b [HUDI-6182] Hive sync use state transient time to avoid
losing partit… (#8745)
78094566e4b is described below
commit 78094566e4b6895a62de0800c7baef42c167d3c6
Author: StreamingFlames <[email protected]>
AuthorDate: Fri Jun 9 16:16:55 2023 +0800
[HUDI-6182] Hive sync use state transient time to avoid losing partit…
(#8745)
Now Hudi syncs 2 timestamps to HMS for each sync: one is the max commit
time T1 (actually the max version id), the other is the max completion time T2.
The latter can be used to trace the instants that have a smaller version id than
T1 but a greater completion timestamp than T2. Such instants are usually
generated by multi writers.
---
.../hudi/common/table/timeline/TimelineUtils.java | 27 +++++++--
.../hudi/common/table/TestTimelineUtils.java | 39 ++++++------
.../java/org/apache/hudi/sync/adb/AdbSyncTool.java | 2 +-
.../java/org/apache/hudi/hive/HiveSyncTool.java | 9 ++-
.../org/apache/hudi/hive/HoodieHiveSyncClient.java | 24 +++++++-
.../org/apache/hudi/hive/TestHiveSyncTool.java | 70 ++++++++++++++++++++--
.../hudi/sync/common/HoodieMetaSyncOperations.java | 9 +++
.../apache/hudi/sync/common/HoodieSyncClient.java | 8 +--
8 files changed, 150 insertions(+), 38 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
index 434a93c32bc..8745b4ad683 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
@@ -42,6 +42,8 @@ import java.util.stream.Stream;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.SAVEPOINT_ACTION;
@@ -221,20 +223,35 @@ public class TimelineUtils {
*
* @param metaClient {@link HoodieTableMetaClient} instance.
* @param exclusiveStartInstantTime Start instant time (exclusive).
+ * @param commitCompletionTime Last commit completion time synced
* @return Hudi timeline.
*/
public static HoodieTimeline getCommitsTimelineAfter(
- HoodieTableMetaClient metaClient, String exclusiveStartInstantTime) {
- HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+ HoodieTableMetaClient metaClient, String exclusiveStartInstantTime,
String commitCompletionTime) {
+ HoodieDefaultTimeline activeTimeline = metaClient.getActiveTimeline();
+
HoodieDefaultTimeline timeline =
activeTimeline.isBeforeTimelineStarts(exclusiveStartInstantTime)
? metaClient.getArchivedTimeline(exclusiveStartInstantTime)
.mergeTimeline(activeTimeline)
: activeTimeline;
- return timeline.getCommitsTimeline()
- .findInstantsAfter(exclusiveStartInstantTime, Integer.MAX_VALUE);
+
+ // Get new instants with greater instant time than
exclusiveStartInstantTime
+ Stream<HoodieInstant> newInstants = timeline.getCommitsTimeline()
+ .findInstantsAfter(exclusiveStartInstantTime, Integer.MAX_VALUE)
+ .getInstantsAsStream();
+
+ // Get 'hollow' instants that have less instant time than
exclusiveStartInstantTime but with greater commit completion time
+ Stream<HoodieInstant> hollowInstants = timeline.getCommitsTimeline()
+ .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(),
LESSER_THAN, exclusiveStartInstantTime))
+ .filter(s ->
HoodieTimeline.compareTimestamps(s.getStateTransitionTime(), GREATER_THAN,
commitCompletionTime))
+ .getInstantsAsStream();
+
+ Stream<HoodieInstant> mergedInstantStream = Stream.concat(newInstants,
hollowInstants).sorted();
+
+ return new HoodieDefaultTimeline(mergedInstantStream, timeline.details);
}
-
+
/**
* Returns the commit metadata of the given instant.
*
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
index 0c09d91163e..a3b57b21f84 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java
@@ -275,40 +275,43 @@ public class TestTimelineUtils extends
HoodieCommonTestHarness {
String startTs = "010";
HoodieTableMetaClient mockMetaClient = prepareMetaClient(
Arrays.asList(
- new HoodieInstant(COMPLETED, COMMIT_ACTION, "009"),
- new HoodieInstant(COMPLETED, COMMIT_ACTION, "010"),
- new HoodieInstant(COMPLETED, COMMIT_ACTION, "011"),
- new HoodieInstant(COMPLETED, COMMIT_ACTION, "012")),
- Arrays.asList(new HoodieInstant(COMPLETED, COMMIT_ACTION, "001"),
- new HoodieInstant(COMPLETED, COMMIT_ACTION, "002")),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "009", "013"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "010", "010"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "011", "011"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "012", "012")),
+ Arrays.asList(new HoodieInstant(COMPLETED, COMMIT_ACTION, "001",
"001"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "002", "002")),
startTs
);
+
+ // Commit 009 will be included in result because it has greater commit
completion than 010
verifyTimeline(
Arrays.asList(
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "009"),
new HoodieInstant(COMPLETED, COMMIT_ACTION, "011"),
new HoodieInstant(COMPLETED, COMMIT_ACTION, "012")),
- TimelineUtils.getCommitsTimelineAfter(mockMetaClient, startTs));
+ TimelineUtils.getCommitsTimelineAfter(mockMetaClient, startTs,
startTs));
verify(mockMetaClient, never()).getArchivedTimeline(any());
// Should load both archived and active timeline
startTs = "001";
mockMetaClient = prepareMetaClient(
Arrays.asList(
- new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "009"),
- new HoodieInstant(COMPLETED, COMMIT_ACTION, "010"),
- new HoodieInstant(COMPLETED, COMMIT_ACTION, "011"),
- new HoodieInstant(COMPLETED, COMMIT_ACTION, "012")),
- Arrays.asList(new HoodieInstant(COMPLETED, COMMIT_ACTION, "001"),
- new HoodieInstant(COMPLETED, COMMIT_ACTION, "002")),
+ new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "009", "009"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "010", "010"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "011", "011"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "012", "012")),
+ Arrays.asList(new HoodieInstant(COMPLETED, COMMIT_ACTION, "001",
"001"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "002", "002")),
startTs
);
verifyTimeline(
Arrays.asList(
- new HoodieInstant(COMPLETED, COMMIT_ACTION, "002"),
- new HoodieInstant(COMPLETED, COMMIT_ACTION, "010"),
- new HoodieInstant(COMPLETED, COMMIT_ACTION, "011"),
- new HoodieInstant(COMPLETED, COMMIT_ACTION, "012")),
- TimelineUtils.getCommitsTimelineAfter(mockMetaClient, startTs));
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "002", "002"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "010", "010"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "011", "011"),
+ new HoodieInstant(COMPLETED, COMMIT_ACTION, "012", "012")),
+ TimelineUtils.getCommitsTimelineAfter(mockMetaClient, startTs,
startTs));
verify(mockMetaClient, times(1)).getArchivedTimeline(any());
}
diff --git
a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java
b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java
index 5f4a36631a2..58a24548d04 100644
---
a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java
+++
b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java
@@ -194,7 +194,7 @@ public class AdbSyncTool extends HoodieSyncTool {
if (config.getSplitStrings(META_SYNC_PARTITION_FIELDS).isEmpty()) {
writtenPartitionsSince = new ArrayList<>();
} else {
- writtenPartitionsSince =
syncClient.getWrittenPartitionsSince(lastCommitTimeSynced);
+ writtenPartitionsSince =
syncClient.getWrittenPartitionsSince(lastCommitTimeSynced, Option.empty());
}
LOG.info("Scan partitions complete, partitionNum:{}",
writtenPartitionsSince.size());
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index c69f9c9b1d6..40f79ba19f1 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -267,8 +267,11 @@ public class HiveSyncTool extends HoodieSyncTool
implements AutoCloseable {
boolean syncIncremental = config.getBoolean(META_SYNC_INCREMENTAL);
Option<String> lastCommitTimeSynced = (tableExists && syncIncremental)
? syncClient.getLastCommitTimeSynced(tableName) : Option.empty();
+ Option<String> lastCommitCompletionTimeSynced = (tableExists &&
syncIncremental)
+ ? syncClient.getLastCommitCompletionTimeSynced(tableName) :
Option.empty();
if (syncIncremental) {
- LOG.info("Last commit time synced was found to be " +
lastCommitTimeSynced.orElse("null"));
+ LOG.info(String.format("Last commit time synced was found to be %s, last
commit completion time is found to be %s",
+ lastCommitTimeSynced.orElse("null"),
lastCommitCompletionTimeSynced.orElse("null")));
} else {
LOG.info(
"Executing a full partition sync operation since {} is set to
false.",
@@ -287,12 +290,12 @@ public class HiveSyncTool extends HoodieSyncTool
implements AutoCloseable {
+ ", file system: " + config.getHadoopFileSystem());
partitionsChanged = syncAllPartitions(tableName);
} else {
- List<String> writtenPartitionsSince =
syncClient.getWrittenPartitionsSince(lastCommitTimeSynced);
+ List<String> writtenPartitionsSince =
syncClient.getWrittenPartitionsSince(lastCommitTimeSynced,
lastCommitCompletionTimeSynced);
LOG.info("Storage partitions scan complete. Found " +
writtenPartitionsSince.size());
// Sync the partitions if needed
// find dropped partitions, if any, in the latest commit
- Set<String> droppedPartitions =
syncClient.getDroppedPartitionsSince(lastCommitTimeSynced);
+ Set<String> droppedPartitions =
syncClient.getDroppedPartitionsSince(lastCommitTimeSynced,
lastCommitCompletionTimeSynced);
partitionsChanged = syncPartitions(tableName, writtenPartitionsSince,
droppedPartitions);
}
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
index 231a96e8f47..576abc3a908 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
@@ -21,6 +21,7 @@ package org.apache.hudi.hive;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.MapUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -281,6 +282,17 @@ public class HoodieHiveSyncClient extends HoodieSyncClient
{
}
}
+ @Override
+ public Option<String> getLastCommitCompletionTimeSynced(String tableName) {
+ // Get the last commit completion time from the TBLproperties
+ try {
+ Table table = client.getTable(databaseName, tableName);
+ return
Option.ofNullable(table.getParameters().getOrDefault(HOODIE_LAST_COMMIT_COMPLETION_TIME_SYNC,
null));
+ } catch (Exception e) {
+ throw new HoodieHiveSyncException("Failed to get the last commit
completion time synced from the table " + tableName, e);
+ }
+ }
+
public Option<String> getLastReplicatedTime(String tableName) {
// Get the last replicated time from the TBLproperties
try {
@@ -340,8 +352,15 @@ public class HoodieHiveSyncClient extends HoodieSyncClient
{
@Override
public void updateLastCommitTimeSynced(String tableName) {
- // Set the last commit time from the TBLproperties
- Option<String> lastCommitSynced =
getActiveTimeline().lastInstant().map(HoodieInstant::getTimestamp);
+ // Set the last commit time and commit completion from the TBLproperties
+ HoodieTimeline activeTimeline = getActiveTimeline();
+ Option<String> lastCommitSynced =
activeTimeline.lastInstant().map(HoodieInstant::getTimestamp);
+ Option<String> lastCommitCompletionSynced = activeTimeline
+ .getInstantsOrderedByStateTransitionTime()
+ .skip(activeTimeline.countInstants() - 1)
+ .findFirst()
+ .map(i -> Option.of(i.getStateTransitionTime()))
+ .orElse(Option.empty());
if (lastCommitSynced.isPresent()) {
try {
Table table = client.getTable(databaseName, tableName);
@@ -351,6 +370,7 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
SerDeInfo serdeInfo = sd.getSerdeInfo();
serdeInfo.putToParameters(ConfigUtils.TABLE_SERDE_PATH, basePath);
table.putToParameters(HOODIE_LAST_COMMIT_TIME_SYNC,
lastCommitSynced.get());
+ table.putToParameters(HOODIE_LAST_COMMIT_COMPLETION_TIME_SYNC,
lastCommitCompletionSynced.get());
client.alter_table(databaseName, tableName, table);
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to get update last commit
time synced to " + lastCommitSynced, e);
diff --git
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index b61c7d8622b..da23d8876c7 100644
---
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieSyncTableStrategy;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.common.testutils.NetworkTestUtils;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.Option;
@@ -323,7 +324,7 @@ public class TestHiveSyncTool {
+ FSUtils.getPartitionPath(basePath, "2050/1/1").toString() + "'");
hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
- List<String> writtenPartitionsSince =
hiveClient.getWrittenPartitionsSince(Option.empty());
+ List<String> writtenPartitionsSince =
hiveClient.getWrittenPartitionsSince(Option.empty(), Option.empty());
List<PartitionEvent> partitionEvents =
hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince,
Collections.emptySet());
assertEquals(1, partitionEvents.size(), "There should be only one
partition event");
assertEquals(PartitionEventType.UPDATE,
partitionEvents.iterator().next().eventType,
@@ -469,6 +470,7 @@ public class TestHiveSyncTool {
String sparkTableProperties =
getSparkTableProperties(syncAsDataSourceTable, useSchemaFromCommitMetadata);
assertEquals(
"EXTERNAL\tTRUE\n"
+ + "last_commit_completion_time_sync\t" +
getLastCommitCompletionTimeSynced() + "\n"
+ "last_commit_time_sync\t100\n"
+ sparkTableProperties
+ "tp_0\tp0\n"
@@ -506,8 +508,9 @@ public class TestHiveSyncTool {
hiveDriver.getResults(results);
assertEquals(
- String.format("%slast_commit_time_sync\t%s\n%s",
+
String.format("%slast_commit_completion_time_sync\t%s\nlast_commit_time_sync\t%s\n%s",
createManagedTable ? StringUtils.EMPTY_STRING : "EXTERNAL\tTRUE\n",
+ getLastCommitCompletionTimeSynced(),
instantTime,
getSparkTableProperties(true, true)),
String.format("%s\n", String.join("\n", results.subList(0,
results.size() - 1))));
@@ -600,6 +603,7 @@ public class TestHiveSyncTool {
results.subList(0, results.size() - 1));
assertEquals(
"EXTERNAL\tTRUE\n"
+ + "last_commit_completion_time_sync\t" +
getLastCommitCompletionTimeSynced() + "\n"
+ "last_commit_time_sync\t101\n"
+ sparkTableProperties
+ "tp_0\tp0\n"
@@ -691,7 +695,7 @@ public class TestHiveSyncTool {
// Lets do the sync
reSyncHiveTable();
- List<String> writtenPartitionsSince =
hiveClient.getWrittenPartitionsSince(Option.of(commitTime1));
+ List<String> writtenPartitionsSince =
hiveClient.getWrittenPartitionsSince(Option.of(commitTime1),
Option.of(commitTime1));
assertEquals(1, writtenPartitionsSince.size(), "We should have one
partition written after 100 commit");
List<org.apache.hudi.sync.common.model.Partition> hivePartitions =
hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
List<PartitionEvent> partitionEvents =
hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince,
Collections.emptySet());
@@ -1073,7 +1077,7 @@ public class TestHiveSyncTool {
HiveTestUtil.addCOWPartition("2010/01/02", true, true, commitTime2);
reInitHiveSyncClient();
- List<String> writtenPartitionsSince =
hiveClient.getWrittenPartitionsSince(Option.of(instantTime));
+ List<String> writtenPartitionsSince =
hiveClient.getWrittenPartitionsSince(Option.of(instantTime),
Option.of(getLastCommitCompletionTimeSynced()));
assertEquals(1, writtenPartitionsSince.size(), "We should have one
partition written after 100 commit");
List<org.apache.hudi.sync.common.model.Partition> hivePartitions =
hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
List<PartitionEvent> partitionEvents =
hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince,
Collections.emptySet());
@@ -1103,7 +1107,7 @@ public class TestHiveSyncTool {
"Table partitions should match the number of partitions we wrote");
assertEquals(commitTime3,
hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(),
"The last commit that was synced should be updated in the
TBLPROPERTIES");
- assertEquals(1,
hiveClient.getWrittenPartitionsSince(Option.of(commitTime2)).size());
+ assertEquals(1,
hiveClient.getWrittenPartitionsSince(Option.of(commitTime2),
Option.of(getLastCommitCompletionTimeSynced())).size());
}
@ParameterizedTest
@@ -1505,6 +1509,56 @@ public class TestHiveSyncTool {
assertEquals(commitTime1,
hiveClient.getLastCommitTimeSynced(tableName).get());
}
+ @ParameterizedTest
+ @MethodSource("syncModeAndEnablePushDown")
+ public void testHiveSyncWithMultiWriter(String syncMode, String
enablePushDown) throws Exception {
+ String tableName = HiveTestUtil.TABLE_NAME +
HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
+ hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), syncMode);
+ hiveSyncProps.setProperty(HIVE_SYNC_FILTER_PUSHDOWN_ENABLED.key(),
enablePushDown);
+ hiveSyncProps.setProperty(META_SYNC_CONDITIONAL_SYNC.key(), "true");
+
+ String commitTime1 = HoodieInstantTimeGenerator.createNewInstantTime(1);
+ String commitTime2 = HoodieInstantTimeGenerator.createNewInstantTime(2);
+ String commitTime3 = HoodieInstantTimeGenerator.createNewInstantTime(3);
+ String commitTime4 = HoodieInstantTimeGenerator.createNewInstantTime(4);
+ String commitTime5 = HoodieInstantTimeGenerator.createNewInstantTime(5);
+
+ // Commit 4 and commit5 will be committed first
+ HiveTestUtil.createMORTable(commitTime4, commitTime5, 2, true, true);
+ reInitHiveSyncClient();
+ reSyncHiveTable();
+
+ assertTrue(hiveClient.tableExists(tableName));
+ assertEquals(commitTime5,
hiveClient.getLastCommitTimeSynced(tableName).get());
+ assertEquals(getLastCommitCompletionTimeSynced(),
hiveClient.getLastCommitCompletionTimeSynced(tableName).get());
+ assertEquals(2, hiveClient.getAllPartitions(tableName).size());
+
+ // Commit2 and commit3 committed after commit4 and commit5
+ HiveTestUtil.addMORPartitions(4, true, true, true,
ZonedDateTime.now().plusDays(2), commitTime2, commitTime3);
+ reInitHiveSyncClient();
+ reSyncHiveTable();
+
+ String lastCommitCompletionTimeSynced =
getLastCommitCompletionTimeSynced();
+ // LastCommitTimeSynced will still be commit3
+ assertEquals(commitTime5,
hiveClient.getLastCommitTimeSynced(tableName).get());
+ assertEquals(lastCommitCompletionTimeSynced,
hiveClient.getLastCommitCompletionTimeSynced(tableName).get());
+ // Partitions included in commit0 and commit1 will be synced to HMS
correctly
+ assertEquals(4, hiveClient.getAllPartitions(tableName).size());
+
+ List<Partition> partitions = hiveClient.getAllPartitions(tableName);
+ // create two replace commits to delete current partitions, but do not
sync in between
+ String partitiontoDelete =
partitions.get(0).getValues().get(0).replace("-", "/");
+ // Add commit1 to the table that replace some partitions
+ HiveTestUtil.createReplaceCommit(commitTime1, partitiontoDelete,
WriteOperationType.DELETE_PARTITION, true, true);
+
+ reInitHiveSyncClient();
+ reSyncHiveTable();
+
+ assertEquals(getLastCommitCompletionTimeSynced(),
hiveClient.getLastCommitCompletionTimeSynced(tableName).get());
+ assertEquals(commitTime5,
hiveClient.getLastCommitTimeSynced(tableName).get());
+ assertEquals(3, hiveClient.getAllPartitions(tableName).size());
+ }
+
private void reSyncHiveTable() {
hiveSyncTool.syncHoodieTable();
// we need renew the hive client after tool.syncHoodieTable(), because it
will close hive
@@ -1512,6 +1566,12 @@ public class TestHiveSyncTool {
reInitHiveSyncClient();
}
+ private String getLastCommitCompletionTimeSynced() {
+ return hiveClient.getActiveTimeline()
+ .getInstantsOrderedByStateTransitionTime()
+ .skip(hiveClient.getActiveTimeline().countInstants() -
1).findFirst().get().getStateTransitionTime();
+ }
+
private void reInitHiveSyncClient() {
hiveSyncTool = new HiveSyncTool(hiveSyncProps, HiveTestUtil.getHiveConf());
hiveClient = (HoodieHiveSyncClient) hiveSyncTool.syncClient;
diff --git
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
index b98ac2b7940..87af1d16d75 100644
---
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
+++
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java
@@ -33,6 +33,8 @@ public interface HoodieMetaSyncOperations {
String HOODIE_LAST_COMMIT_TIME_SYNC = "last_commit_time_sync";
+ String HOODIE_LAST_COMMIT_COMPLETION_TIME_SYNC =
"last_commit_completion_time_sync";
+
/**
* Create the table.
*
@@ -179,6 +181,13 @@ public interface HoodieMetaSyncOperations {
return Option.empty();
}
+ /**
+ * Get the commit completion time of last sync
+ */
+ default Option<String> getLastCommitCompletionTimeSynced(String tableName) {
+ return Option.empty();
+ }
+
/**
* Update the timestamp of last sync.
*/
diff --git
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
index b6f1d0bdfa2..d6e57754847 100644
---
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
+++
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
@@ -88,9 +88,9 @@ public abstract class HoodieSyncClient implements
HoodieMetaSyncOperations, Auto
* If last sync time is not known then consider only active timeline.
* Going through archive timeline is a costly operation, and it should be
avoided unless some start time is given.
*/
- public Set<String> getDroppedPartitionsSince(Option<String>
lastCommitTimeSynced) {
+ public Set<String> getDroppedPartitionsSince(Option<String>
lastCommitTimeSynced, Option<String> lastCommitCompletionTimeSynced) {
HoodieTimeline timeline = lastCommitTimeSynced.isPresent()
- ? TimelineUtils.getCommitsTimelineAfter(metaClient,
lastCommitTimeSynced.get())
+ ? TimelineUtils.getCommitsTimelineAfter(metaClient,
lastCommitTimeSynced.get(), lastCommitCompletionTimeSynced.get())
: metaClient.getActiveTimeline();
return new HashSet<>(TimelineUtils.getDroppedPartitions(timeline));
}
@@ -126,7 +126,7 @@ public abstract class HoodieSyncClient implements
HoodieMetaSyncOperations, Auto
config.getBoolean(META_SYNC_ASSUME_DATE_PARTITION));
}
- public List<String> getWrittenPartitionsSince(Option<String>
lastCommitTimeSynced) {
+ public List<String> getWrittenPartitionsSince(Option<String>
lastCommitTimeSynced, Option<String> lastCommitCompletionTimeSynced) {
if (!lastCommitTimeSynced.isPresent()) {
LOG.info("Last commit time synced is not known, listing all partitions
in "
+ config.getString(META_SYNC_BASE_PATH)
@@ -135,7 +135,7 @@ public abstract class HoodieSyncClient implements
HoodieMetaSyncOperations, Auto
} else {
LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ",
Getting commits since then");
return TimelineUtils.getWrittenPartitions(
- TimelineUtils.getCommitsTimelineAfter(metaClient,
lastCommitTimeSynced.get()));
+ TimelineUtils.getCommitsTimelineAfter(metaClient,
lastCommitTimeSynced.get(), lastCommitCompletionTimeSynced.get()));
}
}