[FLINK-8977] [e2e] Allow configuring restart strategy for general purpose 
DataStream job


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c8187e87
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c8187e87
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c8187e87

Branch: refs/heads/master
Commit: c8187e8752bd9b17ad76f3076087c4e562a9c5e4
Parents: 5ee9100
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Mon May 14 11:56:07 2018 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Tue May 22 15:50:54 2018 +0800

----------------------------------------------------------------------
 .../tests/DataStreamAllroundTestJobFactory.java | 42 ++++++++++++++++----
 1 file changed, 35 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c8187e87/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
index 05bbc77..4710100 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
@@ -66,7 +66,9 @@ import java.util.List;
  *     <li>environment.externalize_checkpoint.cleanup (String, default - 
'retain'): Configures the cleanup mode for externalized checkpoints. Can be 
'retain' or 'delete'.</li>
  *     <li>environment.parallelism (int, default - 1): parallelism to use for 
the job.</li>
  *     <li>environment.max_parallelism (int, default - 128): max parallelism 
to use for the job</li>
- *     <li>environment.restart_strategy.delay (long, default - 0): delay 
between restart attempts, in milliseconds.</li>
+ *     <li>environment.restart_strategy (String, default - 'fixed_delay'): The 
failure restart strategy to use. Can be 'fixed_delay' or 'no_restart'.</li>
+ *     <li>environment.restart_strategy.fixed_delay.attempts (Integer, default 
- Integer.MAX_VALUE): The number of allowed attempts to restart the job, when 
using 'fixed_delay' restart.</li>
+ *     <li>environment.restart_strategy.fixed_delay.delay (long, default - 0): 
delay between restart attempts, in milliseconds, when using 'fixed_delay' 
restart.</li>
  *     <li>state_backend (String, default - 'file'): Supported values are 
'file' for FsStateBackend and 'rocks' for RocksDBStateBackend.</li>
  *     <li>state_backend.checkpoint_directory (String): The checkpoint 
directory.</li>
  *     <li>state_backend.rocks.incremental (boolean, default - false): 
Activate or deactivate incremental snapshots if RocksDBStateBackend is 
selected.</li>
@@ -124,9 +126,17 @@ class DataStreamAllroundTestJobFactory {
                .key("environment.max_parallelism")
                .defaultValue(128);
 
-       private static final ConfigOption<Integer> ENVIRONMENT_RESTART_DELAY = 
ConfigOptions
-               .key("environment.restart_strategy.delay")
-               .defaultValue(0);
+       private static final ConfigOption<String> ENVIRONMENT_RESTART_STRATEGY 
= ConfigOptions
+               .key("environment.restart_strategy")
+               .defaultValue("fixed_delay");
+
+       private static final ConfigOption<Integer> 
ENVIRONMENT_RESTART_STRATEGY_FIXED_ATTEMPTS = ConfigOptions
+               .key("environment.restart_strategy.fixed_delay.attempts")
+               .defaultValue(Integer.MAX_VALUE);
+
+       private static final ConfigOption<Long> 
ENVIRONMENT_RESTART_STRATEGY_FIXED_DELAY = ConfigOptions
+               .key("environment.restart_strategy.fixed_delay.delay")
+               .defaultValue(0L);
 
        private static final ConfigOption<Boolean> 
ENVIRONMENT_EXTERNALIZE_CHECKPOINT = ConfigOptions
                .key("environment.externalize_checkpoint")
@@ -199,9 +209,27 @@ class DataStreamAllroundTestJobFactory {
                
env.setMaxParallelism(pt.getInt(ENVIRONMENT_MAX_PARALLELISM.key(), 
ENVIRONMENT_MAX_PARALLELISM.defaultValue()));
 
                // restart strategy
-               env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
-                       Integer.MAX_VALUE,
-                       pt.getInt(ENVIRONMENT_RESTART_DELAY.key(), 
ENVIRONMENT_RESTART_DELAY.defaultValue())));
+               String restartStrategyConfig = 
pt.get(ENVIRONMENT_RESTART_STRATEGY.key());
+               if (restartStrategyConfig != null) {
+                       RestartStrategies.RestartStrategyConfiguration 
restartStrategy;
+                       switch (restartStrategyConfig) {
+                               case "fixed_delay":
+                                       restartStrategy = 
RestartStrategies.fixedDelayRestart(
+                                               pt.getInt(
+                                                       
ENVIRONMENT_RESTART_STRATEGY_FIXED_ATTEMPTS.key(),
+                                                       
ENVIRONMENT_RESTART_STRATEGY_FIXED_ATTEMPTS.defaultValue()),
+                                               pt.getLong(
+                                                       
ENVIRONMENT_RESTART_STRATEGY_FIXED_DELAY.key(),
+                                                       
ENVIRONMENT_RESTART_STRATEGY_FIXED_DELAY.defaultValue()));
+                                       break;
+                               case "no_restart":
+                                       restartStrategy = 
RestartStrategies.noRestart();
+                                       break;
+                               default:
+                                       throw new 
IllegalArgumentException("Unknown restart strategy: " + restartStrategyConfig);
+                       }
+                       env.setRestartStrategy(restartStrategy);
+               }
 
                // state backend
                final String stateBackend = pt.get(

Reply via email to