danny0405 commented on code in PR #8745:
URL: https://github.com/apache/hudi/pull/8745#discussion_r1220821891
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java:
##########
@@ -223,17 +225,24 @@ public static HoodieDefaultTimeline
getTimeline(HoodieTableMetaClient metaClient
* @param exclusiveStartInstantTime Start instant time (exclusive).
* @return Hudi timeline.
*/
- public static HoodieTimeline getCommitsTimelineAfter(
+ public static HoodieDefaultTimeline getCommitsTimelineAfter(
HoodieTableMetaClient metaClient, String exclusiveStartInstantTime) {
Review Comment:
Return `HoodieTimeline` instead.
##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java:
##########
@@ -341,8 +353,15 @@ public void close() {
@Override
public void updateLastCommitTimeSynced(String tableName) {
// Set the last commit time from the TBLproperties
- Option<String> lastCommitSynced =
getActiveTimeline().lastInstant().map(HoodieInstant::getTimestamp);
- if (lastCommitSynced.isPresent()) {
+ HoodieTimeline activeTimeline = getActiveTimeline();
+ Option<String> lastCommitSynced =
activeTimeline.lastInstant().map(HoodieInstant::getTimestamp);
+ Option<String> lastCommitCompletionSynced = getActiveTimeline()
+ .getInstantsOrderedByStateTransitionTime()
+ .skip(activeTimeline.countInstants() - 1)
+ .findFirst()
+ .map(i -> Option.of(i.getStateTransitionTime()))
+ .orElse(Option.empty());
+ if (lastCommitSynced.isPresent() &&
lastCommitCompletionSynced.isPresent()) {
try {
Review Comment:
Checking only `lastCommitSynced.isPresent()` should be enough here.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java:
##########
@@ -223,17 +225,24 @@ public static HoodieDefaultTimeline
getTimeline(HoodieTableMetaClient metaClient
* @param exclusiveStartInstantTime Start instant time (exclusive).
* @return Hudi timeline.
*/
- public static HoodieTimeline getCommitsTimelineAfter(
+ public static HoodieDefaultTimeline getCommitsTimelineAfter(
HoodieTableMetaClient metaClient, String exclusiveStartInstantTime) {
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
HoodieDefaultTimeline timeline =
activeTimeline.isBeforeTimelineStarts(exclusiveStartInstantTime)
? metaClient.getArchivedTimeline(exclusiveStartInstantTime)
.mergeTimeline(activeTimeline)
: activeTimeline;
- return timeline.getCommitsTimeline()
+ return (HoodieDefaultTimeline) timeline.getCommitsTimeline()
.findInstantsAfter(exclusiveStartInstantTime, Integer.MAX_VALUE);
}
+
+ public static HoodieDefaultTimeline getHollowInstantsTimeline(
+ HoodieTableMetaClient metaClient, String exclusiveStartInstantTime,
String commitCompletionTime) {
Review Comment:
Return `HoodieTimeline` instead.
##########
hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java:
##########
@@ -126,16 +131,20 @@ public List<String> getAllPartitionPathsOnStorage() {
config.getBoolean(META_SYNC_ASSUME_DATE_PARTITION));
}
- public List<String> getWrittenPartitionsSince(Option<String>
lastCommitTimeSynced) {
+ public List<String> getWrittenPartitionsSince(Option<String>
lastCommitTimeSynced, Option<String> lastCommitCompletionTimeSynced) {
if (!lastCommitTimeSynced.isPresent()) {
LOG.info("Last commit time synced is not known, listing all partitions
in "
+ config.getString(META_SYNC_BASE_PATH)
+ ",FS :" + config.getHadoopFileSystem());
return getAllPartitionPathsOnStorage();
} else {
LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ",
Getting commits since then");
- return TimelineUtils.getWrittenPartitions(
- TimelineUtils.getCommitsTimelineAfter(metaClient,
lastCommitTimeSynced.get()));
+ HoodieDefaultTimeline writeTimeline =
TimelineUtils.getCommitsTimelineAfter(metaClient, lastCommitTimeSynced.get());
+ if (lastCommitCompletionTimeSynced.isPresent()) {
+ writeTimeline =
writeTimeline.mergeTimeline(TimelineUtils.getHollowInstantsTimeline(
+ metaClient, lastCommitTimeSynced.get(),
lastCommitCompletionTimeSynced.get()));
+ }
Review Comment:
Ditto.
##########
hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java:
##########
@@ -88,10 +89,14 @@ public boolean isBootstrap() {
* If last sync time is not known then consider only active timeline.
* Going through archive timeline is a costly operation, and it should be
avoided unless some start time is given.
*/
- public Set<String> getDroppedPartitionsSince(Option<String>
lastCommitTimeSynced) {
- HoodieTimeline timeline = lastCommitTimeSynced.isPresent()
+ public Set<String> getDroppedPartitionsSince(Option<String>
lastCommitTimeSynced, Option<String> lastCommitCompletionTimeSynced) {
+ HoodieDefaultTimeline timeline = lastCommitTimeSynced.isPresent()
? TimelineUtils.getCommitsTimelineAfter(metaClient,
lastCommitTimeSynced.get())
: metaClient.getActiveTimeline();
+ timeline = lastCommitCompletionTimeSynced.isPresent()
+ ? timeline.mergeTimeline(TimelineUtils.getHollowInstantsTimeline(
+ metaClient, lastCommitTimeSynced.get(),
lastCommitCompletionTimeSynced.get()))
Review Comment:
Can we just fix the logic inside `TimelineUtils.getCommitsTimelineAfter(...)`
instead?
##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java:
##########
@@ -341,8 +353,15 @@ public void close() {
@Override
public void updateLastCommitTimeSynced(String tableName) {
// Set the last commit time from the TBLproperties
- Option<String> lastCommitSynced =
getActiveTimeline().lastInstant().map(HoodieInstant::getTimestamp);
- if (lastCommitSynced.isPresent()) {
+ HoodieTimeline activeTimeline = getActiveTimeline();
+ Option<String> lastCommitSynced =
activeTimeline.lastInstant().map(HoodieInstant::getTimestamp);
+ Option<String> lastCommitCompletionSynced = getActiveTimeline()
+ .getInstantsOrderedByStateTransitionTime()
+ .skip(activeTimeline.countInstants() - 1)
Review Comment:
Does `.lastInstant()` work here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]