[FLINK-8977] [e2e] Allow configuring restart strategy for general purpose DataStream job
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/22e400dd Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/22e400dd Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/22e400dd Branch: refs/heads/release-1.5 Commit: 22e400dded26ea4b0cfebf2245572f3ca7b480a9 Parents: d7ec5a9 Author: Tzu-Li (Gordon) Tai <[email protected]> Authored: Mon May 14 11:56:07 2018 +0800 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Tue May 22 16:49:26 2018 +0800 ---------------------------------------------------------------------- .../tests/DataStreamAllroundTestJobFactory.java | 42 ++++++++++++++++---- 1 file changed, 35 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/22e400dd/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java ---------------------------------------------------------------------- diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java index 05bbc77..4710100 100644 --- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java +++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java @@ -66,7 +66,9 @@ import java.util.List; * <li>environment.externalize_checkpoint.cleanup (String, default - 'retain'): Configures the cleanup mode for externalized checkpoints. Can be 'retain' or 'delete'.</li> * <li>environment.parallelism (int, default - 1): parallelism to use for the job.</li> * <li>environment.max_parallelism (int, default - 128): max parallelism to use for the job</li> - * <li>environment.restart_strategy.delay (long, default - 0): delay between restart attempts, in milliseconds.</li> + * <li>environment.restart_strategy (String, default - 'fixed_delay'): The failure restart strategy to use. Can be 'fixed_delay' or 'no_restart'.</li> + * <li>environment.restart_strategy.fixed_delay.attempts (Integer, default - Integer.MAX_VALUE): The number of allowed attempts to restart the job, when using 'fixed_delay' restart.</li> + * <li>environment.restart_strategy.fixed_delay.delay (long, default - 0): delay between restart attempts, in milliseconds, when using 'fixed_delay' restart.</li> * <li>state_backend (String, default - 'file'): Supported values are 'file' for FsStateBackend and 'rocks' for RocksDBStateBackend.</li> * <li>state_backend.checkpoint_directory (String): The checkpoint directory.</li> * <li>state_backend.rocks.incremental (boolean, default - false): Activate or deactivate incremental snapshots if RocksDBStateBackend is selected.</li> @@ -124,9 +126,17 @@ class DataStreamAllroundTestJobFactory { .key("environment.max_parallelism") .defaultValue(128); - private static final ConfigOption<Integer> ENVIRONMENT_RESTART_DELAY = ConfigOptions - .key("environment.restart_strategy.delay") - .defaultValue(0); + private static final ConfigOption<String> ENVIRONMENT_RESTART_STRATEGY = ConfigOptions + .key("environment.restart_strategy") + .defaultValue("fixed_delay"); + + private static final ConfigOption<Integer> ENVIRONMENT_RESTART_STRATEGY_FIXED_ATTEMPTS = ConfigOptions + .key("environment.restart_strategy.fixed_delay.attempts") + .defaultValue(Integer.MAX_VALUE); + + private static final ConfigOption<Long> ENVIRONMENT_RESTART_STRATEGY_FIXED_DELAY = ConfigOptions + .key("environment.restart_strategy.fixed.delay") + .defaultValue(0L); private static final ConfigOption<Boolean> ENVIRONMENT_EXTERNALIZE_CHECKPOINT = ConfigOptions .key("environment.externalize_checkpoint") @@ -199,9 +209,27 @@ class DataStreamAllroundTestJobFactory { env.setMaxParallelism(pt.getInt(ENVIRONMENT_MAX_PARALLELISM.key(), ENVIRONMENT_MAX_PARALLELISM.defaultValue())); // restart strategy - env.setRestartStrategy(RestartStrategies.fixedDelayRestart( - Integer.MAX_VALUE, - pt.getInt(ENVIRONMENT_RESTART_DELAY.key(), ENVIRONMENT_RESTART_DELAY.defaultValue()))); + String restartStrategyConfig = pt.get(ENVIRONMENT_RESTART_STRATEGY.key()); + if (restartStrategyConfig != null) { + RestartStrategies.RestartStrategyConfiguration restartStrategy; + switch (restartStrategyConfig) { + case "fixed_delay": + restartStrategy = RestartStrategies.fixedDelayRestart( + pt.getInt( + ENVIRONMENT_RESTART_STRATEGY_FIXED_ATTEMPTS.key(), + ENVIRONMENT_RESTART_STRATEGY_FIXED_ATTEMPTS.defaultValue()), + pt.getLong( + ENVIRONMENT_RESTART_STRATEGY_FIXED_DELAY.key(), + ENVIRONMENT_RESTART_STRATEGY_FIXED_DELAY.defaultValue())); + break; + case "no_restart": + restartStrategy = RestartStrategies.noRestart(); + break; + default: + throw new IllegalArgumentException("Unkown restart strategy: " + restartStrategyConfig); + } + env.setRestartStrategy(restartStrategy); + } // state backend final String stateBackend = pt.get(
