dawidwys commented on a change in pull request #19138:
URL: https://github.com/apache/flink/pull/19138#discussion_r829919903



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -113,19 +127,45 @@ public void init() throws Exception {
     public CompletableFuture<Boolean> triggerCheckpointAsync(
             CheckpointMetaData checkpointMetaData, CheckpointOptions 
checkpointOptions) {
         if (!isExternallyInducedSource) {
-            if (isSynchronous(checkpointOptions.getCheckpointType())) {
-                return triggerStopWithSavepointAsync(checkpointMetaData, 
checkpointOptions);
-            } else {
-                return super.triggerCheckpointAsync(checkpointMetaData, 
checkpointOptions);
-            }
-        } else if 
(checkpointOptions.getCheckpointType().equals(CheckpointType.FULL_CHECKPOINT)) {
-            // see FLINK-25256
-            throw new IllegalStateException(
-                    "Using externally induced sources, we can not enforce 
taking a full checkpoint."
-                            + "If you are restoring from a snapshot in 
NO_CLAIM mode, please use"
-                            + " either CLAIM or LEGACY mode.");
+            return triggerCheckpointNowAsync(checkpointMetaData, 
checkpointOptions);
+        }
+        CompletableFuture<Boolean> triggerFuture = new CompletableFuture<>();
+        // immediately move RPC to mailbox so we don't need to synchronize 
fields
+        mainMailboxExecutor.execute(
+                () ->
+                        triggerCheckpointOnExternallyInducedSource(
+                                checkpointMetaData, checkpointOptions, 
triggerFuture),
+                "SourceOperatorStreamTask#triggerCheckpointAsync");
+        return triggerFuture;
+    }
+
+    private void triggerCheckpointOnExternallyInducedSource(
+            CheckpointMetaData checkpointMetaData,
+            CheckpointOptions checkpointOptions,
+            CompletableFuture<Boolean> triggerFuture) {
+        // cleanup any old checkpoint that was cancelled before trigger
+        
triggeredCheckpoints.headSet(checkpointMetaData.getCheckpointId()).clear();
+        if 
(!triggeredCheckpoints.remove(checkpointMetaData.getCheckpointId())) {
+            // common case: RPC is received before source reader triggers 
checkpoint
+            // store metadata and options for later
+            untriggeredCheckpoints.put(
+                    checkpointMetaData.getCheckpointId(),
+                    new UntriggeredCheckpoint(checkpointMetaData, 
checkpointOptions));
+            triggerFuture.complete(isRunning());
+        } else {
+            // not externally induced or trigger already received (rare case)

Review comment:
       I guess the comment is wrong now? This branch now only covers `trigger already received 
(rare case)`, right?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -109,23 +125,55 @@ public void init() throws Exception {
                         this::getAsyncCheckpointStartDelayNanos);
     }
 
+    @Override
+    protected void processInput(Controller controller) throws Exception {
+        if (!isExternallyInducedSource || triggeredCheckpoints.isEmpty()) {

Review comment:
       Doesn't this cause hot looping? It also adds an extra condition to 
the hot path, doesn't it?
   
   Would it be possible to call `suspendDefaultAction` when we add something to 
`triggeredCheckpoints` and resume the default action once `triggeredCheckpoints` is 
empty?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -159,22 +207,54 @@ protected void advanceToEndOfEventTime() {
         output.emitWatermark(Watermark.MAX_WATERMARK);
     }
 
+    @Override
+    protected void declineCheckpoint(long checkpointId) {
+        cleanupCheckpoint(checkpointId);
+        super.declineCheckpoint(checkpointId);
+    }
+
+    @Override
+    public Future<Void> notifyCheckpointAbortAsync(
+            long checkpointId, long latestCompletedCheckpointId) {
+        cleanupCheckpoint(checkpointId);
+        return super.notifyCheckpointAbortAsync(checkpointId, 
latestCompletedCheckpointId);
+    }
+
+    @Override
+    public Future<Void> notifyCheckpointSubsumedAsync(long checkpointId) {
+        cleanupCheckpoint(checkpointId);
+        return super.notifyCheckpointSubsumedAsync(checkpointId);
+    }
+
     // --------------------------
 
     private void triggerCheckpointForExternallyInducedSource(long 
checkpointId) {
-        final CheckpointOptions checkpointOptions =
-                CheckpointOptions.forConfig(
-                        CheckpointType.CHECKPOINT,
-                        CheckpointStorageLocationReference.getDefault(),
-                        configuration.isExactlyOnceCheckpointMode(),
-                        configuration.isUnalignedCheckpointsEnabled(),
-                        
configuration.getAlignedCheckpointTimeout().toMillis());
-        final long timestamp = System.currentTimeMillis();
+        UntriggeredCheckpoint untriggeredCheckpoint = 
untriggeredCheckpoints.remove(checkpointId);
+        if (untriggeredCheckpoint != null) {
+            cleanupOldCheckpoints(checkpointId);
+            // common case: RPC before external sources induces it
+            triggerCheckpointNowAsync(
+                    untriggeredCheckpoint.getMetadata(),
+                    untriggeredCheckpoint.getCheckpointOptions());
+        } else {
+            // rare case: external source induced first
+            triggeredCheckpoints.add(checkpointId);
+        }
+    }
 
-        final CheckpointMetaData checkpointMetaData =
-                new CheckpointMetaData(checkpointId, timestamp, timestamp);
+    /**
+     * Cleanup any orphaned checkpoint before the given currently triggered 
checkpoint. These
+     * checkpoint may occur when the checkpoint is cancelled but the RPC is 
lost.
+     */
+    private void cleanupOldCheckpoints(long checkpointId) {
+        triggeredCheckpoints.headSet(checkpointId).clear();
+        untriggeredCheckpoints.headMap(checkpointId).clear();
+    }
 
-        super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
+    /** Remove temporary data about a canceled checkpoint. */
+    private void cleanupCheckpoint(long checkpointId) {

Review comment:
       Shouldn't we do this cleanup in the mailbox? We are modifying fields that are 
accessed from both RPC and the mailbox.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to