danny0405 commented on code in PR #13285:
URL: https://github.com/apache/hudi/pull/13285#discussion_r2086661912


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java:
##########
@@ -501,44 +503,22 @@ private void handleWriteMetaEvent(WriteMetadataEvent 
event) {
     addEventToBuffer(event);
   }
 
-  /**
-   * The coordinator reuses the instant if there is no data for this round of 
checkpoint,
-   * sends the commit ack events to unblock the flushing.
-   */
-  private void sendCommitAckEvents(long checkpointId) {
-    CompletableFuture<?>[] futures = 
Arrays.stream(this.gateways).filter(Objects::nonNull)
-        .map(gw -> gw.sendEvent(CommitAckEvent.getInstance(checkpointId)))
-        .toArray(CompletableFuture<?>[]::new);
-    CompletableFuture.allOf(futures).whenComplete((resp, error) -> {
-      if (!sendToFinishedTasks(error)) {
-        throw new HoodieException("Error while waiting for the commit ack 
events to finish sending", error);
-      }
-    });
-  }
-
-  /**
-   * Decides whether the given exception is caused by sending events to 
FINISHED tasks.
-   *
-   * <p>Ugly impl: the exception may change in the future.
-   */
-  private static boolean sendToFinishedTasks(Throwable throwable) {
-    return throwable.getCause() instanceof TaskNotRunningException
-        || throwable.getCause().getMessage().contains("running");
-  }
-
   /**
    * Commits the instant.
    */
-  private boolean commitInstant(String instant) {
-    return commitInstant(instant, -1);
+  private boolean commitInstants(long checkpointId) {
+    // use < instead of <= because the write metadata event sends the last 
known checkpoint id which is smaller than the current one.
+    List<Boolean> result = this.eventBuffers.entrySet().stream().filter(entry 
-> entry.getKey() < checkpointId)
+        .map(entry -> commitInstant(entry.getKey(), 
entry.getValue().getLeft(), 
entry.getValue().getRight())).collect(Collectors.toList());

Review Comment:
   See the cleaing of bootstrap event and the strategy for commiting empty 
instant.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to