Repository: flink Updated Branches: refs/heads/master 852c5298e -> ac3997927
[FLINK-4596] Fallback restart strategy config to let jobs choose restart configuration set at cluster level Added java doc for fallback restart strategy This closes #2592. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ac399792 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ac399792 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ac399792 Branch: refs/heads/master Commit: ac3997927e38f9da45de0b001d308430c323c842 Parents: 852c529 Author: Nagarjun <[email protected]> Authored: Wed Oct 5 00:26:05 2016 -0700 Committer: Till Rohrmann <[email protected]> Committed: Fri Nov 4 14:56:56 2016 +0100 ---------------------------------------------------------------------- docs/setup/fault_tolerance.md | 6 ++++++ .../common/restartstrategy/RestartStrategies.java | 18 ++++++++++++++++++ .../restart/RestartStrategyFactory.java | 2 ++ .../flink/runtime/jobmanager/JobManager.scala | 7 ++++--- 4 files changed, 30 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ac399792/docs/setup/fault_tolerance.md ---------------------------------------------------------------------- diff --git a/docs/setup/fault_tolerance.md b/docs/setup/fault_tolerance.md index 20f68ee..fa1c821 100644 --- a/docs/setup/fault_tolerance.md +++ b/docs/setup/fault_tolerance.md @@ -457,4 +457,10 @@ env.setRestartStrategy(RestartStrategies.noRestart()) </div> </div> +### Fallback Restart Strategy + +The restart strategy defined at the cluster level is used. +This is helpful for streaming programs that enable checkpointing. +By default, a fixed delay restart strategy is chosen if no other restart strategy is defined. 
+ {% top %} http://git-wip-us.apache.org/repos/asf/flink/blob/ac399792/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java index d5db466..7073c2c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java @@ -42,6 +42,10 @@ public class RestartStrategies { return new NoRestartStrategyConfiguration(); } + public static RestartStrategyConfiguration fallBackRestart() { + return new FallbackRestartStrategyConfiguration(); + } + /** * Generates a FixedDelayRestartStrategyConfiguration. * @@ -173,4 +177,18 @@ public class RestartStrategies { + " and fixed delay " + delayBetweenAttemptsInterval.toString(); } } + + /** + * Restart strategy configuration that jobs can use to fall back to the cluster-level restart + * strategy. This is especially useful when a custom restart strategy implementation is set via + * flink-conf.yaml. 
+ */ + final public static class FallbackRestartStrategyConfiguration extends RestartStrategyConfiguration{ + private static final long serialVersionUID = -4441787204284085544L; + + @Override + public String getDescription() { + return "Cluster level default restart strategy"; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/ac399792/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java index 870bf63..27ee9b6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java @@ -66,6 +66,8 @@ public abstract class RestartStrategyFactory implements Serializable { config.getFailureInterval(), config.getDelayBetweenAttemptsInterval() ); + } else if (restartStrategyConfiguration instanceof RestartStrategies.FallbackRestartStrategyConfiguration) { + return null; } else { throw new IllegalArgumentException("Unknown restart strategy configuration " + restartStrategyConfiguration + "."); http://git-wip-us.apache.org/repos/asf/flink/blob/ac399792/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 9cc8be6..3f0689f 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -1221,7 +1221,8 @@ class JobManager( Option(jobGraph.getSerializedExecutionConfig() .deserializeValue(userCodeLoader) .getRestartStrategy()) - .map(RestartStrategyFactory.createRestartStrategy) match { + .map(RestartStrategyFactory.createRestartStrategy) + .filter(p => p != null) match { case Some(strategy) => strategy case None => restartStrategyFactory.createRestartStrategy() } @@ -1534,7 +1535,7 @@ class JobManager( case _ => unhandled(actorMsg) } } - + /** * Handle unmatched messages with an exception. */ @@ -2607,7 +2608,7 @@ object JobManager { case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName) case None => actorSystem.actorOf(jobManagerProps) } - + metricsRegistry match { case Some(registry) => registry.startQueryService(actorSystem, null)
