pnowojski commented on a change in pull request #11814:
URL: https://github.com/apache/flink/pull/11814#discussion_r425307799



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
##########
@@ -240,7 +240,8 @@ private void finishAndReportAsync(Map<OperatorID, OperatorSnapshotFutures> snaps
                final Future<?> channelWrittenFuture;
                if (unalignedCheckpointEnabled) {
                        ChannelStateWriteResult writeResult = channelStateWriter.getWriteResult(metadata.getCheckpointId());
-                       channelWrittenFuture = writeResult.getJointFuture();
+                       channelWrittenFuture = writeResult.getJointFuture()
+                               .whenComplete((dummy, ex) -> channelStateWriter.notifyCheckpointComplete(metadata.getCheckpointId()));

Review comment:
       nit: I guess this is no longer `notifyCheckpointComplete`, but `finishedWriting(...)`?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
##########
@@ -189,6 +188,7 @@ private void cleanup(
                        CheckpointMetrics metrics, CheckpointOptions options,
                        Exception ex) throws Exception {
 
+               channelStateWriter.abort(metadata.getCheckpointId(), ex);

Review comment:
       https://github.com/apache/flink/pull/8693 will add an extra 
`notifyCheckpointAborted` that we could use here. I guess it's a race to see 
which PR gets merged first.





