AHeise commented on a change in pull request #19138:
URL: https://github.com/apache/flink/pull/19138#discussion_r829559288



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -112,21 +126,36 @@ public void init() throws Exception {
     @Override
     public CompletableFuture<Boolean> triggerCheckpointAsync(
             CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
-        if (!isExternallyInducedSource) {
-            if (isSynchronous(checkpointOptions.getCheckpointType())) {
-                return triggerStopWithSavepointAsync(checkpointMetaData, checkpointOptions);
-            } else {
-                return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
-            }
-        } else if (checkpointOptions.getCheckpointType().equals(CheckpointType.FULL_CHECKPOINT)) {
-            // see FLINK-25256
-            throw new IllegalStateException(
-                    "Using externally induced sources, we can not enforce taking a full checkpoint."
-                            + "If you are restoring from a snapshot in NO_CLAIM mode, please use"
-                            + " either CLAIM or LEGACY mode.");
-        } else {
-            return CompletableFuture.completedFuture(isRunning());
-        }
+        CompletableFuture<Boolean> triggerFuture = new CompletableFuture<>();
+        // immediately move RPC to mailbox so we don't need to synchronize fields
+        mainMailboxExecutor.execute(

Review comment:
       Not sure about the performance implications here. In general, why don't we process all RPCs directly in the mailbox?
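For context on this trade-off, here is a minimal sketch of the "move the RPC into the mailbox" pattern. A single-threaded `ExecutorService` stands in for the task mailbox. `MailboxTask`, `seenCheckpoints`, and `snapshotSeen` are hypothetical names and are not Flink's `MailboxExecutor` API.

In this pattern, the RPC thread only enqueues a mail and returns a future. All state is read and written on the mailbox thread, so it needs no locking. The price is one enqueue and one thread hand-off per RPC, which is the overhead being asked about.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a task whose state is only touched by its mailbox thread.
// A single-threaded executor plays the role of the mailbox (an assumption for
// illustration; it is not Flink's MailboxExecutor).
class MailboxTask {
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    // Not synchronized: only ever read or written from the mailbox thread.
    private final List<Long> seenCheckpoints = new ArrayList<>();

    // Called from an RPC thread; enqueues a mail and returns immediately.
    CompletableFuture<Boolean> triggerCheckpointAsync(long checkpointId) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        mailbox.execute(
                () -> {
                    try {
                        seenCheckpoints.add(checkpointId);
                        result.complete(true);
                    } catch (Throwable t) {
                        result.completeExceptionally(t);
                    }
                });
        return result;
    }

    // Reads also hop into the mailbox, so they are ordered after earlier mails.
    CompletableFuture<List<Long>> snapshotSeen() {
        return CompletableFuture.supplyAsync(() -> new ArrayList<>(seenCheckpoints), mailbox);
    }

    void shutdown() {
        mailbox.shutdown();
    }
}
```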

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -112,21 +126,36 @@ public void init() throws Exception {

Review comment:
       I quickly reworked it so that it only needs two mails in the rare case of externally induced sources. WDYT?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -162,19 +191,21 @@ protected void advanceToEndOfEventTime() {
     // --------------------------
 
     private void triggerCheckpointForExternallyInducedSource(long checkpointId) {
-        final CheckpointOptions checkpointOptions =
-                CheckpointOptions.forConfig(
-                        CheckpointType.CHECKPOINT,
-                        CheckpointStorageLocationReference.getDefault(),
-                        configuration.isExactlyOnceCheckpointMode(),
-                        configuration.isUnalignedCheckpointsEnabled(),
-                        configuration.getAlignedCheckpointTimeout().toMillis());
-        final long timestamp = System.currentTimeMillis();
-
-        final CheckpointMetaData checkpointMetaData =
-                new CheckpointMetaData(checkpointId, timestamp, timestamp);
-
-        super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
+        // cleanup any old checkpoint that was cancelled before trigger
+        untriggeredCheckpoints.headMap(checkpointId).clear();
+        UntriggeredCheckpoint untriggeredCheckpoint = untriggeredCheckpoints.remove(checkpointId);
+        if (untriggeredCheckpoint != null) {
+            // common case: RPC before external sources induces it
+            super.triggerCheckpointAsync(
+                    untriggeredCheckpoint.getMetadata(),
+                    untriggeredCheckpoint.getCheckpointOptions());
+        } else {
+            // rare case: external source induced first
+            // note that at this point, we should probably not emit more data such that data is
+            // properly aligned
+            // however, unless we receive a reliable checkpoint abort RPC, this may deadlock

Review comment:
       We should probably discuss whether this is the best choice.
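The sketch below shows one possible shape for the matching logic behind `untriggeredCheckpoints` and `triggeredCheckpoints`. It rests on several assumptions:

- Both callbacks run on the mailbox thread only.
- A `String` stands in for the checkpoint metadata and options.
- `CheckpointRendezvous`, `onTriggerRpc`, and `onExternallyInduced` are hypothetical names.

The diff above is cut off before the rare-case branch, so this is not the PR's implementation. In the rare case, the sketch records the induced id and keeps emitting data. That avoids the possible deadlock mentioned in the code comment, but gives up the alignment the comment describes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical model of matching the checkpoint-trigger RPC with the moment the
// external source induces the same checkpoint. Both methods are assumed to run
// on the mailbox thread only, so the collections need no synchronization.
class CheckpointRendezvous {
    // RPC arrived first: waiting for the source to induce the checkpoint.
    private final NavigableMap<Long, String> untriggeredCheckpoints = new TreeMap<>();
    // Source induced first: waiting for the RPC that carries the options.
    private final NavigableSet<Long> inducedCheckpoints = new TreeSet<>();
    // Records, in order, the checkpoints that were actually triggered.
    final List<String> triggered = new ArrayList<>();

    void onTriggerRpc(long checkpointId, String options) {
        // Drop induced ids older than this one (same cleanup style as in the diff).
        inducedCheckpoints.headSet(checkpointId).clear();
        if (inducedCheckpoints.remove(checkpointId)) {
            triggered.add(checkpointId + ":" + options); // rare case: source was first
        } else {
            untriggeredCheckpoints.put(checkpointId, options); // common case: wait for source
        }
    }

    void onExternallyInduced(long checkpointId) {
        untriggeredCheckpoints.headMap(checkpointId).clear();
        String options = untriggeredCheckpoints.remove(checkpointId);
        if (options != null) {
            triggered.add(checkpointId + ":" + options); // common case: RPC was first
        } else {
            // rare case: remember the id and keep emitting data until the RPC arrives
            inducedCheckpoints.add(checkpointId);
        }
    }
}
```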

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -112,21 +126,36 @@ public void init() throws Exception {
     @Override
     public CompletableFuture<Boolean> triggerCheckpointAsync(
             CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
-        if (!isExternallyInducedSource) {
-            if (isSynchronous(checkpointOptions.getCheckpointType())) {
-                return triggerStopWithSavepointAsync(checkpointMetaData, checkpointOptions);
-            } else {
-                return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
-            }
-        } else if (checkpointOptions.getCheckpointType().equals(CheckpointType.FULL_CHECKPOINT)) {
-            // see FLINK-25256
-            throw new IllegalStateException(
-                    "Using externally induced sources, we can not enforce taking a full checkpoint."
-                            + "If you are restoring from a snapshot in NO_CLAIM mode, please use"
-                            + " either CLAIM or LEGACY mode.");
-        } else {
-            return CompletableFuture.completedFuture(isRunning());
-        }
+        CompletableFuture<Boolean> triggerFuture = new CompletableFuture<>();
+        // immediately move RPC to mailbox so we don't need to synchronize fields
+        mainMailboxExecutor.execute(
+                () -> {
+                    // cleanup any old checkpoint that was cancelled before trigger
+                    triggeredCheckpoints.headSet(checkpointMetaData.getCheckpointId()).clear();

Review comment:
       The cleanup here (and the one in `#trigger`) doesn't work well with concurrent checkpoints. Do we have a way to determine the maximum number of concurrent checkpoints, or can we actually rely on `abortCheckpoint`?
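Plain `TreeMap` semantics are enough to reproduce this concern. `headMap(id)` is a live view, so clearing it drops every pending entry with a smaller id from the backing map.

The scenario below, with two checkpoints in flight at once, is hypothetical, and so is `HeadMapCleanupDemo`. The second method shows the alternative: remove only the handled id and leave stale entries to an abort notification. That alternative presumes `abortCheckpoint` is delivered reliably.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Illustrates the concern: headMap(id) is a live view, so clearing it removes
// every pending entry with a smaller checkpoint id from the backing map,
// including ones that may still be in flight when concurrent checkpoints are enabled.
class HeadMapCleanupDemo {
    static NavigableMap<Long, String> pendingAfterHeadMapCleanup() {
        NavigableMap<Long, String> untriggered = new TreeMap<>();
        // Two checkpoints are in flight at the same time (hypothetical scenario).
        untriggered.put(1L, "checkpoint-1 options");
        untriggered.put(2L, "checkpoint-2 options");

        // Checkpoint 2 is handled first; the cleanup silently drops checkpoint 1 too.
        long handledId = 2L;
        untriggered.headMap(handledId).clear();
        untriggered.remove(handledId);
        return untriggered;
    }

    // Alternative, assuming a reliable per-checkpoint abort notification exists:
    // only the handled id is removed; stale ids would be removed when aborted.
    static NavigableMap<Long, String> pendingWithExplicitRemoval() {
        NavigableMap<Long, String> untriggered = new TreeMap<>();
        untriggered.put(1L, "checkpoint-1 options");
        untriggered.put(2L, "checkpoint-2 options");
        untriggered.remove(2L);
        return untriggered; // checkpoint 1 stays pending until triggered or aborted
    }
}
```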

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -112,21 +126,36 @@ public void init() throws Exception {

Review comment:
       My bad, I didn't force-push -.-

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -112,21 +126,36 @@ public void init() throws Exception {

Review comment:
       Yes, it's heavily inspired by your idea, but I didn't understand the reasoning behind `TriggerAction`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
