kezhuw commented on a change in pull request #15605:
URL: https://github.com/apache/flink/pull/15605#discussion_r612996743
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java
##########
@@ -64,6 +69,17 @@
subtaskAccess.createEventSendAction(serializedEvent);
final CompletableFuture<Acknowledge> result = new CompletableFuture<>();
+ result.whenCompleteAsync(
Review comment:
What if this is a stale (outdated) gateway? Will the result be counted
toward the next checkpoint?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
##########
@@ -301,6 +308,43 @@ private void checkpointCoordinatorInternal(
}
}
+ private void completeCheckpointOnceEventsAreDone(
+ final long checkpointId,
+ final CompletableFuture<byte[]> checkpointFuture,
+ final byte[] checkpointResult) {
+
+ final Collection<CompletableFuture<?>> pendingEvents =
+ unconfirmedEvents.getCurrentIncompleteAndReset();
+ if (pendingEvents.isEmpty()) {
+ checkpointFuture.complete(checkpointResult);
+ return;
+ }
+
+ LOG.info(
+ "Coordinator checkpoint {} for coordinator {} is awaiting {} pending events",
+ checkpointId,
+ operatorId,
+ pendingEvents.size());
+
+ final CompletableFuture<?> conjunct = FutureUtils.waitForAll(pendingEvents);
Review comment:
Note to self (backlog, not part of this review): this should still work, but
only for source tasks. For source tasks it guarantees that the checkpoint
barrier arrives after every event that was sent before the barrier was
injected. For non-source tasks, however, new events could arrive before the
checkpoint barrier from upstream.
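To make the ordering argument concrete, here is a minimal, standalone sketch of the "complete the checkpoint only once pending events are acknowledged" idea. It is not the PR's code: the class and method names are hypothetical, it uses plain `CompletableFuture.allOf` in place of Flink's `FutureUtils.waitForAll`, and failing the checkpoint when an event fails is an assumption, not something confirmed by the diff above.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in; none of these names come from Flink. */
public class CheckpointAfterEventsSketch {

    /**
     * Completes checkpointFuture with checkpointResult only after every
     * pending event future has finished.
     */
    static void completeOnceEventsAreDone(
            CompletableFuture<byte[]> checkpointFuture,
            byte[] checkpointResult,
            Collection<CompletableFuture<?>> pendingEvents) {

        // Nothing in flight: the checkpoint can complete immediately.
        if (pendingEvents.isEmpty()) {
            checkpointFuture.complete(checkpointResult);
            return;
        }

        // allOf stands in for FutureUtils.waitForAll in this sketch.
        CompletableFuture<Void> allDone =
                CompletableFuture.allOf(pendingEvents.toArray(new CompletableFuture<?>[0]));

        allDone.whenComplete((ignored, failure) -> {
            if (failure != null) {
                // Assumption: a failed event fails the checkpoint.
                checkpointFuture.completeExceptionally(failure);
            } else {
                checkpointFuture.complete(checkpointResult);
            }
        });
    }

    public static void main(String[] args) {
        CompletableFuture<byte[]> checkpoint = new CompletableFuture<>();
        CompletableFuture<String> eventAck = new CompletableFuture<>();
        List<CompletableFuture<?>> pending = new ArrayList<>();
        pending.add(eventAck);

        completeOnceEventsAreDone(checkpoint, new byte[] {42}, pending);
        System.out.println(checkpoint.isDone()); // still waiting on the event

        eventAck.complete("ack");
        System.out.println(checkpoint.isDone()); // event acknowledged
    }
}
```

The sketch only orders the coordinator-side checkpoint after acknowledgements of events the coordinator itself sent. It does not address the non-source case above, where an event could still reach a task before the barrier from upstream.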