dawidwys commented on a change in pull request #19138:
URL: https://github.com/apache/flink/pull/19138#discussion_r829881790
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -112,21 +126,36 @@ public void init() throws Exception {
@Override
public CompletableFuture<Boolean> triggerCheckpointAsync(
CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
- if (!isExternallyInducedSource) {
- if (isSynchronous(checkpointOptions.getCheckpointType())) {
- return triggerStopWithSavepointAsync(checkpointMetaData, checkpointOptions);
- } else {
- return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
- }
- } else if (checkpointOptions.getCheckpointType().equals(CheckpointType.FULL_CHECKPOINT)) {
- // see FLINK-25256
- throw new IllegalStateException(
- "Using externally induced sources, we can not enforce taking a full checkpoint."
- + "If you are restoring from a snapshot in NO_CLAIM mode, please use"
- + " either CLAIM or LEGACY mode.");
- } else {
- return CompletableFuture.completedFuture(isRunning());
- }
+ CompletableFuture<Boolean> triggerFuture = new CompletableFuture<>();
+ // immediately move RPC to mailbox so we don't need to synchronize fields
+ mainMailboxExecutor.execute(
Review comment:
That actually looks very similar to the code I wrote ;) so I think I am
good with it. It's actually better without the `TriggerAction`.
I will continue checking the PR, as I have not read through everything yet.
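
For anyone following the thread, here is a minimal sketch of the pattern the added lines appear to rely on: the RPC thread only creates a future and hands the work to a single-threaded executor, so fields touched by that work need no locking. This is not the PR's code. `MailboxTriggerSketch`, `lastTriggeredCheckpointId`, and the plain `ExecutorService` standing in for `mainMailboxExecutor` are all hypothetical, and the real task presumably does far more inside the mailbox action.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical stand-in for a task whose mutable state is owned by one thread. */
class MailboxTriggerSketch {
    // Assumption: a single-threaded executor plays the role of the mailbox executor.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();

    // Plain, unsynchronized fields: they are only read or written on the mailbox thread.
    private boolean running = true;
    private long lastTriggeredCheckpointId = -1L;

    /** Called from an arbitrary (RPC) thread; all state access is deferred to the mailbox. */
    CompletableFuture<Boolean> triggerCheckpointAsync(long checkpointId) {
        CompletableFuture<Boolean> triggerFuture = new CompletableFuture<>();
        mailbox.execute(
                () -> {
                    try {
                        if (running) {
                            lastTriggeredCheckpointId = checkpointId;
                        }
                        // Report whether the trigger was accepted.
                        triggerFuture.complete(running);
                    } catch (Throwable t) {
                        // Propagate failures to the caller instead of losing them.
                        triggerFuture.completeExceptionally(t);
                    }
                });
        return triggerFuture;
    }

    /** Reads the field on the mailbox thread as well, so no synchronization is needed. */
    CompletableFuture<Long> lastTriggered() {
        return CompletableFuture.supplyAsync(() -> lastTriggeredCheckpointId, mailbox);
    }

    void shutdown() {
        mailbox.shutdown();
    }
}
```

Because the executor runs actions in submission order on one thread, a read submitted after a trigger observes that trigger's write without any `synchronized` or `volatile`.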
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.