sundargates commented on a change in pull request #13784:
URL: https://github.com/apache/flink/pull/13784#discussion_r514584463
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
##########
@@ -48,7 +49,7 @@
*/
public abstract class AbstractKeyedStateBackend<K> implements
CheckpointableKeyedStateBackend<K>,
- CheckpointListener {
+ CheckpointListener {
Review comment:
nit: this looks like a spurious change?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
##########
@@ -20,6 +20,7 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.CheckpointListener;
Review comment:
nit: Given that you have already backported the CheckpointListener under
the current package, do you need to migrate all the usages as part of this
change? Would it not be easier to do the refactoring in a separate PR?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpointContext.java
##########
@@ -27,15 +28,33 @@
* This context is the interface through which the {@link
CheckpointCoordinator} interacts with an
* {@link OperatorCoordinator} during checkpointing and checkpoint restoring.
*/
-public interface OperatorCoordinatorCheckpointContext extends OperatorInfo {
+public interface OperatorCoordinatorCheckpointContext extends OperatorInfo,
CheckpointListener {
void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]>
result) throws Exception;
void afterSourceBarrierInjection(long checkpointId);
void abortCurrentTriggering();
- void checkpointComplete(long checkpointId);
+ /**
+ * This method does not throw exception because the operator
coordinator implementation
+ * is supposed to call {@link
OperatorCoordinator.Context#failJob(Throwable)} in case
+ * of receiving an exception.
+ *
+ * @param checkpointId The ID of the checkpoint that has been aborted.
+ */
+ @Override
+ void notifyCheckpointComplete(long checkpointId);
+
+ /**
+ * This method does not throw exception because the operator
coordinator implementation
+ * is supposed to call {@link
OperatorCoordinator.Context#failJob(Throwable)} in case
+ * of receiving an exception.
+ *
+ * @param checkpointId The ID of the checkpoint that has been aborted.
+ */
+ @Override
+ default void notifyCheckpointAborted(long checkpointId) {}
Review comment:
why are these methods being overridden here?
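One possible reading, going only by the Javadoc in the hunk above ("This method does not throw exception"), is that the redeclaration exists to drop a checked exception from the inherited signature. The sketch below illustrates that Java mechanism in isolation. `Listener`, `QuietListener`, and `RecordingListener` are hypothetical stand-ins, not the Flink types, and the `throws Exception` on the parent is an assumption made for the example.

```java
// Hypothetical parent interface: callbacks are allowed to throw a checked exception.
interface Listener {
    void notifyCheckpointComplete(long checkpointId) throws Exception;

    default void notifyCheckpointAborted(long checkpointId) throws Exception {}
}

// Hypothetical child interface: redeclares both methods WITHOUT the throws clause.
// Java permits an override to declare fewer checked exceptions than its parent.
interface QuietListener extends Listener {
    @Override
    void notifyCheckpointComplete(long checkpointId);

    @Override
    default void notifyCheckpointAborted(long checkpointId) {}
}

// Minimal implementation that records the last completed checkpoint ID.
final class RecordingListener implements QuietListener {
    long lastCompleted = -1L;

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        lastCompleted = checkpointId;
    }
}

class OverrideSketch {
    public static void main(String[] args) {
        RecordingListener recording = new RecordingListener();
        QuietListener quiet = recording;

        // No try/catch needed: the static type QuietListener declares no checked exception.
        quiet.notifyCheckpointComplete(7L);
        quiet.notifyCheckpointAborted(8L);

        // Through the parent type, the same calls must handle the checked exception.
        Listener general = recording;
        try {
            general.notifyCheckpointComplete(9L);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        System.out.println(recording.lastCompleted);
    }
}
```

If that is the intent, callers holding the narrower type can invoke the callbacks without exception handling. If the parent signature does not declare a checked exception, the overrides would only carry the extra Javadoc.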
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
##########
@@ -122,7 +123,18 @@
* instead. Any exception propagating from this method may be treated
as a fatal error for the
* JobManager, crashing the JobManager, and leading to an expensive
"master failover" procedure.
*/
- void checkpointComplete(long checkpointId);
+ @Override
+ void notifyCheckpointComplete(long checkpointId);
+
+ /**
+ * The OperatorCoordinator should not throw exception when notified an
aborted checkpoint.
+ * Instead, the implementation should just fail the task by using
+ * {@link Context#failJob(Throwable)}.
+ *
+ * @param checkpointId The ID of the checkpoint that has been aborted.
+ */
+ @Override
+ default void notifyCheckpointAborted(long checkpointId) {}
Review comment:
Same as above - not sure I understand why these methods are being
overridden here.