stream2000 commented on code in PR #7826:
URL: https://github.com/apache/hudi/pull/7826#discussion_r1108374607


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -707,20 +709,33 @@ protected List<String> getInstantsToRollback(HoodieTableMetaClient metaClient, H
         }
       }).collect(Collectors.toList());
     } else if (cleaningPolicy.isLazy()) {
-      return inflightInstantsStream.filter(instant -> {
-        try {
-          return heartbeatClient.isHeartbeatExpired(instant.getTimestamp());
-        } catch (IOException io) {
-          throw new HoodieException("Failed to check heartbeat for instant " + instant, io);
-        }
-      }).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+      return getInstantsToRollbackForLazyCleanPolicy(metaClient, inflightInstantsStream);
     } else if (cleaningPolicy.isNever()) {
       return Collections.emptyList();
     } else {
       throw new IllegalArgumentException("Invalid Failed Writes Cleaning Policy " + config.getFailedWritesCleanPolicy());
     }
   }
 
+  @VisibleForTesting
+  public List<String> getInstantsToRollbackForLazyCleanPolicy(HoodieTableMetaClient metaClient,
+                                                              Stream<HoodieInstant> inflightInstantsStream) {
+    // Get expired instants; they must be collected into a list before double-checking
+    List<String> expiredInstants = inflightInstantsStream.filter(instant -> {
+      try {
+        // An instant that transitioned from inflight to completed has no heartbeat file and will be detected as expired here
+        return heartbeatClient.isHeartbeatExpired(instant.getTimestamp());
+      } catch (IOException io) {
+        throw new HoodieException("Failed to check heartbeat for instant " + instant, io);
+      }
+    }).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+
+    // Double check whether the heartbeat-expired instant is an inflight instant
+    metaClient.reloadActiveTimeline();

Review Comment:
   > > Yes, we use a separate Spark job to run table services, and the instant is completed by the Flink writer.
   > 
   > But from the code, the compaction and clustering instants are excluded. Can you elaborate a little more on what kind of table service the Spark offline job takes over?
   
   We use a Spark offline job to run clean/archive, and with the lazy clean policy it will roll back the failed instants.
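   
   To make the double-check concrete, here is a minimal, self-contained sketch (not the PR's actual code) of the step that presumably follows `metaClient.reloadActiveTimeline()`: keep only the heartbeat-expired candidates that are still pending on the refreshed timeline. The helper `getPendingTimestamps` is hypothetical, standing in for whatever timeline query the patch actually uses, and the timestamps are made-up sample data.
   
   ```java
   import java.util.List;
   import java.util.Set;
   import java.util.stream.Collectors;
   
   public class LazyCleanDoubleCheckSketch {
   
     // Hypothetical helper: timestamps of instants that are still pending
     // (inflight or requested) after the timeline reload.
     static Set<String> getPendingTimestamps() {
       return Set.of("20230215103000000"); // stand-in sample data
     }
   
     // Filter the heartbeat-expired candidates down to those still pending.
     // An instant that completed between the heartbeat check and the reload
     // is no longer pending, so it is dropped instead of being rolled back.
     static List<String> doubleCheck(List<String> expiredInstants) {
       Set<String> stillPending = getPendingTimestamps();
       return expiredInstants.stream()
           .filter(stillPending::contains)
           .collect(Collectors.toList());
     }
   
     public static void main(String[] args) {
       List<String> expired = List.of("20230215103000000", "20230215104500000");
       System.out.println(doubleCheck(expired)); // prints [20230215103000000]
     }
   }
   ```
   
   Without this reload-and-recheck, an instant whose heartbeat file was removed because it had just completed would be flagged as expired and rolled back incorrectly.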


