yihua commented on code in PR #10467:
URL: https://github.com/apache/hudi/pull/10467#discussion_r1446534664
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -457,7 +457,7 @@ public Pair<Option<String>, JavaRDD<WriteStatus>> syncOnce() throws IOException
private Option<String> getLastPendingClusteringInstant(Option<HoodieTimeline> commitTimelineOpt) {
if (commitTimelineOpt.isPresent()) {
- Option<HoodieInstant> pendingClusteringInstant = commitTimelineOpt.get().filterPendingReplaceTimeline().lastInstant();
+ Option<HoodieInstant> pendingClusteringInstant = commitTimelineOpt.get().getLastPendingClusterCommit();
Review Comment:
Should we revisit all usages of `filterPendingReplaceTimeline` in a follow-up
and check whether some of them should be fixed to differentiate clustering from
non-clustering replace commits?
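To illustrate why the distinction matters, here is a minimal sketch built on a hypothetical, simplified instant model (`SketchInstant` and `ReplaceCommitSketch` are illustrative names, not Hudi classes). It assumes that non-clustering operations (for example an insert overwrite) can also produce pending replace commits, and that instant times are fixed-width strings, so lexicographic order matches time order. A filter over all pending replace commits, analogous to `filterPendingReplaceTimeline().lastInstant()`, can then return a newer non-clustering instant, while additionally filtering on the operation type, analogous to the new `getLastPendingClusterCommit()`, does not.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified model of a timeline instant (NOT Hudi's HoodieInstant).
final class SketchInstant {
    final String timestamp;  // assumed fixed-width instant time, so string order == time order
    final String action;     // e.g. "replacecommit" or "commit"
    final boolean completed; // false while the instant is still pending
    final String operation;  // operation type assumed to be recorded in the commit metadata

    SketchInstant(String timestamp, String action, boolean completed, String operation) {
        this.timestamp = timestamp;
        this.action = action;
        this.completed = completed;
        this.operation = operation;
    }
}

final class ReplaceCommitSketch {
    static final String REPLACE_COMMIT_ACTION = "replacecommit";

    // Coarse filter: latest pending replace commit of ANY operation type
    // (analogous to filterPendingReplaceTimeline().lastInstant()).
    static Optional<SketchInstant> lastPendingReplace(List<SketchInstant> timeline) {
        return timeline.stream()
            .filter(i -> REPLACE_COMMIT_ACTION.equals(i.action) && !i.completed)
            .max(Comparator.comparing((SketchInstant i) -> i.timestamp));
    }

    // Narrower filter: latest pending replace commit whose operation is clustering
    // (analogous to the new getLastPendingClusterCommit()).
    static Optional<SketchInstant> lastPendingClustering(List<SketchInstant> timeline) {
        return timeline.stream()
            .filter(i -> REPLACE_COMMIT_ACTION.equals(i.action) && !i.completed)
            .filter(i -> "CLUSTER".equals(i.operation))
            .max(Comparator.comparing((SketchInstant i) -> i.timestamp));
    }
}
```

With a pending clustering instant at `001` and a newer pending insert-overwrite replace commit at `002`, `lastPendingReplace` returns `002` whereas `lastPendingClustering` returns `001`, which is the kind of mismatch a follow-up audit of the remaining call sites would look for.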
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java:
##########
@@ -508,6 +509,24 @@ public Option<HoodieInstant> getLastClusterCommit() {
}).findFirst());
}
+ @Override
+ public Option<HoodieInstant> getLastPendingClusterCommit() {
+ return Option.fromJavaOptional(getCommitsTimeline().filter(s -> s.getAction().equalsIgnoreCase(HoodieTimeline.REPLACE_COMMIT_ACTION))
+ .getReverseOrderedInstants()
+ .filter(i -> {
+ try {
+ if (!i.isCompleted()) {
+ HoodieCommitMetadata metadata = TimelineUtils.getCommitMetadata(i, this);
+ return metadata.getOperationType().equals(WriteOperationType.CLUSTER);
+ } else {
+ return false;
+ }
+ } catch (IOException e) {
+ return false;
Review Comment:
nit: should we log the exception at WARN level here before returning `false`?
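A minimal sketch of what the nit suggests, assuming a hypothetical `OperationTypeReader` in place of the metadata read (`TimelineUtils.getCommitMetadata` in the diff). It uses `java.util.logging` only to stay self-contained; the Hudi class would presumably use its existing logger instead (e.g. an SLF4J-style `LOG.warn(message, e)`). The failure is still treated as "not clustering", but the cause is no longer swallowed silently.

```java
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

final class WarnOnFailureSketch {
    private static final Logger LOG = Logger.getLogger(WarnOnFailureSketch.class.getName());

    // Hypothetical stand-in for reading an instant's commit metadata, which may fail with IOException.
    interface OperationTypeReader {
        String readOperationType(String instantTime) throws IOException;
    }

    // Returns true only when the recorded operation type is CLUSTER.
    // On a read failure, logs at WARNING level with the stack trace and returns false.
    static boolean isClustering(String instantTime, OperationTypeReader reader) {
        try {
            return "CLUSTER".equals(reader.readOperationType(instantTime));
        } catch (IOException e) {
            LOG.log(Level.WARNING, "Failed to read commit metadata for instant " + instantTime
                + "; treating it as non-clustering", e);
            return false;
        }
    }
}
```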
--
This is an automated message from the Apache Git Service.