kezhuw commented on a change in pull request #15605:
URL: https://github.com/apache/flink/pull/15605#discussion_r613142677



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
##########
@@ -301,6 +308,43 @@ private void checkpointCoordinatorInternal(
         }
     }
 
+    private void completeCheckpointOnceEventsAreDone(
+            final long checkpointId,
+            final CompletableFuture<byte[]> checkpointFuture,
+            final byte[] checkpointResult) {
+
+        final Collection<CompletableFuture<?>> pendingEvents =
+                unconfirmedEvents.getCurrentIncompleteAndReset();
+        if (pendingEvents.isEmpty()) {
+            checkpointFuture.complete(checkpointResult);
+            return;
+        }
+
+        LOG.info(
+                "Coordinator checkpoint {} for coordinator {} is awaiting {} pending events",
+                checkpointId,
+                operatorId,
+                pendingEvents.size());
+
+        final CompletableFuture<?> conjunct = FutureUtils.waitForAll(pendingEvents);

Review comment:
       Still a note for my own backlog: for source-only exactly-once, this does more than the minimum required.
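
       For context, since the hunk above is cut off right after the `waitForAll` call: a minimal, JDK-only sketch of the wait-then-complete pattern it appears to be building toward. This is not the PR's code. `CheckpointAfterEvents` and `completeOnceEventsAreDone` are hypothetical names, `CompletableFuture.allOf` stands in for Flink's `FutureUtils.waitForAll`, and failing the checkpoint when an event fails is an assumption, since the part of the diff that handles that case is not visible here.

```java
import java.util.Collection;
import java.util.concurrent.CompletableFuture;

/** Hypothetical helper, not part of Flink: illustrates the pattern only. */
final class CheckpointAfterEvents {

    private CheckpointAfterEvents() {}

    /**
     * Completes checkpointFuture with checkpointResult once every pending
     * event future has finished.
     */
    static void completeOnceEventsAreDone(
            CompletableFuture<byte[]> checkpointFuture,
            byte[] checkpointResult,
            Collection<CompletableFuture<?>> pendingEvents) {

        // Nothing in flight: the checkpoint can complete immediately.
        if (pendingEvents.isEmpty()) {
            checkpointFuture.complete(checkpointResult);
            return;
        }

        // Stand-in for FutureUtils.waitForAll: done when all events are done.
        CompletableFuture<Void> allDone =
                CompletableFuture.allOf(pendingEvents.toArray(new CompletableFuture<?>[0]));

        allDone.whenComplete(
                (ignored, failure) -> {
                    if (failure != null) {
                        // Assumption: a failed event fails the checkpoint.
                        checkpointFuture.completeExceptionally(failure);
                    } else {
                        checkpointFuture.complete(checkpointResult);
                    }
                });
    }
}
```

       In this sketch the checkpoint future stays incomplete for as long as any event future is outstanding, which is the "more than minimum" waiting the comment above refers to.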




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

