twalthr commented on code in PR #21245:
URL: https://github.com/apache/flink/pull/21245#discussion_r1015379814


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##########
@@ -154,30 +143,19 @@ public class CheckpointConfig implements 
java.io.Serializable {
     public CheckpointConfig(final CheckpointConfig checkpointConfig) {
         checkNotNull(checkpointConfig);
 
-        this.checkpointInterval = checkpointConfig.checkpointInterval;
-        this.checkpointingMode = checkpointConfig.checkpointingMode;
-        this.checkpointTimeout = checkpointConfig.checkpointTimeout;
-        this.maxConcurrentCheckpoints = 
checkpointConfig.maxConcurrentCheckpoints;
-        this.minPauseBetweenCheckpoints = 
checkpointConfig.minPauseBetweenCheckpoints;
-        this.tolerableCheckpointFailureNumber = 
checkpointConfig.tolerableCheckpointFailureNumber;
-        this.unalignedCheckpointsEnabled = 
checkpointConfig.isUnalignedCheckpointsEnabled();
-        this.alignedCheckpointTimeout = 
checkpointConfig.alignedCheckpointTimeout;
-        this.approximateLocalRecovery = 
checkpointConfig.isApproximateLocalRecoveryEnabled();
-        this.externalizedCheckpointCleanup = 
checkpointConfig.externalizedCheckpointCleanup;
-        this.forceCheckpointing = checkpointConfig.forceCheckpointing;
-        this.forceUnalignedCheckpoints = 
checkpointConfig.forceUnalignedCheckpoints;
+        this.configuration = new Configuration(checkpointConfig.configuration);
         this.storage = checkpointConfig.getCheckpointStorage();
-        this.checkpointIdOfIgnoredInFlightData =
-                checkpointConfig.getCheckpointIdOfIgnoredInFlightData();
     }
 
-    public CheckpointConfig() {}
+    public CheckpointConfig() {
+        configuration = new Configuration();
+    }
 
     // ------------------------------------------------------------------------
 
     /** Disables checkpointing. */
     public void disableCheckpointing() {
-        this.checkpointInterval = -1;
+        
configuration.removeConfig(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL);

Review Comment:
   It would be better to have a set operation here. Currently, disabling would 
be a "fallback to cluster config" if we merge configurations in the future.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##########
@@ -66,85 +68,72 @@ public class CheckpointConfig implements 
java.io.Serializable {
     private static final Logger LOG = 
LoggerFactory.getLogger(CheckpointConfig.class);
 
     /** The default checkpoint mode: exactly once. */
-    public static final CheckpointingMode DEFAULT_MODE = 
CheckpointingMode.EXACTLY_ONCE;
+    public static final CheckpointingMode DEFAULT_MODE =
+            ExecutionCheckpointingOptions.CHECKPOINTING_MODE.defaultValue();
 
     /** The default timeout of a checkpoint attempt: 10 minutes. */
-    public static final long DEFAULT_TIMEOUT = 10 * 60 * 1000;
+    @Deprecated
+    public static final long DEFAULT_TIMEOUT =
+            
ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT.defaultValue().toMillis();
 
     /** The default minimum pause to be made between checkpoints: none. */
-    public static final long DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS = 0;
+    @Deprecated
+    public static final long DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS =
+            
ExecutionCheckpointingOptions.MIN_PAUSE_BETWEEN_CHECKPOINTS.defaultValue().toMillis();
 
     /** The default limit of concurrently happening checkpoints: one. */
-    public static final int DEFAULT_MAX_CONCURRENT_CHECKPOINTS = 1;
+    @Deprecated
+    public static final int DEFAULT_MAX_CONCURRENT_CHECKPOINTS =
+            
ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS.defaultValue();
 
-    public static final int UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER = -1;
+    @Deprecated public static final int UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER 
= -1;
 
     /** Default id of checkpoint for which in-flight data should be ignored on 
recovery. */
-    public static final int DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA = 
-1;
-
-    // ------------------------------------------------------------------------
-
-    /** Checkpointing mode (exactly-once vs. at-least-once). */
-    private CheckpointingMode checkpointingMode = DEFAULT_MODE;
-
-    /** Periodic checkpoint triggering interval. */
-    private long checkpointInterval = -1; // disabled
-
-    /** Maximum time checkpoint may take before being discarded. */
-    private long checkpointTimeout = DEFAULT_TIMEOUT;
-
-    /** Minimal pause between checkpointing attempts. */
-    private long minPauseBetweenCheckpoints = 
DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS;
-
-    /** Maximum number of checkpoint attempts in progress at the same time. */
-    private int maxConcurrentCheckpoints = DEFAULT_MAX_CONCURRENT_CHECKPOINTS;
-
-    /** Flag to force checkpointing in iterative jobs. */
-    private boolean forceCheckpointing;
-
-    /** Flag to force checkpointing in iterative jobs. */
-    private boolean forceUnalignedCheckpoints;
-
-    /** Flag to enable unaligned checkpoints. */
-    private boolean unalignedCheckpointsEnabled;
-
-    /** Id of checkpoint for which in-flight data should be ignored on 
recovery. */
-    private long checkpointIdOfIgnoredInFlightData =
-            DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA;
-
-    /** The delay from the start of checkpoint after which AC switches to UC. 
*/
-    private Duration alignedCheckpointTimeout =
-            
ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT.defaultValue();
-
-    /** Flag to enable approximate local recovery. */
-    private boolean approximateLocalRecovery;
-
-    /** Cleanup behaviour for persistent checkpoints. */
-    private ExternalizedCheckpointCleanup externalizedCheckpointCleanup =
-            
ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT.defaultValue();
+    @Deprecated
+    public static final int DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA =
+            
ExecutionCheckpointingOptions.CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA
+                    .defaultValue()
+                    .intValue();
 
     /**
-     * Task would not fail if there is an error in their checkpointing.
-     *
-     * <p>{@link #tolerableCheckpointFailureNumber} would always overrule this 
deprecated field if
-     * they have conflicts.
+     * Internal {@link ConfigOption}s, that are not exposed and it's not 
possible to configure them
+     * via config files. We are defining them here, so that we can store them 
in the {@link
+     * #configuration}.
      *
-     * @deprecated Use {@link #tolerableCheckpointFailureNumber}.
+     * <p>If you decide to expose any of those {@link ConfigOption}s, please 
double-check if the
+     * key, type and descriptions are sensible, as the initial values are 
arbitrary.
      */
-    @Deprecated private boolean failOnCheckpointingErrors = true;
+    // 
--------------------------------------------------------------------------------------------
+
+    /** @deprecated This will be removed once iterations properly participate 
in checkpointing. */
+    @Deprecated
+    private static final ConfigOption<Boolean> FORCE_CHECKPOINTING =
+            key("hidden.force.checkpointing")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Flag to force checkpointing in iterative 
jobs");
+
+    private static final ConfigOption<Boolean> APPROXIMATE_LOCAL_RECOVERY =

Review Comment:
   We should avoid hidden config options. Give them a proper key, move them 
into the `...Options` class they belong to, and use annotations to hide them 
from the docs generation. Otherwise it is very difficult to find them across 
the code base. We recently did the same in the planner, which also had 
internal options.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##########
@@ -186,7 +164,7 @@ public void disableCheckpointing() {
      * @return True if checkpointing is enables, false otherwise.
      */
     public boolean isCheckpointingEnabled() {
-        return checkpointInterval > 0;
+        return 
configuration.contains(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL);

Review Comment:
   How about we introduce a new config option `checkpoint.enabled`? This would 
fix this issue properly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to