akalash commented on a change in pull request #18482:
URL: https://github.com/apache/flink/pull/18482#discussion_r793538049



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -498,13 +501,6 @@ public boolean isShutdown() {
             @Nullable String externalSavepointLocation,
             boolean isPeriodic) {
 
-        if (props.getCheckpointType().getPostCheckpointAction() == PostCheckpointAction.TERMINATE
-                && !(props.isSynchronous() && props.isSavepoint())) {
-            return FutureUtils.completedExceptionally(
-                    new IllegalArgumentException(
-                            "Only synchronous savepoints are allowed to advance the watermark to MAX."));
-        }

Review comment:
       Why was this check removed?

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
##########
@@ -867,9 +867,9 @@ public int numKeyValueStateEntries() {
     }
 
     @Override
-    public boolean requiresLegacySynchronousTimerSnapshots(CheckpointType checkpointType) {
+    public boolean requiresLegacySynchronousTimerSnapshots(SnapshotType checkpointType) {
         return priorityQueueFactory instanceof HeapPriorityQueueSetFactory
-                && checkpointType == CheckpointType.CHECKPOINT;
+                && !checkpointType.isSavepoint();

Review comment:
       Before this change the condition held only for `CheckpointType.CHECKPOINT`; after it, the condition holds for both `CheckpointType.CHECKPOINT` and `CheckpointType.FULL_CHECKPOINT`. Is that intended?
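
       To make the question concrete, here is a minimal sketch that compares the two predicates. `SnapshotKind` is a hypothetical stand-in that models only the cases named in this comment; it is not Flink's `SnapshotType` or `CheckpointType`, and the real classes may behave differently.

```java
import java.util.ArrayList;
import java.util.List;

final class PredicateComparison {
    // Old condition from the diff: true only for CHECKPOINT.
    static boolean oldCondition(SnapshotKind kind) {
        return kind == SnapshotKind.CHECKPOINT;
    }

    // New condition from the diff: true for anything that is not a savepoint.
    static boolean newCondition(SnapshotKind kind) {
        return !kind.isSavepoint();
    }

    // Collects the kinds for which the two conditions disagree.
    static List<SnapshotKind> differences() {
        List<SnapshotKind> result = new ArrayList<>();
        for (SnapshotKind kind : SnapshotKind.values()) {
            if (oldCondition(kind) != newCondition(kind)) {
                result.add(kind);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(differences());
    }
}

// Hypothetical stand-in (assumption): only the cases mentioned in the review.
enum SnapshotKind {
    CHECKPOINT(false),
    FULL_CHECKPOINT(false),
    SAVEPOINT(true);

    private final boolean savepoint;

    SnapshotKind(boolean savepoint) {
        this.savepoint = savepoint;
    }

    boolean isSavepoint() {
        return savepoint;
    }
}
```

       Under these assumptions the only kind on which the predicates disagree is `FULL_CHECKPOINT`, which is exactly the case being asked about.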

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointOptionsTest.java
##########
@@ -55,9 +56,17 @@ public void testSavepoint() throws Exception {
         final byte[] locationBytes = new byte[rnd.nextInt(41) + 1];
         rnd.nextBytes(locationBytes);
 
+        final SnapshotType[] snapshotTypes = {
+            CHECKPOINT,
+            FULL_CHECKPOINT,
+            SavepointType.savepoint(SavepointFormatType.CANONICAL),
+            SavepointType.suspend(SavepointFormatType.CANONICAL),
+            SavepointType.terminate(SavepointFormatType.CANONICAL)
+        };

Review comment:
       Since this is no longer an enum, it is very likely that this place will be forgotten when a new snapshot type is added. Is it possible to avoid that?
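
       One possible direction, sketched under assumptions: let the type itself keep a registry of every instance it hands out, so a test can iterate over that registry instead of a hand-written array. `RegisteredSnapshotKind`, `register` and `allKnownKinds` are hypothetical names for illustration, not part of Flink.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class RegistryDemo {
    public static void main(String[] args) {
        // A test could loop over the registry instead of listing kinds by hand.
        for (RegisteredSnapshotKind kind : RegisteredSnapshotKind.allKnownKinds()) {
            System.out.println(kind.name() + " savepoint=" + kind.isSavepoint());
        }
    }
}

// Hypothetical non-enum type (assumption) that records each constant it creates.
final class RegisteredSnapshotKind {
    // Must be declared before the constants so it is initialized first.
    private static final List<RegisteredSnapshotKind> REGISTRY = new ArrayList<>();

    static final RegisteredSnapshotKind CHECKPOINT = register("CHECKPOINT", false);
    static final RegisteredSnapshotKind FULL_CHECKPOINT = register("FULL_CHECKPOINT", false);
    static final RegisteredSnapshotKind SAVEPOINT = register("SAVEPOINT", true);

    private final String name;
    private final boolean savepoint;

    private RegisteredSnapshotKind(String name, boolean savepoint) {
        this.name = name;
        this.savepoint = savepoint;
    }

    // The only way to create an instance, so every constant ends up in the registry.
    private static RegisteredSnapshotKind register(String name, boolean savepoint) {
        RegisteredSnapshotKind kind = new RegisteredSnapshotKind(name, savepoint);
        REGISTRY.add(kind);
        return kind;
    }

    static List<RegisteredSnapshotKind> allKnownKinds() {
        return Collections.unmodifiableList(REGISTRY);
    }

    String name() {
        return name;
    }

    boolean isSavepoint() {
        return savepoint;
    }
}
```

       This does not remove the need to declare new kinds, but it moves the list next to the declarations, where a new constant is registered automatically.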

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java
##########
@@ -269,9 +273,14 @@ void snapshotState(
         }
     }
 
+    private boolean isCanonicalSavepoint(SnapshotType snapshotType) {

Review comment:
       I share the concern that the explicit cast to `SavepointType` here doesn't look good. But I see that this is a problem with the current behavior in general, not with this PR.

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java
##########
@@ -33,11 +37,21 @@
     /** Optional target directory for the savepoint. Overwrites cluster default. */
     private final String targetDirectory;
 
+    private final SavepointFormatType formatType;
+
     public CancelOptions(CommandLine line) {
         super(line);
         this.args = line.getArgs();
         this.withSavepoint = line.hasOption(CANCEL_WITH_SAVEPOINT_OPTION.getOpt());
         this.targetDirectory = line.getOptionValue(CANCEL_WITH_SAVEPOINT_OPTION.getOpt());
+        if (line.hasOption(SAVEPOINT_FORMAT_OPTION)) {
+            formatType =
+                    ConfigurationUtils.convertValue(
+                            line.getOptionValue(SAVEPOINT_FORMAT_OPTION),
+                            SavepointFormatType.class);
+        } else {
+            formatType = SavepointFormatType.DEFAULT;

Review comment:
       Maybe it would be better to do this `if-else` inside `SavepointFormatType` and just call `SavepointFormatType.fromString()` here?
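
       A rough sketch of what such a helper could look like. `FormatKind` is a hypothetical stand-in for `SavepointFormatType`; its values and its `DEFAULT` are assumptions made for illustration, and the parsing uses plain `Enum.valueOf` rather than the `ConfigurationUtils.convertValue` call from the diff.

```java
import java.util.Locale;

final class FormatParsingDemo {
    public static void main(String[] args) {
        // A missing option falls back to the default; a given value is parsed.
        System.out.println(FormatKind.fromString(null));
        System.out.println(FormatKind.fromString("native"));
    }
}

// Hypothetical stand-in (assumption) for the format type discussed above.
enum FormatKind {
    CANONICAL,
    NATIVE;

    // Assumed default, chosen only for this sketch.
    static final FormatKind DEFAULT = CANONICAL;

    // Centralizes the "option absent -> default, otherwise parse" branch.
    static FormatKind fromString(String value) {
        if (value == null) {
            return DEFAULT;
        }
        // Throws IllegalArgumentException for unknown names.
        return valueOf(value.trim().toUpperCase(Locale.ROOT));
    }
}
```

       With a helper like this, both call sites could pass the result of `line.getOptionValue(...)` directly, since that method returns `null` when the option is absent.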

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java
##########
@@ -37,6 +42,14 @@ public SavepointOptions(CommandLine line) {
         dispose = line.hasOption(SAVEPOINT_DISPOSE_OPTION.getOpt());
         disposeSavepointPath = line.getOptionValue(SAVEPOINT_DISPOSE_OPTION.getOpt());
         jarFile = line.getOptionValue(JAR_OPTION.getOpt());
+        if (line.hasOption(SAVEPOINT_FORMAT_OPTION)) {
+            formatType =
+                    ConfigurationUtils.convertValue(
+                            line.getOptionValue(SAVEPOINT_FORMAT_OPTION),
+                            SavepointFormatType.class);
+        } else {
+            formatType = SavepointFormatType.DEFAULT;

Review comment:
       The same comment applies here: consider moving this inside `SavepointFormatType`.



