sundargates commented on a change in pull request #13784:
URL: https://github.com/apache/flink/pull/13784#discussion_r514584463



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
##########
@@ -48,7 +49,7 @@
  */
 public abstract class AbstractKeyedStateBackend<K> implements
                CheckpointableKeyedStateBackend<K>,
-               CheckpointListener {
+                               CheckpointListener {

Review comment:
       nit: this looks like a spurious change?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
##########
@@ -20,6 +20,7 @@
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.CheckpointListener;

Review comment:
       nit: Given that you have already backported the CheckpointListener 
under the current package, do you need to migrate all the usages as part of 
this change? Would it not be easier to do the refactoring in a separate PR?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
##########
@@ -27,15 +28,33 @@
  * This context is the interface through which the {@link CheckpointCoordinator} interacts with an
  * {@link OperatorCoordinator} during checkpointing and checkpoint restoring.
  */
-public interface OperatorCoordinatorCheckpointContext extends OperatorInfo {
+public interface OperatorCoordinatorCheckpointContext extends OperatorInfo, CheckpointListener {
 
        void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception;
 
        void afterSourceBarrierInjection(long checkpointId);
 
        void abortCurrentTriggering();
 
-       void checkpointComplete(long checkpointId);
+       /**
+        * This method does not throw exception because the operator coordinator implementation
+        * is supposed to call {@link OperatorCoordinator.Context#failJob(Throwable)} in case
+        * of receiving an exception.
+        *
+        * @param checkpointId The ID of the checkpoint that has been aborted.
+        */
+       @Override
+       void notifyCheckpointComplete(long checkpointId);
+
+       /**
+        * This method does not throw exception because the operator coordinator implementation
+        * is supposed to call {@link OperatorCoordinator.Context#failJob(Throwable)} in case
+        * of receiving an exception.
+        *
+        * @param checkpointId The ID of the checkpoint that has been aborted.
+        */
+       @Override
+       default void notifyCheckpointAborted(long checkpointId) {}

Review comment:
       Why are these methods being overridden here?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
##########
@@ -122,7 +123,18 @@
         * instead. Any exception propagating from this method may be treated as a fatal error for the
         * JobManager, crashing the JobManager, and leading to an expensive "master failover" procedure.
         */
-       void checkpointComplete(long checkpointId);
+       @Override
+       void notifyCheckpointComplete(long checkpointId);
+
+       /**
+        * The OperatorCoordinator should not throw exception when notified an aborted checkpoint.
+        * Instead, the implementation should just fail the task by using
+        * {@link Context#failJob(Throwable)}.
+        *
+        * @param checkpointId The ID of the checkpoint that has been aborted.
+        */
+       @Override
+       default void notifyCheckpointAborted(long checkpointId) {}

Review comment:
       Same as above: I am not sure I understand why these methods are being 
overridden here.
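
       For context on the question, one common reason to re-declare an 
inherited interface method is to drop a checked exception from its signature, 
which would be consistent with the "does not throw exception" wording in the 
Javadoc above. The sketch below is only an illustration under that assumption: 
`Listener`, `QuietListener`, and `Recording` are hypothetical stand-ins, not 
Flink types, and it assumes the inherited methods declare `throws Exception`.

```java
import java.util.ArrayList;
import java.util.List;

final class NarrowedThrowsSketch {

    // Hypothetical stand-in for a listener whose callbacks may throw checked exceptions.
    interface Listener {
        void notifyCheckpointComplete(long checkpointId) throws Exception;

        default void notifyCheckpointAborted(long checkpointId) throws Exception {}
    }

    // Re-declaring the inherited methods without a throws clause narrows the contract:
    // callers holding this type no longer need a try/catch.
    interface QuietListener extends Listener {
        @Override
        void notifyCheckpointComplete(long checkpointId);

        @Override
        default void notifyCheckpointAborted(long checkpointId) {}
    }

    // Simple implementation that records the notifications it receives.
    static final class Recording implements QuietListener {
        final List<String> events = new ArrayList<>();

        @Override
        public void notifyCheckpointComplete(long checkpointId) {
            events.add("complete:" + checkpointId);
        }

        @Override
        public void notifyCheckpointAborted(long checkpointId) {
            events.add("aborted:" + checkpointId);
        }
    }

    static List<String> run() {
        Recording recording = new Recording();

        // Through the narrowed type, no exception handling is required.
        QuietListener quiet = recording;
        quiet.notifyCheckpointComplete(1L);
        quiet.notifyCheckpointAborted(2L);

        // Through the general type, the compiler still demands handling.
        Listener general = recording;
        try {
            general.notifyCheckpointComplete(3L);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return recording.events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

       If that is the intent, the overrides change the signature seen by 
callers rather than the behavior, which may be worth stating in the Javadoc.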





