twalthr commented on code in PR #21245:
URL: https://github.com/apache/flink/pull/21245#discussion_r1015379814
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##########
@@ -154,30 +143,19 @@ public class CheckpointConfig implements
java.io.Serializable {
public CheckpointConfig(final CheckpointConfig checkpointConfig) {
checkNotNull(checkpointConfig);
- this.checkpointInterval = checkpointConfig.checkpointInterval;
- this.checkpointingMode = checkpointConfig.checkpointingMode;
- this.checkpointTimeout = checkpointConfig.checkpointTimeout;
- this.maxConcurrentCheckpoints =
checkpointConfig.maxConcurrentCheckpoints;
- this.minPauseBetweenCheckpoints =
checkpointConfig.minPauseBetweenCheckpoints;
- this.tolerableCheckpointFailureNumber =
checkpointConfig.tolerableCheckpointFailureNumber;
- this.unalignedCheckpointsEnabled =
checkpointConfig.isUnalignedCheckpointsEnabled();
- this.alignedCheckpointTimeout =
checkpointConfig.alignedCheckpointTimeout;
- this.approximateLocalRecovery =
checkpointConfig.isApproximateLocalRecoveryEnabled();
- this.externalizedCheckpointCleanup =
checkpointConfig.externalizedCheckpointCleanup;
- this.forceCheckpointing = checkpointConfig.forceCheckpointing;
- this.forceUnalignedCheckpoints =
checkpointConfig.forceUnalignedCheckpoints;
+ this.configuration = new Configuration(checkpointConfig.configuration);
this.storage = checkpointConfig.getCheckpointStorage();
- this.checkpointIdOfIgnoredInFlightData =
- checkpointConfig.getCheckpointIdOfIgnoredInFlightData();
}
- public CheckpointConfig() {}
+ public CheckpointConfig() {
+ configuration = new Configuration();
+ }
// ------------------------------------------------------------------------
/** Disables checkpointing. */
public void disableCheckpointing() {
- this.checkpointInterval = -1;
+
configuration.removeConfig(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL);
Review Comment:
It would be better to have a set operation here. Currently, disabling would
act as a "fallback to cluster config" if we merge configurations in the future.
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##########
@@ -66,85 +68,72 @@ public class CheckpointConfig implements
java.io.Serializable {
private static final Logger LOG =
LoggerFactory.getLogger(CheckpointConfig.class);
/** The default checkpoint mode: exactly once. */
- public static final CheckpointingMode DEFAULT_MODE =
CheckpointingMode.EXACTLY_ONCE;
+ public static final CheckpointingMode DEFAULT_MODE =
+ ExecutionCheckpointingOptions.CHECKPOINTING_MODE.defaultValue();
/** The default timeout of a checkpoint attempt: 10 minutes. */
- public static final long DEFAULT_TIMEOUT = 10 * 60 * 1000;
+ @Deprecated
+ public static final long DEFAULT_TIMEOUT =
+
ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT.defaultValue().toMillis();
/** The default minimum pause to be made between checkpoints: none. */
- public static final long DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS = 0;
+ @Deprecated
+ public static final long DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS =
+
ExecutionCheckpointingOptions.MIN_PAUSE_BETWEEN_CHECKPOINTS.defaultValue().toMillis();
/** The default limit of concurrently happening checkpoints: one. */
- public static final int DEFAULT_MAX_CONCURRENT_CHECKPOINTS = 1;
+ @Deprecated
+ public static final int DEFAULT_MAX_CONCURRENT_CHECKPOINTS =
+
ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS.defaultValue();
- public static final int UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER = -1;
+ @Deprecated public static final int UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER
= -1;
/** Default id of checkpoint for which in-flight data should be ignored on
recovery. */
- public static final int DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA =
-1;
-
- // ------------------------------------------------------------------------
-
- /** Checkpointing mode (exactly-once vs. at-least-once). */
- private CheckpointingMode checkpointingMode = DEFAULT_MODE;
-
- /** Periodic checkpoint triggering interval. */
- private long checkpointInterval = -1; // disabled
-
- /** Maximum time checkpoint may take before being discarded. */
- private long checkpointTimeout = DEFAULT_TIMEOUT;
-
- /** Minimal pause between checkpointing attempts. */
- private long minPauseBetweenCheckpoints =
DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS;
-
- /** Maximum number of checkpoint attempts in progress at the same time. */
- private int maxConcurrentCheckpoints = DEFAULT_MAX_CONCURRENT_CHECKPOINTS;
-
- /** Flag to force checkpointing in iterative jobs. */
- private boolean forceCheckpointing;
-
- /** Flag to force checkpointing in iterative jobs. */
- private boolean forceUnalignedCheckpoints;
-
- /** Flag to enable unaligned checkpoints. */
- private boolean unalignedCheckpointsEnabled;
-
- /** Id of checkpoint for which in-flight data should be ignored on
recovery. */
- private long checkpointIdOfIgnoredInFlightData =
- DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA;
-
- /** The delay from the start of checkpoint after which AC switches to UC.
*/
- private Duration alignedCheckpointTimeout =
-
ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT.defaultValue();
-
- /** Flag to enable approximate local recovery. */
- private boolean approximateLocalRecovery;
-
- /** Cleanup behaviour for persistent checkpoints. */
- private ExternalizedCheckpointCleanup externalizedCheckpointCleanup =
-
ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT.defaultValue();
+ @Deprecated
+ public static final int DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA =
+
ExecutionCheckpointingOptions.CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA
+ .defaultValue()
+ .intValue();
/**
- * Task would not fail if there is an error in their checkpointing.
- *
- * <p>{@link #tolerableCheckpointFailureNumber} would always overrule this
deprecated field if
- * they have conflicts.
+ * Internal {@link ConfigOption}s, that are not exposed and it's not
possible to configure them
+ * via config files. We are defining them here, so that we can store them
in the {@link
+ * #configuration}.
*
- * @deprecated Use {@link #tolerableCheckpointFailureNumber}.
+ * <p>If you decide to expose any of those {@link ConfigOption}s, please
double-check if the
+ * key, type and descriptions are sensible, as the initial values are
arbitrary.
*/
- @Deprecated private boolean failOnCheckpointingErrors = true;
+ //
--------------------------------------------------------------------------------------------
+
+ /** @deprecated This will be removed once iterations properly participate
in checkpointing. */
+ @Deprecated
+ private static final ConfigOption<Boolean> FORCE_CHECKPOINTING =
+ key("hidden.force.checkpointing")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Flag to force checkpointing in iterative
jobs");
+
+ private static final ConfigOption<Boolean> APPROXIMATE_LOCAL_RECOVERY =
Review Comment:
We should avoid hidden config options. Give them a proper key, move them
into the `...Options` class they belong to, and use annotations to hide them
from the docs generation. Otherwise, it is very difficult to find them across
the code base. We recently did the same in the planner, which also had
internal options.
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java:
##########
@@ -186,7 +164,7 @@ public void disableCheckpointing() {
* @return True if checkpointing is enables, false otherwise.
*/
public boolean isCheckpointingEnabled() {
- return checkpointInterval > 0;
+ return
configuration.contains(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL);
Review Comment:
How about we introduce a new config option `checkpoint.enabled`? This would
fix this issue properly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]