pnowojski commented on a change in pull request #8693:
URL: https://github.com/apache/flink/pull/8693#discussion_r425288321



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
##########
@@ -311,6 +311,11 @@ public void notifyCheckpointComplete(long checkpointId) {
                //Nothing to do
        }
 
+       @Override
+       public void notifyCheckpointAborted(long checkpointId) {
+               // nothing to do
+       }
+

Review comment:
       As @rkhachatryan mentioned, can this override be removed now that the 
interface provides a default implementation?
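
A minimal sketch of the point above, assuming a simplified stand-in for the listener interface. `DefaultMethodSketch` and `SimpleBackend` are hypothetical names for illustration, not classes from this PR. It shows that an implementer which does not override the no-op default still accepts the abort notification:

```java
import java.util.ArrayList;
import java.util.List;

public class DefaultMethodSketch {

    /** Simplified stand-in for the listener interface quoted in this review. */
    public interface CheckpointListener {
        void notifyCheckpointComplete(long checkpointId) throws Exception;

        // No-op default: implementers override it only if they care about aborts.
        default void notifyCheckpointAborted(long checkpointId) throws Exception {}
    }

    /** Hypothetical backend that does NOT override the abort notification. */
    public static class SimpleBackend implements CheckpointListener {
        public final List<Long> completed = new ArrayList<>();

        @Override
        public void notifyCheckpointComplete(long checkpointId) {
            completed.add(checkpointId);
        }
    }

    public static void main(String[] args) throws Exception {
        SimpleBackend backend = new SimpleBackend();
        CheckpointListener listener = backend;
        listener.notifyCheckpointComplete(1L);
        // Resolves to the inherited no-op default, so nothing is recorded.
        listener.notifyCheckpointAborted(2L);
        System.out.println(backend.completed);
    }
}
```

With the default in place, an explicit `// nothing to do` override adds no behavior, which is why it could be dropped.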

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
##########
@@ -464,6 +464,11 @@ public void notifyCheckpointComplete(long 
completedCheckpointId) throws Exceptio
                }
        }
 
+       @Override
+       public void notifyCheckpointAborted(long checkpointId) {
+               // nothing to do
+       }
+

Review comment:
       Ditto: can this one be dropped as well?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java
##########
@@ -38,4 +38,12 @@
         * @throws Exception
         */
        void notifyCheckpointComplete(long checkpointId) throws Exception;
+
+       /**
+        * This method is called as a notification once a distributed 
checkpoint has been aborted.
+        *
+        * @param checkpointId The ID of the checkpoint that has been aborted.
+        * @throws Exception
+        */
+       default void notifyCheckpointAborted(long checkpointId) throws 
Exception {}

Review comment:
       TL;DR: I would keep it as it is.
   
   I would lean toward a single interface. When I'm implementing operators or 
functions, it is quite annoying to have to guess or search for all of the 
relevant interfaces that I need. For example, the existing 
`CheckpointedFunction` and `CheckpointListener` are very often used together, 
and both expose notifications for checkpointing, but we require users to 
discover them separately. If they were combined into a single interface, users 
would learn much more easily what Flink exposes to functions/operators.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
##########
@@ -60,17 +63,22 @@
 class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator 
{
 
        private static final Logger LOG = 
LoggerFactory.getLogger(SubtaskCheckpointCoordinatorImpl.class);
+       private static final int DEFAULT_MAX_RECORD_ABORTED_CHECKPOINTS = 128;
 
        private final CachingCheckpointStorageWorkerView checkpointStorage;
        private final String taskName;
-       private final CloseableRegistry closeableRegistry;
+       private final AsyncCheckpointRunnableRegistry 
asyncCheckpointRunnableRegistry;
        private final ExecutorService executorService;
        private final Environment env;
        private final AsyncExceptionHandler asyncExceptionHandler;
        private final ChannelStateWriter channelStateWriter;
        private final StreamTaskActionExecutor actionExecutor;
        private final boolean unalignedCheckpointEnabled;
        private final BiFunctionWithException<ChannelStateWriter, Long, 
CompletableFuture<Void>, IOException> prepareInputSnapshot;
+       /** The IDs of the checkpoint for which we are notified aborted. */
+       private final NavigableSet<Long> abortedCheckpointIds;

Review comment:
       Why do we need to track the aborted checkpoints?
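
For context, the hunk above declares a `NavigableSet<Long>` and a cap of 128 but does not show how they are used. The following is only a sketch of what a bounded record of aborted IDs might look like, assuming the smallest IDs are evicted once the cap is exceeded. `AbortedCheckpointTracker` and its methods are hypothetical and are not taken from the PR:

```java
import java.util.NavigableSet;
import java.util.TreeSet;

/** Hypothetical bounded record of aborted checkpoint IDs (not the PR's code). */
public class AbortedCheckpointTracker {

    private final int maxRecorded;
    private final NavigableSet<Long> abortedIds = new TreeSet<>();

    public AbortedCheckpointTracker(int maxRecorded) {
        if (maxRecorded <= 0) {
            throw new IllegalArgumentException("maxRecorded must be positive");
        }
        this.maxRecorded = maxRecorded;
    }

    /** Records an aborted ID, evicting the smallest IDs while over capacity. */
    public void recordAborted(long checkpointId) {
        abortedIds.add(checkpointId);
        while (abortedIds.size() > maxRecorded) {
            abortedIds.pollFirst();
        }
    }

    /** Returns true, and forgets the ID, if it was recorded as aborted. */
    public boolean checkAndClear(long checkpointId) {
        return abortedIds.remove(checkpointId);
    }

    public int size() {
        return abortedIds.size();
    }
}
```

Whether such bookkeeping is needed at all is exactly the open question in this comment; the sketch only illustrates the data structure.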



