yihua commented on code in PR #10467:
URL: https://github.com/apache/hudi/pull/10467#discussion_r1446534664


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -457,7 +457,7 @@ public Pair<Option<String>, JavaRDD<WriteStatus>> syncOnce() throws IOException
 
   private Option<String> getLastPendingClusteringInstant(Option<HoodieTimeline> commitTimelineOpt) {
     if (commitTimelineOpt.isPresent()) {
-      Option<HoodieInstant> pendingClusteringInstant = commitTimelineOpt.get().filterPendingReplaceTimeline().lastInstant();
+      Option<HoodieInstant> pendingClusteringInstant = commitTimelineOpt.get().getLastPendingClusterCommit();

Review Comment:
   Should we revisit all usages of `filterPendingReplaceTimeline` in a follow-up
and check whether any of them need to differentiate clustering from
non-clustering replace commits?
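
To illustrate the concern, here is a minimal sketch of how a generic "pending replace" filter and a clustering-specific filter can disagree. Everything in it (`ReplaceInstantSketch`, `Instant`, `Operation`, and the three helper methods) is a hypothetical stand-in, not Hudi's actual timeline API, and it assumes the list is ordered by ascending instant time.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical, simplified model of a timeline; these are NOT Hudi classes.
final class ReplaceInstantSketch {
  // Operations that can produce a replace commit in this toy model.
  enum Operation { CLUSTER, INSERT_OVERWRITE, DELETE_PARTITION }

  static final class Instant {
    final String timestamp;
    final boolean completed;
    final Operation operation;

    Instant(String timestamp, boolean completed, Operation operation) {
      this.timestamp = timestamp;
      this.completed = completed;
      this.operation = operation;
    }
  }

  // Every pending replace instant, regardless of which operation created it.
  static List<Instant> pendingReplace(List<Instant> timeline) {
    return timeline.stream()
        .filter(i -> !i.completed)
        .collect(Collectors.toList());
  }

  // Last pending replace instant of any kind (assumes ascending order).
  static Optional<Instant> lastPendingReplace(List<Instant> timeline) {
    return pendingReplace(timeline).stream().reduce((first, second) -> second);
  }

  // Last pending replace instant that was created by clustering.
  static Optional<Instant> lastPendingClustering(List<Instant> timeline) {
    return pendingReplace(timeline).stream()
        .filter(i -> i.operation == Operation.CLUSTER)
        .reduce((first, second) -> second);
  }
}
```

With a pending clustering instant followed by a pending insert-overwrite instant, `lastPendingReplace` returns the insert-overwrite instant while `lastPendingClustering` returns the clustering one, which is the kind of mismatch a follow-up audit would look for.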



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java:
##########
@@ -508,6 +509,24 @@ public Option<HoodieInstant> getLastClusterCommit() {
         }).findFirst());
   }
 
+  @Override
+  public Option<HoodieInstant> getLastPendingClusterCommit() {
+    return  Option.fromJavaOptional(getCommitsTimeline().filter(s -> s.getAction().equalsIgnoreCase(HoodieTimeline.REPLACE_COMMIT_ACTION))
+        .getReverseOrderedInstants()
+        .filter(i -> {
+          try {
+            if (!i.isCompleted()) {
+              HoodieCommitMetadata metadata = TimelineUtils.getCommitMetadata(i, this);
+              return metadata.getOperationType().equals(WriteOperationType.CLUSTER);
+            } else {
+              return false;
+            }
+          } catch (IOException e) {
+            return false;

Review Comment:
   nit: should we log the exception at warning level here before returning `false`?
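
Something along these lines is what I have in mind. This is only a sketch: `PendingClusteringFilterSketch`, `MetadataReader`, and `isPendingClustering` are hypothetical names, and it uses `java.util.logging` to stay self-contained, whereas the real code would use whatever logger the class already has.

```java
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for the filter predicate in the diff; not Hudi code.
final class PendingClusteringFilterSketch {
  private static final Logger LOG =
      Logger.getLogger(PendingClusteringFilterSketch.class.getName());

  // Assumed abstraction over reading the operation type from commit metadata.
  @FunctionalInterface
  interface MetadataReader {
    String readOperationType(String instantTime) throws IOException;
  }

  static boolean isPendingClustering(String instantTime, boolean completed, MetadataReader reader) {
    if (completed) {
      return false;
    }
    try {
      return "CLUSTER".equals(reader.readOperationType(instantTime));
    } catch (IOException e) {
      // Surface the failure instead of silently treating the instant as non-clustering.
      LOG.log(Level.WARNING,
          "Unable to read commit metadata for instant " + instantTime
              + "; treating it as a non-clustering instant", e);
      return false;
    }
  }
}
```

The return value is unchanged; the only difference is that an unreadable metadata file leaves a trace in the logs.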



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
