dawidwys commented on pull request #14820:
URL: https://github.com/apache/flink/pull/14820#issuecomment-857734904
All right, I will need to think about it a bit more, but even assuming we
can receive an RPC signal while the input channels are still
active/not finished, I feel we can make it simpler. I don't think the
difference between source and non-source tasks is big enough to justify
extracting a separate hierarchy. The way I see it, the only difference is
whether all the input gates have finished or not. (If there are no input gates,
all of them are trivially finished ;) ). In that case we could extend
`triggerCheckpointAsyncInMailbox` (it was renamed on master) to handle both
cases quite easily.
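The claim that a source is just a task whose input gates are all (vacuously) finished fits a single dispatch. The following is a minimal, self-contained model, not Flink code: `Gate`, `CheckpointDispatcher` and the returned strings are hypothetical stand-ins for `InputGate` and the two trigger paths. It relies on `Stream.allMatch` returning `true` for an empty stream, so the zero-gates case needs no special branch.

```java
import java.util.Arrays;

// Hypothetical stand-in for Flink's InputGate; only the finished flag matters here.
interface Gate {
    boolean isFinished();
}

// Hypothetical dispatcher modelling one entry point for source and non-source tasks.
final class CheckpointDispatcher {
    private CheckpointDispatcher() {}

    // allMatch on an empty stream returns true, so a task without input gates
    // (a source) is treated as having all of its inputs finished.
    static boolean allInputsFinished(Gate[] gates) {
        return Arrays.stream(gates).allMatch(Gate::isFinished);
    }

    // "root" when every input is finished, otherwise the (not yet supported)
    // non-root path.
    static String trigger(Gate[] gates) {
        return allInputsFinished(gates) ? "root" : "non-root";
    }
}
```

With no gates, or only finished ones, `trigger` picks the root path. A single unfinished gate switches it to the non-root path.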
I think we don't need to expose the `SingleCheckpointBarrierHandler` in any
way. With either my approach or yours, you would need to inject barriers into
the channels on the RPC call, or drain the channels somehow, right? Otherwise
you cannot checkpoint the data in the channels or align with it properly. Or am
I missing something? Therefore you could inject the barrier into the channels,
or drain them, from the `StreamTask` level, and the logic in
`SingleCheckpointBarrierHandler` would still be valid.
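A toy model of the injection idea: on the RPC, the task appends a barrier behind the data already buffered in every input channel, and an unchanged aligned handler simply sees one barrier per channel. Everything here (`Barrier`, `AlignmentHandler`, `InjectingTask`, channels as `ArrayDeque`s) is hypothetical and far simpler than Flink's `SingleCheckpointBarrierHandler`; it only illustrates that the handler does not need to know where a barrier came from.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical barrier marker carrying a checkpoint id.
final class Barrier {
    final long checkpointId;

    Barrier(long checkpointId) {
        this.checkpointId = checkpointId;
    }
}

// Hypothetical aligned handler: a channel is blocked after delivering its
// barrier; the checkpoint completes once every channel has delivered one.
final class AlignmentHandler {
    private final int numChannels;
    private final Set<Integer> seen = new HashSet<>();
    private final List<Long> completed = new ArrayList<>();

    AlignmentHandler(int numChannels) {
        this.numChannels = numChannels;
    }

    boolean isBlocked(int channel) {
        return seen.contains(channel);
    }

    void onBarrier(int channel, Barrier barrier) {
        seen.add(channel);
        if (seen.size() == numChannels) {
            completed.add(barrier.checkpointId);
            seen.clear(); // alignment done, unblock all channels
        }
    }

    List<Long> completedCheckpoints() {
        return completed;
    }
}

// Hypothetical task owning the channels; the RPC path only injects barriers.
final class InjectingTask {
    final List<ArrayDeque<Object>> channels = new ArrayList<>();
    final List<Object> processed = new ArrayList<>();
    final AlignmentHandler handler;

    InjectingTask(int numChannels) {
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayDeque<>());
        }
        handler = new AlignmentHandler(numChannels);
    }

    // Called on the checkpoint RPC: append a barrier behind the buffered data.
    void injectBarrier(long checkpointId) {
        for (ArrayDeque<Object> channel : channels) {
            channel.addLast(new Barrier(checkpointId));
        }
    }

    // Consume channels round-robin, skipping blocked ones, until no progress.
    void drain() {
        boolean progress = true;
        while (progress) {
            progress = false;
            for (int i = 0; i < channels.size(); i++) {
                if (handler.isBlocked(i)) {
                    continue; // aligned channel waits for the others
                }
                Object next = channels.get(i).pollFirst();
                if (next == null) {
                    continue;
                }
                progress = true;
                if (next instanceof Barrier) {
                    handler.onBarrier(i, (Barrier) next);
                } else {
                    processed.add(next);
                }
            }
        }
    }
}
```

If channel 0 holds `a, b` and channel 1 receives `e` after the injected barrier, `e` is only processed once channel 0 has delivered its barrier as well, so the processing order is `a, b, e`.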
Let me know what you think.
```java
@Override
public Future<Boolean> triggerCheckpointAsync(
        CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
    CompletableFuture<Boolean> result = new CompletableFuture<>();
    mainMailboxExecutor.execute(
            () -> {
                try {
                    result.complete(
                            triggerCheckpointAsyncInMailbox(
                                    checkpointMetaData, checkpointOptions));
                } catch (Exception ex) {
                    // Report the failure both via the Future result and to the mailbox
                    result.completeExceptionally(ex);
                    throw ex;
                }
            },
            "checkpoint %s with %s",
            checkpointMetaData,
            checkpointOptions);
    return result;
}

private boolean triggerCheckpointAsyncInMailbox(
        CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions)
        throws Exception {
    FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
    try {
        if (allInputsFinished()) {
            // Sources, and tasks whose inputs have all finished, act as root nodes
            return triggerCheckpointInRootNode(checkpointMetaData, checkpointOptions);
        } else {
            throw new UnsupportedOperationException(
                    "We do not support triggering non-root nodes yet.");
        }
    } catch (Exception e) {
        // Propagate exceptions only if the task is still in the "running" state
        if (isRunning) {
            throw new Exception(
                    "Could not perform checkpoint "
                            + checkpointMetaData.getCheckpointId()
                            + " for operator "
                            + getName()
                            + '.',
                    e);
        } else {
            LOG.debug(
                    "Could not perform checkpoint {} for operator {} while the "
                            + "invokable was not in state running.",
                    checkpointMetaData.getCheckpointId(),
                    getName(),
                    e);
            return false;
        }
    } finally {
        FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
    }
}

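private boolean allInputsFinished() {
    // A task without input gates (a source) counts as having all inputs finished.
    // allMatch on an empty stream also returns true; the length check is explicit for clarity.
    return getEnvironment().getAllInputGates().length == 0
            || Arrays.stream(getEnvironment().getAllInputGates())
                    .allMatch(InputGate::isFinished);
}
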
private boolean triggerCheckpointInRootNode(
        CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions)
        throws Exception {
    // Start delay: milliseconds since the checkpoint was triggered, clamped at 0, in nanoseconds
    latestAsyncCheckpointStartDelayNanos =
            1_000_000
                    * Math.max(0, System.currentTimeMillis() - checkpointMetaData.getTimestamp());
    // No alignment if we inject a checkpoint
    CheckpointMetricsBuilder checkpointMetrics =
            new CheckpointMetricsBuilder()
                    .setAlignmentDurationNanos(0L)
                    .setBytesProcessedDuringAlignment(0L)
                    .setCheckpointStartDelayNanos(latestAsyncCheckpointStartDelayNanos);
    subtaskCheckpointCoordinator.initInputsCheckpoint(
            checkpointMetaData.getCheckpointId(), checkpointOptions);
    boolean success = performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
    if (!success) {
        declineCheckpoint(checkpointMetaData.getCheckpointId());
    }
    return success;
}
```
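The `triggerCheckpointAsync` wrapper above relies on one pattern: run the action on the task's single mailbox thread, complete a `CompletableFuture` with its result, and on failure both complete the future exceptionally and rethrow so the mailbox sees the error too. Below is a stand-alone sketch of that pattern. It assumes a plain single-thread `ExecutorService` as a stand-in for Flink's `MailboxExecutor`, whose real semantics and failure handling differ; `ToyMailbox` and `AsyncTrigger` are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical single-threaded "mailbox" that records failures of its actions.
final class ToyMailbox {
    // Mailbox action that may throw checked exceptions.
    interface Action {
        void run() throws Exception;
    }

    private final ExecutorService thread = Executors.newSingleThreadExecutor();
    final List<Throwable> failures = new CopyOnWriteArrayList<>();

    void execute(Action action) {
        thread.execute(() -> {
            try {
                action.run();
            } catch (Exception e) {
                failures.add(e); // the mailbox observes the rethrown failure
            }
        });
    }

    void shutdownAndWait() {
        thread.shutdown();
        try {
            thread.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

final class AsyncTrigger {
    private AsyncTrigger() {}

    // Same shape as triggerCheckpointAsync: the caller gets a future, the work
    // runs on the mailbox thread, and a failure is reported to both sides.
    static CompletableFuture<Boolean> triggerAsync(ToyMailbox mailbox, Callable<Boolean> inMailbox) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        mailbox.execute(() -> {
            try {
                result.complete(inMailbox.call());
            } catch (Exception ex) {
                result.completeExceptionally(ex);
                throw ex;
            }
        });
        return result;
    }
}
```

Completing the future before rethrowing matters: the RPC caller is never left waiting on a future that the failed mailbox action would otherwise never complete.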