nsivabalan commented on code in PR #7826:
URL: https://github.com/apache/hudi/pull/7826#discussion_r1139797072
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -707,20 +709,37 @@ protected List<String>
getInstantsToRollback(HoodieTableMetaClient metaClient, H
}
}).collect(Collectors.toList());
} else if (cleaningPolicy.isLazy()) {
- return inflightInstantsStream.filter(instant -> {
- try {
- return heartbeatClient.isHeartbeatExpired(instant.getTimestamp());
- } catch (IOException io) {
- throw new HoodieException("Failed to check heartbeat for instant " +
instant, io);
- }
- }).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+ return getInstantsToRollbackForLazyCleanPolicy(metaClient,
inflightInstantsStream);
} else if (cleaningPolicy.isNever()) {
return Collections.emptyList();
} else {
throw new IllegalArgumentException("Invalid Failed Writes Cleaning
Policy " + config.getFailedWritesCleanPolicy());
}
}
+ @VisibleForTesting
+ public List<String>
getInstantsToRollbackForLazyCleanPolicy(HoodieTableMetaClient metaClient,
+
Stream<HoodieInstant> inflightInstantsStream) {
+ // Get expired instants, must store them into list before double-checking
+ List<String> expiredInstants = inflightInstantsStream.filter(instant -> {
+ try {
+ // An instant transformed from inflight to completed have no heartbeat
file and will be detected as expired instant here
+ return heartbeatClient.isHeartbeatExpired(instant.getTimestamp());
+ } catch (IOException io) {
+ throw new HoodieException("Failed to check heartbeat for instant " +
instant, io);
+ }
+ }).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+
+ if (!expiredInstants.isEmpty()) {
Review Comment:
Reloading the entire timeline is generally a costly operation, so I am trying
to see if there is a more elegant way to fix this.
Ideally, the heartbeat file should always be generated by the regular writer.
Hence the only case in which the heartbeat file goes missing is when the
writer has completed the commit and deleted it. If that is a safe assumption,
then when we encounter a missing heartbeat file we should ignore that commit.
The only catch is that if a user switches from a non-lazy to the lazy clean
policy, failed commits from earlier runs may not have heartbeats.
Can we introduce locking around the entire getInstantsToRollback? That way,
if a writer is concurrently completing a commit, we can reliably tell it
apart from a commit that failed in a previous launch, when the writer was
using the eager failed-writes policy.
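To make the locking idea concrete, here is a minimal in-memory sketch. It is
not Hudi code: `LockedRollbackSketch`, its methods, and the `ReentrantLock`
standing in for a table-level lock are all hypothetical. It only illustrates
that when commit completion and rollback-candidate selection take the same
lock, a concurrently completed instant cannot be observed as "inflight with no
heartbeat".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical in-memory model; none of these names are Hudi APIs.
class LockedRollbackSketch {
  // Stand-in for a table-level lock shared by writers and table services.
  private final ReentrantLock tableLock = new ReentrantLock();
  private final Set<String> inflight = new TreeSet<>();
  private final Set<String> liveHeartbeats = new TreeSet<>();

  void startCommit(String ts) {
    tableLock.lock();
    try {
      inflight.add(ts);
      liveHeartbeats.add(ts);
    } finally {
      tableLock.unlock();
    }
  }

  // Simulates a crashed writer: the heartbeat stops, the instant stays inflight.
  void expireHeartbeat(String ts) {
    tableLock.lock();
    try {
      liveHeartbeats.remove(ts);
    } finally {
      tableLock.unlock();
    }
  }

  // Completion and heartbeat deletion happen atomically under the lock.
  void completeCommit(String ts) {
    tableLock.lock();
    try {
      inflight.remove(ts);
      liveHeartbeats.remove(ts);
    } finally {
      tableLock.unlock();
    }
  }

  // Under the same lock, "inflight and no live heartbeat" can only mean failed.
  List<String> getInstantsToRollback() {
    tableLock.lock();
    try {
      List<String> result = new ArrayList<>();
      for (String ts : inflight) {
        if (!liveHeartbeats.contains(ts)) {
          result.add(ts);
        }
      }
      return result;
    } finally {
      tableLock.unlock();
    }
  }

  public static void main(String[] args) {
    LockedRollbackSketch table = new LockedRollbackSketch();
    table.startCommit("001");
    table.startCommit("002");
    table.expireHeartbeat("001"); // writer of 001 crashed
    table.completeCommit("002");  // writer of 002 finished normally
    System.out.println(table.getInstantsToRollback()); // prints [001]
  }
}
```

The trade-off in this sketch is that every caller of the rollback check now
contends on the lock with writers finishing a commit, which has to be weighed
against the cost of reloading the timeline.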
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -707,20 +709,37 @@ protected List<String>
getInstantsToRollback(HoodieTableMetaClient metaClient, H
}
}).collect(Collectors.toList());
} else if (cleaningPolicy.isLazy()) {
- return inflightInstantsStream.filter(instant -> {
- try {
- return heartbeatClient.isHeartbeatExpired(instant.getTimestamp());
- } catch (IOException io) {
- throw new HoodieException("Failed to check heartbeat for instant " +
instant, io);
- }
- }).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+ return getInstantsToRollbackForLazyCleanPolicy(metaClient,
inflightInstantsStream);
} else if (cleaningPolicy.isNever()) {
return Collections.emptyList();
} else {
throw new IllegalArgumentException("Invalid Failed Writes Cleaning
Policy " + config.getFailedWritesCleanPolicy());
}
}
+ @VisibleForTesting
+ public List<String>
getInstantsToRollbackForLazyCleanPolicy(HoodieTableMetaClient metaClient,
+
Stream<HoodieInstant> inflightInstantsStream) {
+ // Get expired instants, must store them into list before double-checking
+ List<String> expiredInstants = inflightInstantsStream.filter(instant -> {
+ try {
+ // An instant transformed from inflight to completed have no heartbeat
file and will be detected as expired instant here
+ return heartbeatClient.isHeartbeatExpired(instant.getTimestamp());
+ } catch (IOException io) {
+ throw new HoodieException("Failed to check heartbeat for instant " +
instant, io);
+ }
+ }).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+
+ if (!expiredInstants.isEmpty()) {
Review Comment:
Alternatively, we could check for the existence of a completed commit file in
".hoodie" after we get hold of the list of commits to roll back. Any instants
that turn out to have a completed file can be dropped from the list.
Let me know what you think.
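A rough sketch of that check, using only `java.nio.file` rather than Hudi's
own file-system and timeline classes. `CompletedInstantFilterSketch`,
`dropCompleted`, and the `.commit` suffix for completed instants are
assumptions made for illustration; a real change would need to cover every
completed-action file type and go through the table's storage abstraction.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper; not part of Hudi.
class CompletedInstantFilterSketch {
  // Assumption: a completed instant leaves "<timestamp>.commit" in the meta dir.
  static final String COMPLETED_SUFFIX = ".commit";

  // Keeps only candidates that have no completed file in metaDir.
  static List<String> dropCompleted(List<String> candidates, Path metaDir) {
    return candidates.stream()
        .filter(ts -> !Files.exists(metaDir.resolve(ts + COMPLETED_SUFFIX)))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) throws IOException {
    // A temporary directory plays the role of ".hoodie".
    Path metaDir = Files.createTempDirectory("hoodie-meta-sketch");
    Files.createFile(metaDir.resolve("002.commit")); // 002 completed concurrently

    List<String> expired = Arrays.asList("001", "002", "003");
    System.out.println(dropCompleted(expired, metaDir)); // prints [001, 003]
  }
}
```

This costs one existence check per candidate instead of a full timeline
reload, and it only runs when the candidate list is non-empty.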