akalash commented on a change in pull request #18086:
URL: https://github.com/apache/flink/pull/18086#discussion_r770516209



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
##########
@@ -73,30 +73,29 @@ public static CheckpointOptions alignedNoTimeout(
                 type, location, AlignmentType.ALIGNED, NO_ALIGNED_CHECKPOINT_TIME_OUT);
     }
 
-    public static CheckpointOptions unaligned(CheckpointStorageLocationReference location) {
+    public static CheckpointOptions unaligned(
+            CheckpointType type, CheckpointStorageLocationReference location) {
+        checkArgument(!type.isSavepoint(), "Savepoints can not be unaligned");
         return new CheckpointOptions(
-                CheckpointType.CHECKPOINT,
-                location,
-                AlignmentType.UNALIGNED,
-                NO_ALIGNED_CHECKPOINT_TIME_OUT);
+                type, location, AlignmentType.UNALIGNED, NO_ALIGNED_CHECKPOINT_TIME_OUT);
     }
 
     public static CheckpointOptions alignedWithTimeout(
-            CheckpointStorageLocationReference location, long alignedCheckpointTimeout) {
+            CheckpointType type,
+            CheckpointStorageLocationReference location,
+            long alignedCheckpointTimeout) {
+        checkArgument(!type.isSavepoint(), "Savepoints can not be unaligned");

Review comment:
       I don't think it makes sense to change anything in this PR, but in 
general I believe we should think about reorganizing CheckpointType. For 
example:
   
   - Turn CheckpointType into an interface
   - Create an enum SnapshotType with CHECKPOINT and FULL_CHECKPOINT
   - Create an enum SavepointType with SAVEPOINT, SAVEPOINT_SUSPEND and 
SAVEPOINT_TERMINATE
   
   After that, `alignedWithTimeout` (and similar methods) could keep a clean 
contract by accepting only SnapshotType, with no need to check the argument 
for a savepoint.
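
   A rough sketch of what the reorganization above might look like, only to 
illustrate the idea. `Options`, `AlignmentType` and the constant value are 
simplified stand-ins assumed for this example, not the real `CheckpointOptions` 
code, and the location parameter is left out for brevity:

```java
/** Illustrative sketch only; all nested types are simplified, hypothetical stand-ins. */
final class CheckpointTypeSketch {

    /** CheckpointType becomes a common interface for both families. */
    interface CheckpointType {
        boolean isSavepoint();
    }

    /** Types that may be taken aligned-with-timeout or unaligned. */
    enum SnapshotType implements CheckpointType {
        CHECKPOINT,
        FULL_CHECKPOINT;

        @Override
        public boolean isSavepoint() {
            return false;
        }
    }

    /** Savepoint flavours; never accepted by the unaligned factories. */
    enum SavepointType implements CheckpointType {
        SAVEPOINT,
        SAVEPOINT_SUSPEND,
        SAVEPOINT_TERMINATE;

        @Override
        public boolean isSavepoint() {
            return true;
        }
    }

    enum AlignmentType {
        ALIGNED,
        UNALIGNED
    }

    // Placeholder value, assumed for this sketch.
    static final long NO_ALIGNED_CHECKPOINT_TIME_OUT = Long.MAX_VALUE;

    /** Hypothetical, reduced stand-in for CheckpointOptions. */
    static final class Options {
        final CheckpointType type;
        final AlignmentType alignment;
        final long alignedCheckpointTimeout;

        Options(CheckpointType type, AlignmentType alignment, long alignedCheckpointTimeout) {
            this.type = type;
            this.alignment = alignment;
            this.alignedCheckpointTimeout = alignedCheckpointTimeout;
        }
    }

    // Accepts any CheckpointType, savepoints included.
    static Options alignedNoTimeout(CheckpointType type) {
        return new Options(type, AlignmentType.ALIGNED, NO_ALIGNED_CHECKPOINT_TIME_OUT);
    }

    // The parameter type rules out savepoints at compile time,
    // so no checkArgument(!type.isSavepoint(), ...) is needed.
    static Options unaligned(SnapshotType type) {
        return new Options(type, AlignmentType.UNALIGNED, NO_ALIGNED_CHECKPOINT_TIME_OUT);
    }

    static Options alignedWithTimeout(SnapshotType type, long alignedCheckpointTimeout) {
        return new Options(type, AlignmentType.ALIGNED, alignedCheckpointTimeout);
    }
}
```

   With this shape, a call such as 
`alignedWithTimeout(SavepointType.SAVEPOINT, 100L)` would not compile, which 
is the contract the runtime check currently enforces.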

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -116,6 +116,11 @@ public void init() throws Exception {
             } else {
                 return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
             }
+        } else if (checkpointOptions.getCheckpointType() == CheckpointType.FULL_CHECKPOINT) {
+            // see FLINK-25256
+            throw new IllegalStateException(
+                    "NO_CLAIM mode is not supported in combination with externally induced"
+                            + " sources. Please use either CLAIM or LEGACY mode.");

Review comment:
       I just noticed that we implicitly assume here that FULL_CHECKPOINT and 
NO_CLAIM are the same thing, but that is not true, since I suppose 
FULL_CHECKPOINT could be used for other purposes in the future.
   Right now I don't have a good solution for this, since I cannot find a 
better place to check this condition. Maybe you have an idea, or we can 
leave it as is, since this message looks helpful for the user.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

