Repository: flink
Updated Branches:
  refs/heads/master 4de72bbee -> 33d3b767e


[FLINK-9074] [e2e] Allow configuring externalized checkpoints for the general purpose DataStream job


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0a65f1ec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0a65f1ec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0a65f1ec

Branch: refs/heads/master
Commit: 0a65f1ec1f5893b11acd9f982265aa5ce7d5968b
Parents: 4de72bb
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Wed May 9 11:40:24 2018 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Tue May 15 16:18:51 2018 +0800

----------------------------------------------------------------------
 .../tests/DataStreamAllroundTestJobFactory.java | 35 ++++++++++++++++++++
 1 file changed, 35 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0a65f1ec/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
index e7b0bdc..2577460 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
@@ -30,6 +30,7 @@ import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
@@ -53,6 +54,8 @@ import java.util.List;
  * <ul>
  *     <li>test.semantics (String, default - 'exactly-once'): This configures the semantics to test. Can be 'exactly-once' or 'at-least-once'.</li>
  *     <li>environment.checkpoint_interval (long, default - 1000): the checkpoint interval.</li>
+ *     <li>environment.externalize_checkpoint (boolean, default - false): whether or not checkpoints should be externalized.</li>
+ *     <li>environment.externalize_checkpoint.cleanup (String, default - 'retain'): Configures the cleanup mode for externalized checkpoints. Can be 'retain' or 'delete'.</li>
  *     <li>environment.parallelism (int, default - 1): parallelism to use for the job.</li>
  *     <li>environment.max_parallelism (int, default - 128): max parallelism to use for the job</li>
  *     <li>environment.restart_strategy.delay (long, default - 0): delay between restart attempts, in milliseconds.</li>
@@ -90,6 +93,14 @@ class DataStreamAllroundTestJobFactory {
 		.key("environment.restart_strategy.delay")
 		.defaultValue(0);
 
+	private static final ConfigOption<Boolean> ENVIRONMENT_EXTERNALIZE_CHECKPOINT = ConfigOptions
+		.key("environment.externalize_checkpoint")
+		.defaultValue(false);
+
+	private static final ConfigOption<String> ENVIRONMENT_EXTERNALIZE_CHECKPOINT_CLEANUP = ConfigOptions
+		.key("environment.externalize_checkpoint.cleanup")
+		.defaultValue("retain");
+
 	private static final ConfigOption<String> STATE_BACKEND = ConfigOptions
 		.key("state_backend")
 		.defaultValue("file")
@@ -180,6 +191,30 @@ class DataStreamAllroundTestJobFactory {
 			throw new IllegalArgumentException("Unknown backend requested: " + stateBackend);
 		}
 
+		boolean enableExternalizedCheckpoints = pt.getBoolean(
+			ENVIRONMENT_EXTERNALIZE_CHECKPOINT.key(),
+			ENVIRONMENT_EXTERNALIZE_CHECKPOINT.defaultValue());
+
+		if (enableExternalizedCheckpoints) {
+			String cleanupModeConfig = pt.get(
+				ENVIRONMENT_EXTERNALIZE_CHECKPOINT_CLEANUP.key(),
+				ENVIRONMENT_EXTERNALIZE_CHECKPOINT_CLEANUP.defaultValue());
+
+			CheckpointConfig.ExternalizedCheckpointCleanup cleanupMode;
+			switch (cleanupModeConfig) {
+				case "retain":
+					cleanupMode = CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION;
+					break;
+				case "delete":
+					cleanupMode = CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION;
+					break;
+				default:
+					throw new IllegalArgumentException("Unknown clean up mode for externalized checkpoints: " + cleanupModeConfig);
+			}
+
+			env.getCheckpointConfig().enableExternalizedCheckpoints(cleanupMode);
+		}
+
 		// make parameters available in the web interface
 		env.getConfig().setGlobalJobParameters(pt);
 	}
