dawidwys commented on a change in pull request #19138:
URL: https://github.com/apache/flink/pull/19138#discussion_r829830935
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -112,21 +126,36 @@ public void init() throws Exception {
@Override
public CompletableFuture<Boolean> triggerCheckpointAsync(
        CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
-    if (!isExternallyInducedSource) {
-        if (isSynchronous(checkpointOptions.getCheckpointType())) {
-            return triggerStopWithSavepointAsync(checkpointMetaData, checkpointOptions);
-        } else {
-            return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
-        }
-    } else if (checkpointOptions.getCheckpointType().equals(CheckpointType.FULL_CHECKPOINT)) {
-        // see FLINK-25256
-        throw new IllegalStateException(
-                "Using externally induced sources, we can not enforce taking a full checkpoint."
-                        + "If you are restoring from a snapshot in NO_CLAIM mode, please use"
-                        + " either CLAIM or LEGACY mode.");
-    } else {
-        return CompletableFuture.completedFuture(isRunning());
-    }
+    CompletableFuture<Boolean> triggerFuture = new CompletableFuture<>();
+    // immediately move RPC to mailbox so we don't need to synchronize fields
+    mainMailboxExecutor.execute(
Review comment:
We kind of do already. So far, the only check performed outside of the mailbox
was a local flag check, `isSynchronous(checkpointOptions.getCheckpointType())`.
Everything else is executed in the mailbox right away.

As for the change, the way I see it, triggering a checkpoint will now require
two mailbox actions instead of one: one for checking the flags
(`isSynchronous`/`externallyInduced` or so) and a second one for executing the
actual trigger.

I am a bit hesitant about a change that affects all jobs for the sake of the
really rare ones that use `externallyInducedSources`.

The alternatives I see are either 1) rework `triggerStopWithSavepointAsync` and
`triggerCheckpointAsync` so that they do not submit a second action, which IMO
would not be trivial and poses some risks, or 2) execute the logic for
externally induced sources in the mailbox and act on its result. Something like:
```
enum TriggerAction {
    IMMEDIATE,
    DELAYED
}

public CompletableFuture<Boolean> triggerCheckpointAsync(
        CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
    if (isExternallyInducedSource) {
        // only externally induced sources pay for the extra mailbox action
        return triggerExternallyInducedSourcesCheckpointAsync()
                .thenCompose(action -> {
                    switch (action) {
                        case IMMEDIATE:
                            return doTriggerCheckpointAsync(checkpointMetaData, checkpointOptions);
                        case DELAYED:
                            return CompletableFuture.completedFuture(isRunning());
                        default:
                            throw new IllegalStateException("Unknown action: " + action);
                    }
                });
    } else {
        return doTriggerCheckpointAsync(checkpointMetaData, checkpointOptions);
    }
}

CompletableFuture<TriggerAction> triggerExternallyInducedSourcesCheckpointAsync() {
    CompletableFuture<TriggerAction> action = new CompletableFuture<>();
    mainMailboxExecutor.execute(...); // completes `action` from within the mailbox
    return action;
}

CompletableFuture<Boolean> doTriggerCheckpointAsync(
        CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
    ...
}
```
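
To make the cost argument concrete, here is a stand-alone sketch of option 2 that runs without Flink. It is only an illustration under assumptions: a single-threaded `ExecutorService` stands in for the mailbox, and `TriggerSketch`, `decideInMailbox`, `sourceAlreadyTriggered` and the `mailboxActions` counter are hypothetical names, not Flink APIs. It shows that a regular source submits one mailbox action per trigger, while only an externally induced source pays for the extra decision step.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** Stand-alone sketch of option 2; none of these names are Flink APIs. */
final class TriggerSketch {
    enum TriggerAction { IMMEDIATE, DELAYED }

    // A single thread stands in for the task's mailbox thread (assumption).
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    // Counts how many actions were submitted to the "mailbox".
    final AtomicInteger mailboxActions = new AtomicInteger();
    private final boolean externallyInduced;
    // Hypothetical state that may only be read from the mailbox thread.
    private final boolean sourceAlreadyTriggered;

    TriggerSketch(boolean externallyInduced, boolean sourceAlreadyTriggered) {
        this.externallyInduced = externallyInduced;
        this.sourceAlreadyTriggered = sourceAlreadyTriggered;
    }

    CompletableFuture<Boolean> triggerCheckpointAsync(long checkpointId) {
        if (!externallyInduced) {
            // Regular sources keep the single-action path.
            return doTriggerCheckpointAsync(checkpointId);
        }
        // Externally induced sources first decide in the mailbox, then act on the result.
        return decideInMailbox().thenCompose(action -> {
            switch (action) {
                case IMMEDIATE:
                    return doTriggerCheckpointAsync(checkpointId);
                case DELAYED:
                    // Nothing to trigger right now; the source is expected to do it later.
                    return CompletableFuture.completedFuture(true);
                default:
                    throw new IllegalStateException("Unknown action: " + action);
            }
        });
    }

    private CompletableFuture<TriggerAction> decideInMailbox() {
        CompletableFuture<TriggerAction> action = new CompletableFuture<>();
        submit(() -> action.complete(
                sourceAlreadyTriggered ? TriggerAction.IMMEDIATE : TriggerAction.DELAYED));
        return action;
    }

    private CompletableFuture<Boolean> doTriggerCheckpointAsync(long checkpointId) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        // Placeholder for the real trigger logic; it just reports success.
        submit(() -> result.complete(true));
        return result;
    }

    private void submit(Runnable action) {
        mailboxActions.incrementAndGet();
        mailbox.execute(action);
    }

    void close() {
        mailbox.shutdown();
    }

    public static void main(String[] args) {
        TriggerSketch regular = new TriggerSketch(false, false);
        regular.triggerCheckpointAsync(1L).join();
        System.out.println("regular source, mailbox actions: " + regular.mailboxActions.get());
        regular.close();

        TriggerSketch induced = new TriggerSketch(true, true);
        induced.triggerCheckpointAsync(1L).join();
        System.out.println("externally induced, mailbox actions: " + induced.mailboxActions.get());
        induced.close();
    }
}
```

In this sketch the second mailbox hop exists only on the `externallyInduced` branch, which is the point of option 2: the common path stays at one action per trigger.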