Myasuka commented on a change in pull request #8693:
URL: https://github.com/apache/flink/pull/8693#discussion_r425359640
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
##########
@@ -60,17 +63,22 @@
class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
private static final Logger LOG = LoggerFactory.getLogger(SubtaskCheckpointCoordinatorImpl.class);
+ private static final int DEFAULT_MAX_RECORD_ABORTED_CHECKPOINTS = 128;
private final CachingCheckpointStorageWorkerView checkpointStorage;
private final String taskName;
- private final CloseableRegistry closeableRegistry;
+ private final AsyncCheckpointRunnableRegistry asyncCheckpointRunnableRegistry;
private final ExecutorService executorService;
private final Environment env;
private final AsyncExceptionHandler asyncExceptionHandler;
private final ChannelStateWriter channelStateWriter;
private final StreamTaskActionExecutor actionExecutor;
private final boolean unalignedCheckpointEnabled;
private final BiFunctionWithException<ChannelStateWriter, Long, CompletableFuture<Void>, IOException> prepareInputSnapshot;
+ /** The IDs of the checkpoint for which we are notified aborted. */
+ private final NavigableSet<Long> abortedCheckpointIds;
Review comment:
The abort notification for a checkpoint can arrive before the task starts
executing that checkpoint.
We store the IDs of aborted checkpoints so that the task can skip both the sync
and async phases of any checkpoint it already knows has been aborted.
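
To illustrate the idea, here is a minimal standalone sketch, not the code in this PR. The class `AbortedCheckpointTracker` and its methods are hypothetical names. It assumes that the recorded IDs are bounded (as `DEFAULT_MAX_RECORD_ABORTED_CHECKPOINTS` suggests), that the oldest ID is evicted once the bound is exceeded, and that a record is consumed once the matching checkpoint is skipped.

```java
import java.util.NavigableSet;
import java.util.TreeSet;

/** Hypothetical helper for illustration only; not part of Flink. */
final class AbortedCheckpointTracker {
    // Upper bound on remembered IDs (assumption: oldest IDs are evicted first).
    private final int maxRecorded;
    // Sorted set, so the smallest (oldest) checkpoint ID is cheap to evict.
    private final NavigableSet<Long> abortedCheckpointIds = new TreeSet<>();

    AbortedCheckpointTracker(int maxRecorded) {
        if (maxRecorded <= 0) {
            throw new IllegalArgumentException("maxRecorded must be positive");
        }
        this.maxRecorded = maxRecorded;
    }

    /** Records an abort notification, which may arrive before the checkpoint starts. */
    synchronized void notifyAborted(long checkpointId) {
        abortedCheckpointIds.add(checkpointId);
        while (abortedCheckpointIds.size() > maxRecorded) {
            abortedCheckpointIds.pollFirst();
        }
    }

    /**
     * Returns false if the checkpoint is known to be aborted, in which case the
     * caller should skip the sync and async phases. The record is removed on a hit.
     */
    synchronized boolean shouldExecute(long checkpointId) {
        return !abortedCheckpointIds.remove(checkpointId);
    }

    /** Number of abort notifications currently remembered. */
    synchronized int recordedCount() {
        return abortedCheckpointIds.size();
    }
}
```

A task would call `shouldExecute(checkpointId)` at the start of its checkpoint path and return early when it yields `false`. The bound keeps the set from growing without limit when aborted checkpoints never reach the task.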