tillrohrmann commented on a change in pull request #15484:
URL: https://github.com/apache/flink/pull/15484#discussion_r607683926
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
##########
@@ -329,7 +329,7 @@ public static DefaultExecutionGraph buildGraph(
}
public static boolean isCheckpointingEnabled(JobGraph jobGraph) {
- return jobGraph.getCheckpointingSettings() != null;
+ return jobGraph.isCheckpointingEnabled();
Review comment:
Let's revert this change and rename the method to
`shouldInstantiateCheckpointCoordinator`.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
##########
@@ -161,10 +161,10 @@ public void disableCheckpointing() {
/**
* Checks whether checkpointing is enabled.
*
- * @return True if checkpointing is enables, false otherwise.
+ * @return True if checkpointing is enabled, false otherwise.
*/
public boolean isCheckpointingEnabled() {
- return checkpointInterval > 0;
+ return checkpointInterval >= MINIMAL_CHECKPOINT_TIME && checkpointTimeout < Long.MAX_VALUE;
Review comment:
Factor this check out into a method that the `JobGraph` can also use,
so that the criterion stays consistent in both places.
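A minimal sketch of what such a shared check could look like. The class name `CheckpointingCriteria`, the method name, and the constant's value of 10 ms are assumptions made for illustration. They are not taken from this PR, and the real helper may live elsewhere.

```java
/**
 * Hypothetical shared helper (name and location are assumptions).
 * Both CheckpointConfig and JobGraph could delegate to it so that the
 * "is checkpointing enabled" criterion is defined in exactly one place.
 */
public final class CheckpointingCriteria {

    /** Assumed to mirror MINIMAL_CHECKPOINT_TIME; the value here is an assumption. */
    public static final long MINIMAL_CHECKPOINT_TIME = 10L;

    private CheckpointingCriteria() {
        // static utility, not meant to be instantiated
    }

    /**
     * Returns true if the interval is at least the minimal checkpoint time
     * and is not the Long.MAX_VALUE sentinel.
     */
    public static boolean isCheckpointingEnabled(long checkpointInterval) {
        return checkpointInterval >= MINIMAL_CHECKPOINT_TIME
                && checkpointInterval < Long.MAX_VALUE;
    }
}
```

With something like this in place, both `isCheckpointingEnabled()` implementations would reduce to a single call that passes in their respective interval.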
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
##########
@@ -207,11 +207,11 @@ public long getCheckpointInterval() {
* @param checkpointInterval The checkpoint interval, in milliseconds.
*/
public void setCheckpointInterval(long checkpointInterval) {
- if (checkpointInterval < MINIMAL_CHECKPOINT_TIME) {
+ if (checkpointInterval < MINIMAL_CHECKPOINT_TIME || checkpointInterval == Long.MAX_VALUE) {
Review comment:
With this change, users can no longer explicitly deactivate
checkpointing by calling `setCheckpointInterval(Long.MAX_VALUE)`.
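To illustrate the concern, here is a small stand-in for the setter with the proposed condition. The class `IntervalSetterSketch`, the `rejects` helper, the constant's value, and the use of `IllegalArgumentException` are assumptions for this sketch, since the diff does not show the body of the `if`.

```java
/** Stand-in for the setter under review; not the actual CheckpointConfig. */
public final class IntervalSetterSketch {

    /** Assumed value of MINIMAL_CHECKPOINT_TIME. */
    static final long MINIMAL_CHECKPOINT_TIME = 10L;

    private long checkpointInterval = -1L;

    /** Uses the condition from the '+' line above. */
    public void setCheckpointInterval(long checkpointInterval) {
        if (checkpointInterval < MINIMAL_CHECKPOINT_TIME
                || checkpointInterval == Long.MAX_VALUE) {
            // Exception type is an assumption; the diff elides this branch.
            throw new IllegalArgumentException(
                    "Invalid checkpoint interval: " + checkpointInterval);
        }
        this.checkpointInterval = checkpointInterval;
    }

    public long getCheckpointInterval() {
        return checkpointInterval;
    }

    /** Hypothetical helper: true if the setter refuses the given interval. */
    public static boolean rejects(long interval) {
        try {
            new IntervalSetterSketch().setCheckpointInterval(interval);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }
}
```

Under these assumptions, `rejects(Long.MAX_VALUE)` is true, so a caller who previously relied on that value to switch checkpointing off would now get an exception instead.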
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
##########
@@ -356,14 +357,13 @@ public JobCheckpointingSettings getCheckpointingSettings() {
* @return true if checkpointing enabled
*/
public boolean isCheckpointingEnabled() {
-
if (snapshotSettings == null) {
return false;
}
-
long checkpointInterval =
snapshotSettings.getCheckpointCoordinatorConfiguration().getCheckpointInterval();
- return checkpointInterval > 0 && checkpointInterval < Long.MAX_VALUE;
+
+ return checkpointInterval >= MINIMAL_CHECKPOINT_TIME && checkpointInterval < Long.MAX_VALUE;
Review comment:
Use the factored-out method here as well, the same one used by `CheckpointConfig`.