dawidwys commented on a change in pull request #19138:
URL: https://github.com/apache/flink/pull/19138#discussion_r830879183
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -159,22 +215,62 @@ protected void advanceToEndOfEventTime() {
output.emitWatermark(Watermark.MAX_WATERMARK);
}
+ @Override
+ protected void declineCheckpoint(long checkpointId) {
+ cleanupCheckpoint(checkpointId);
+ super.declineCheckpoint(checkpointId);
+ }
+
+ @Override
+ public Future<Void> notifyCheckpointAbortAsync(
+ long checkpointId, long latestCompletedCheckpointId) {
+ mainMailboxExecutor.execute(
+ () -> cleanupCheckpoint(checkpointId), "Cleanup checkpoint
%d", checkpointId);
+ return super.notifyCheckpointAbortAsync(checkpointId,
latestCompletedCheckpointId);
+ }
+
+ @Override
+ public Future<Void> notifyCheckpointSubsumedAsync(long checkpointId) {
+ mainMailboxExecutor.execute(
+ () -> cleanupCheckpoint(checkpointId), "Cleanup checkpoint
%d", checkpointId);
+ return super.notifyCheckpointSubsumedAsync(checkpointId);
+ }
+
// --------------------------
private void triggerCheckpointForExternallyInducedSource(long
checkpointId) {
- final CheckpointOptions checkpointOptions =
- CheckpointOptions.forConfig(
- CheckpointType.CHECKPOINT,
- CheckpointStorageLocationReference.getDefault(),
- configuration.isExactlyOnceCheckpointMode(),
- configuration.isUnalignedCheckpointsEnabled(),
-
configuration.getAlignedCheckpointTimeout().toMillis());
- final long timestamp = System.currentTimeMillis();
+ UntriggeredCheckpoint untriggeredCheckpoint =
untriggeredCheckpoints.remove(checkpointId);
+ if (untriggeredCheckpoint != null) {
+ cleanupOldCheckpoints(checkpointId);
+ // common case: RPC before external sources induces it
+ triggerCheckpointNowAsync(
+ untriggeredCheckpoint.getMetadata(),
+ untriggeredCheckpoint.getCheckpointOptions());
+ } else {
+ // rare case: external source induced first
+ triggeredCheckpoints.add(checkpointId);
+ if (waitForRPC.isDone()) {
+ waitForRPC = new CompletableFuture<>();
+ externallyInducedSourceInput.blockUntil(waitForRPC);
+ }
+ }
+ }
- final CheckpointMetaData checkpointMetaData =
- new CheckpointMetaData(checkpointId, timestamp, timestamp);
+ /**
+ * Cleanup any orphaned checkpoint before the given currently triggered
checkpoint. These
+ * checkpoint may occur when the checkpoint is cancelled but the RPC is
lost.
+ */
+ private void cleanupOldCheckpoints(long checkpointId) {
+ assert (mailboxProcessor.isMailboxThread());
+ triggeredCheckpoints.headSet(checkpointId).clear();
+ untriggeredCheckpoints.headMap(checkpointId).clear();
+ }
- super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
+ /** Remove temporary data about a canceled checkpoint. */
+ private void cleanupCheckpoint(long checkpointId) {
+ assert (mailboxProcessor.isMailboxThread());
Review comment:
Shouldn't we potentially unblock the input here, if the only pending
checkpoint was `aborted/declined/cancelled`?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -125,17 +130,10 @@ public void init() throws Exception {
this::getAsyncCheckpointStartDelayNanos);
}
- @Override
- protected void processInput(Controller controller) throws Exception {
- if (!isExternallyInducedSource || triggeredCheckpoints.isEmpty()) {
- super.processInput(controller);
- }
- }
-
@Override
public CompletableFuture<Boolean> triggerCheckpointAsync(
CheckpointMetaData checkpointMetaData, CheckpointOptions
checkpointOptions) {
- if (!isExternallyInducedSource) {
+ if (!isExternallInducedSource()) {
Review comment:
typo: `isExternallInducedSource()` should be `isExternallyInducedSource()`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]