dawidwys commented on a change in pull request #19138:
URL: https://github.com/apache/flink/pull/19138#discussion_r829830935



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -112,21 +126,36 @@ public void init() throws Exception {
     @Override
     public CompletableFuture<Boolean> triggerCheckpointAsync(
             CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
-        if (!isExternallyInducedSource) {
-            if (isSynchronous(checkpointOptions.getCheckpointType())) {
-                return triggerStopWithSavepointAsync(checkpointMetaData, checkpointOptions);
-            } else {
-                return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
-            }
-        } else if (checkpointOptions.getCheckpointType().equals(CheckpointType.FULL_CHECKPOINT)) {
-            // see FLINK-25256
-            throw new IllegalStateException(
-                    "Using externally induced sources, we can not enforce taking a full checkpoint."
-                            + "If you are restoring from a snapshot in NO_CLAIM mode, please use"
-                            + " either CLAIM or LEGACY mode.");
-        } else {
-            return CompletableFuture.completedFuture(isRunning());
-        }
+        CompletableFuture<Boolean> triggerFuture = new CompletableFuture<>();
+        // immediately move RPC to mailbox so we don't need to synchronize fields
+        mainMailboxExecutor.execute(

Review comment:
   We kind of already do. The only check so far outside of the mailbox was a local flag check, `isSynchronous(checkpointOptions.getCheckpointType())`. Everything else is executed in the mailbox immediately.
   
   As I see it now, with this change triggering a checkpoint will require two mailbox actions instead of one: one for checking the flags (`isSynchronous`/`externallyInduced` or similar) and a second one for executing the actual trigger.
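
   To make the cost concrete, here is a toy model (not Flink code). A single-threaded `ExecutorService` stands in for the task mailbox, and `MailboxHops`, `doTrigger` and the action counter are hypothetical names. It assumes, as described above, that the actual trigger enqueues its own mailbox action, so hopping into the mailbox first just to check the flags doubles the number of mailbox actions per trigger.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.atomic.AtomicInteger;

   /** Toy model (not Flink code): a single-threaded executor stands in for the task mailbox. */
   public class MailboxHops {
       private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
       private final AtomicInteger mailboxActions = new AtomicInteger();

       // Every action submitted to the (stand-in) mailbox is counted.
       private void enqueue(Runnable action) {
           mailboxActions.incrementAndGet();
           mailbox.execute(action);
       }

       // Hypothetical stand-in for the actual trigger, which enqueues its own mailbox action.
       private CompletableFuture<Boolean> doTrigger() {
           CompletableFuture<Boolean> result = new CompletableFuture<>();
           enqueue(() -> result.complete(true));
           return result;
       }

       // Before the change: the cheap flag check runs on the RPC thread, then one mailbox action.
       public CompletableFuture<Boolean> triggerFlagsOnRpcThread() {
           return doTrigger();
       }

       // With the change: hop into the mailbox to check the flags; from there doTrigger()
       // enqueues a second action for the actual trigger.
       public CompletableFuture<Boolean> triggerFlagsInMailbox() {
           CompletableFuture<Boolean> triggerFuture = new CompletableFuture<>();
           enqueue(
                   () ->
                           doTrigger()
                                   .whenComplete(
                                           (ok, error) -> {
                                               if (error != null) {
                                                   triggerFuture.completeExceptionally(error);
                                               } else {
                                                   triggerFuture.complete(ok);
                                               }
                                           }));
           return triggerFuture;
       }

       /** Triggers once and reports how many mailbox actions were enqueued. */
       public static int countActions(boolean flagsInMailbox) {
           MailboxHops task = new MailboxHops();
           try {
               CompletableFuture<Boolean> trigger =
                       flagsInMailbox ? task.triggerFlagsInMailbox() : task.triggerFlagsOnRpcThread();
               trigger.join();
               return task.mailboxActions.get();
           } finally {
               task.mailbox.shutdown();
           }
       }

       public static void main(String[] args) {
           System.out.println("flags on RPC thread: " + countActions(false));
           System.out.println("flags in mailbox:    " + countActions(true));
       }
   }
   ```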
   
   I am a bit hesitant about a change that affects all jobs for the sake of the really rare ones that use `externallyInducedSources`.
   
   The alternatives I see are either:
   1) rework `triggerStopWithSavepointAsync` and `triggerCheckpointAsync` so that they do not submit a second action, which IMO would not be trivial and poses some risks, or
   2) execute the logic for externally induced sources in the mailbox and act on its result. Something like:
   
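   ```java
   enum TriggerAction {
       IMMEDIATE,
       DELAYED
   }

   @Override
   public CompletableFuture<Boolean> triggerCheckpointAsync(
           CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
       if (isExternallyInducedSource) {
           // only externally induced sources pay for the extra mailbox action
           return triggerExternallyInducedSourcesCheckpointAsync(checkpointMetaData, checkpointOptions)
                   .thenCompose(
                           action -> {
                               switch (action) {
                                   case IMMEDIATE:
                                       return doTriggerCheckpointAsync(
                                               checkpointMetaData, checkpointOptions);
                                   case DELAYED:
                                       return CompletableFuture.completedFuture(isRunning());
                                   default:
                                       // needed so that every path of the lambda returns a value
                                       throw new IllegalStateException(
                                               "Unknown trigger action: " + action);
                               }
                           });
       } else {
           // all other sources keep the current single-mailbox-action path
           return doTriggerCheckpointAsync(checkpointMetaData, checkpointOptions);
       }
   }

   CompletableFuture<TriggerAction> triggerExternallyInducedSourcesCheckpointAsync(
           CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
       CompletableFuture<TriggerAction> action = new CompletableFuture<>();
       // run the externally induced source logic in the mailbox and complete `action` from there
       mainMailboxExecutor.execute(...);
       return action;
   }

   CompletableFuture<Boolean> doTriggerCheckpointAsync(
           CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
       ...
   }
   ```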
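
   A self-contained toy version of option 2, under the same assumptions as a plain-Java model (not Flink code): a single-threaded `ExecutorService` stands in for the mailbox, and `TriggerActionSketch`, `externalDecision`, `isRunning()` and the counter are hypothetical. It illustrates that only externally induced sources pay for the extra mailbox action, while every other job keeps one action per trigger.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.atomic.AtomicInteger;

   /** Toy model of option 2 (not Flink code); the executor stands in for the mailbox. */
   public class TriggerActionSketch {
       public enum TriggerAction {
           IMMEDIATE,
           DELAYED
       }

       private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
       private final AtomicInteger mailboxActions = new AtomicInteger();
       private final boolean isExternallyInducedSource;
       // Hypothetical: the decision the externally induced logic would reach in the mailbox.
       private final TriggerAction externalDecision;

       TriggerActionSketch(boolean isExternallyInducedSource, TriggerAction externalDecision) {
           this.isExternallyInducedSource = isExternallyInducedSource;
           this.externalDecision = externalDecision;
       }

       // Every action submitted to the (stand-in) mailbox is counted.
       private void enqueue(Runnable action) {
           mailboxActions.incrementAndGet();
           mailbox.execute(action);
       }

       private boolean isRunning() {
           return true; // hypothetical stand-in for the task's running state
       }

       public CompletableFuture<Boolean> triggerCheckpointAsync() {
           if (isExternallyInducedSource) {
               return triggerExternallyInducedSourcesCheckpointAsync()
                       .thenCompose(
                               action -> {
                                   switch (action) {
                                       case IMMEDIATE:
                                           return doTriggerCheckpointAsync();
                                       case DELAYED:
                                           return CompletableFuture.completedFuture(isRunning());
                                       default:
                                           throw new IllegalStateException("Unknown action: " + action);
                                   }
                               });
           }
           // Common path: unchanged, a single mailbox action.
           return doTriggerCheckpointAsync();
       }

       // First hop: decide IMMEDIATE vs DELAYED inside the mailbox thread.
       private CompletableFuture<TriggerAction> triggerExternallyInducedSourcesCheckpointAsync() {
           CompletableFuture<TriggerAction> action = new CompletableFuture<>();
           enqueue(() -> action.complete(externalDecision));
           return action;
       }

       // Stand-in for the actual trigger, which enqueues its own mailbox action.
       private CompletableFuture<Boolean> doTriggerCheckpointAsync() {
           CompletableFuture<Boolean> result = new CompletableFuture<>();
           enqueue(() -> result.complete(isRunning()));
           return result;
       }

       /** Triggers once and reports how many mailbox actions were enqueued. */
       public static int countActions(boolean externallyInduced, TriggerAction decision) {
           TriggerActionSketch task = new TriggerActionSketch(externallyInduced, decision);
           try {
               task.triggerCheckpointAsync().join();
               return task.mailboxActions.get();
           } finally {
               task.mailbox.shutdown();
           }
       }

       public static void main(String[] args) {
           System.out.println(countActions(false, TriggerAction.IMMEDIATE)); // regular source
           System.out.println(countActions(true, TriggerAction.IMMEDIATE)); // externally induced, trigger now
           System.out.println(countActions(true, TriggerAction.DELAYED)); // externally induced, delayed
       }
   }
   ```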




-- 
This is an automated message from the Apache Git Service.