pnowojski commented on a change in pull request #13465:
URL: https://github.com/apache/flink/pull/13465#discussion_r494376371
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -114,4 +137,81 @@ protected void createInputProcessor(
operatorChain,
setupNumRecordsInCounter(mainOperator));
}
+
+ @Override
+ public Future<Boolean> triggerCheckpointAsync(
+ CheckpointMetaData metadata,
+ CheckpointOptions options,
+ boolean advanceToEndOfEventTime) {
+
+ CompletableFuture<Boolean> resultFuture = new
CompletableFuture<>();
+ mainMailboxExecutor.execute(
+ () -> {
+ try {
+ /**
+ * Contrary to {@link
SourceStreamTask}, we are not using here
+ * {@link
StreamTask#latestAsyncCheckpointStartDelayNanos} to measure the start delay
+ * metric, but we will be using {@link
CheckpointBarrierHandler#getCheckpointStartDelayNanos()}
+ * instead.
+ */
+
pendingCheckpointCompletedFutures.put(metadata.getCheckpointId(), resultFuture);
+
checkPendingCheckpointCompletedFuturesSize();
+ triggerSourcesCheckpoint(new
CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(),
options));
+ }
+ catch (Exception ex) {
+ // Report the failure both via the
Future result but also to the mailbox
+
pendingCheckpointCompletedFutures.remove(metadata.getCheckpointId());
+ resultFuture.completeExceptionally(ex);
+ throw ex;
+ }
+ },
+ "checkpoint %s with %s",
+ metadata,
+ options);
+ return resultFuture;
+ }
+
+ private void checkPendingCheckpointCompletedFuturesSize() {
+ while (pendingCheckpointCompletedFutures.size() >
MAX_TRACKED_CHECKPOINTS) {
+ Long minCheckpointID =
Collections.min(pendingCheckpointCompletedFutures.keySet());
Review comment:
Ehhhh... ok, but that's overengineering a bit :)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]