pnowojski commented on a change in pull request #12186:
URL: https://github.com/apache/flink/pull/12186#discussion_r426128101
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
##########
@@ -279,16 +281,18 @@ private void takeSnapshotSync(
 try {
     for (StreamOperatorWrapper<?, ?> operatorWrapper : operatorChain.getAllOperators(true)) {
-        operatorSnapshotsInProgress.put(
-                operatorWrapper.getStreamOperator().getOperatorID(),
-                buildOperatorSnapshotFutures(
-                        checkpointMetaData,
-                        checkpointOptions,
-                        operatorChain,
-                        operatorWrapper.getStreamOperator(),
-                        isCanceled,
-                        channelStateWriteResult,
-                        storage));
+        if (!operatorWrapper.isClosed()) {
Review comment:
But we have to be careful with the implementation here. I think we cannot
simply throw an exception, because after fixing
https://issues.apache.org/jira/browse/FLINK-17350 (my PR is pending review),
exceptions thrown from here would fail the task immediately.
On the other hand, if we let this task continue running (even though it's
closing?), we could repeat some of the mistakes from FLINK-17350.
But since none of the operators threw an exception, I think it might be
fine to just cancel this checkpoint, clean it up, and decline it (inform
`CheckpointCoordinator` that it has failed).
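A minimal sketch of the cancel, clean up, and decline alternative, assuming simplified stand-in types. `Op`, `SnapshotInProgress`, `Decliner`, and this `takeSnapshotSync` are hypothetical illustrations, not Flink's real `StreamOperatorWrapper` or `CheckpointCoordinator` APIs. It shows a closed operator aborting only the current checkpoint instead of an exception failing the task.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins; these are NOT Flink's actual classes.
public class DeclineOnClosedOperatorSketch {

    /** Minimal operator model: an ID and a closed flag. */
    static final class Op {
        final String id;
        final boolean closed;

        Op(String id, boolean closed) {
            this.id = id;
            this.closed = closed;
        }
    }

    /** Stand-in for an in-progress operator snapshot that can be cancelled. */
    static final class SnapshotInProgress {
        boolean cancelled;

        void cancelAndCleanUp() {
            cancelled = true;
        }
    }

    /** Stand-in for telling the coordinator that this checkpoint failed. */
    interface Decliner {
        void declineCheckpoint(long checkpointId, String reason);
    }

    /**
     * Starts a snapshot per operator. If an operator is already closed, cancels
     * every snapshot started so far, clears them, and declines the checkpoint
     * instead of throwing. Returns true if the checkpoint proceeded.
     */
    static boolean takeSnapshotSync(
            List<Op> operators,
            long checkpointId,
            Map<String, SnapshotInProgress> inProgress,
            Decliner decliner) {
        for (Op op : operators) {
            if (op.closed) {
                // No operator threw, so keep the task alive and abort only this checkpoint.
                for (SnapshotInProgress snapshot : inProgress.values()) {
                    snapshot.cancelAndCleanUp();
                }
                inProgress.clear();
                decliner.declineCheckpoint(
                        checkpointId, "operator " + op.id + " is already closed");
                return false;
            }
            inProgress.put(op.id, new SnapshotInProgress());
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> declined = new ArrayList<>();
        Map<String, SnapshotInProgress> inProgress = new LinkedHashMap<>();
        List<Op> chain =
                List.of(new Op("source", false), new Op("map", true), new Op("sink", false));

        boolean ok = takeSnapshotSync(
                chain, 42L, inProgress, (id, reason) -> declined.add(id + ": " + reason));

        System.out.println(ok);       // prints "false"
        System.out.println(declined); // prints "[42: operator map is already closed]"
    }
}
```

Declining keeps the failure scoped to a single checkpoint rather than the whole task; the already started snapshots are cancelled so nothing is left dangling.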
Another complication: how is this supposed to work with stop-with-savepoint?
I hope `isClosed()` can never be true in that scenario, since failures
while stopping with a savepoint might be irrecoverable.