akalash commented on a change in pull request #18086:
URL: https://github.com/apache/flink/pull/18086#discussion_r770516209
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
##########
@@ -73,30 +73,29 @@ public static CheckpointOptions alignedNoTimeout(
type, location, AlignmentType.ALIGNED, NO_ALIGNED_CHECKPOINT_TIME_OUT);
}
- public static CheckpointOptions unaligned(CheckpointStorageLocationReference location) {
+ public static CheckpointOptions unaligned(
+ CheckpointType type, CheckpointStorageLocationReference location) {
+ checkArgument(!type.isSavepoint(), "Savepoints can not be unaligned");
return new CheckpointOptions(
- CheckpointType.CHECKPOINT,
- location,
- AlignmentType.UNALIGNED,
- NO_ALIGNED_CHECKPOINT_TIME_OUT);
+ type, location, AlignmentType.UNALIGNED, NO_ALIGNED_CHECKPOINT_TIME_OUT);
}
public static CheckpointOptions alignedWithTimeout(
- CheckpointStorageLocationReference location, long alignedCheckpointTimeout) {
+ CheckpointType type,
+ CheckpointStorageLocationReference location,
+ long alignedCheckpointTimeout) {
+ checkArgument(!type.isSavepoint(), "Savepoints can not be unaligned");
Review comment:
I don't think it makes sense to change anything in this PR, but in general
I believe we need to think about reorganizing CheckpointType. For example:
- Transform CheckpointType into an interface
- Create enum SnapshotType with CHECKPOINT and FULL_CHECKPOINT
- Create enum SavepointType with SAVEPOINT, SAVEPOINT_SUSPEND and
SAVEPOINT_TERMINATE
After that, it will be possible to keep a clean contract for
`alignedWithTimeout` (and similar methods) by accepting only SnapshotType,
without checking the argument for savepoint.
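To make the idea concrete, here is a rough sketch of how the split could look. It is only an illustration: `SnapshotTypeSketch` and the simplified `Options` holder are hypothetical stand-ins rather than the real `CheckpointOptions`, and the factory signature is reduced to the two parameters that matter for the point.

```java
// Hypothetical sketch of the proposed split; not the actual Flink classes.
public class SnapshotTypeSketch {

    /** Common contract: CheckpointType becomes an interface. */
    public interface CheckpointType {
        boolean isSavepoint();
    }

    /** Types that may be aligned with a timeout or unaligned. */
    public enum SnapshotType implements CheckpointType {
        CHECKPOINT,
        FULL_CHECKPOINT;

        @Override
        public boolean isSavepoint() {
            return false;
        }
    }

    /** Savepoint flavours, kept apart from regular checkpoints. */
    public enum SavepointType implements CheckpointType {
        SAVEPOINT,
        SAVEPOINT_SUSPEND,
        SAVEPOINT_TERMINATE;

        @Override
        public boolean isSavepoint() {
            return true;
        }
    }

    /** Simplified stand-in for CheckpointOptions (assumption for this sketch). */
    public static final class Options {
        public final CheckpointType type;
        public final long alignedCheckpointTimeout;

        Options(CheckpointType type, long alignedCheckpointTimeout) {
            this.type = type;
            this.alignedCheckpointTimeout = alignedCheckpointTimeout;
        }
    }

    // Accepting SnapshotType makes passing a savepoint a compile-time error,
    // so no checkArgument(!type.isSavepoint(), ...) is needed here.
    public static Options alignedWithTimeout(SnapshotType type, long alignedCheckpointTimeout) {
        return new Options(type, alignedCheckpointTimeout);
    }

    public static void main(String[] args) {
        Options options = alignedWithTimeout(SnapshotType.FULL_CHECKPOINT, 100L);
        System.out.println(options.type + " " + options.alignedCheckpointTimeout);
    }
}
```

With this shape, `alignedWithTimeout(SavepointType.SAVEPOINT, 100L)` would not compile, which is the clean contract meant above.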
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
##########
@@ -116,6 +116,11 @@ public void init() throws Exception {
} else {
return super.triggerCheckpointAsync(checkpointMetaData,
checkpointOptions);
}
+ } else if (checkpointOptions.getCheckpointType() == CheckpointType.FULL_CHECKPOINT) {
+ // see FLINK-25256
+ throw new IllegalStateException(
+ "NO_CLAIM mode is not supported in combination with externally induced"
+ + " sources. Please use either CLAIM or LEGACY mode.");
Review comment:
I just noticed that we implicitly assume here that FULL_CHECKPOINT and
NO_CLAIM are the same thing, but that is not true, since I suppose
FULL_CHECKPOINT could be used for other purposes in the future.
Right now I don't have a good solution for this, since I cannot find a
better place to check this condition, but maybe you have an idea. Otherwise
we can leave it as is, since this message looks helpful for the user.