Repository: flink
Updated Branches:
  refs/heads/master 852c5298e -> ac3997927


[FLINK-4596] Fallback restart strategy config to let jobs choose restart 
configuration set at cluster level

Added Javadoc for fallback restart strategy

This closes #2592.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ac399792
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ac399792
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ac399792

Branch: refs/heads/master
Commit: ac3997927e38f9da45de0b001d308430c323c842
Parents: 852c529
Author: Nagarjun <[email protected]>
Authored: Wed Oct 5 00:26:05 2016 -0700
Committer: Till Rohrmann <[email protected]>
Committed: Fri Nov 4 14:56:56 2016 +0100

----------------------------------------------------------------------
 docs/setup/fault_tolerance.md                     |  6 ++++++
 .../common/restartstrategy/RestartStrategies.java | 18 ++++++++++++++++++
 .../restart/RestartStrategyFactory.java           |  2 ++
 .../flink/runtime/jobmanager/JobManager.scala     |  7 ++++---
 4 files changed, 30 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ac399792/docs/setup/fault_tolerance.md
----------------------------------------------------------------------
diff --git a/docs/setup/fault_tolerance.md b/docs/setup/fault_tolerance.md
index 20f68ee..fa1c821 100644
--- a/docs/setup/fault_tolerance.md
+++ b/docs/setup/fault_tolerance.md
@@ -457,4 +457,10 @@ env.setRestartStrategy(RestartStrategies.noRestart())
 </div>
 </div>
 
+### Fallback Restart Strategy
+
+The cluster-defined restart strategy is used. 
+This is helpful for streaming programs which enable checkpointing.
+Per default, a fixed delay restart strategy is chosen if there is no other 
restart strategy defined.
+
 {% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/ac399792/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
index d5db466..7073c2c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
@@ -42,6 +42,10 @@ public class RestartStrategies {
                return new NoRestartStrategyConfiguration();
        }
 
+       public static RestartStrategyConfiguration fallBackRestart() {
+               return new FallbackRestartStrategyConfiguration();
+       }
+
        /**
         * Generates a FixedDelayRestartStrategyConfiguration.
         *
@@ -173,4 +177,18 @@ public class RestartStrategies {
                                        + " and fixed delay " + 
delayBetweenAttemptsInterval.toString();
                }
        }
+
+       /**
+        * Restart strategy configuration that jobs can use to fall back to the 
cluster-level restart
+        * strategy. Especially useful when a custom restart strategy implementation 
is set via
+        * flink-conf.yaml.
+        */
+       final public static class FallbackRestartStrategyConfiguration extends 
RestartStrategyConfiguration{
+               private static final long serialVersionUID = 
-4441787204284085544L;
+
+               @Override
+               public String getDescription() {
+                       return "Cluster level default restart strategy";
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac399792/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
index 870bf63..27ee9b6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
@@ -66,6 +66,8 @@ public abstract class RestartStrategyFactory implements 
Serializable {
                                        config.getFailureInterval(),
                                        config.getDelayBetweenAttemptsInterval()
                        );
+               } else if (restartStrategyConfiguration instanceof 
RestartStrategies.FallbackRestartStrategyConfiguration) {
+                       return null;
                } else {
                        throw new IllegalArgumentException("Unknown restart 
strategy configuration " +
                                restartStrategyConfiguration + ".");

http://git-wip-us.apache.org/repos/asf/flink/blob/ac399792/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 9cc8be6..3f0689f 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1221,7 +1221,8 @@ class JobManager(
           Option(jobGraph.getSerializedExecutionConfig()
             .deserializeValue(userCodeLoader)
             .getRestartStrategy())
-            .map(RestartStrategyFactory.createRestartStrategy) match {
+            .map(RestartStrategyFactory.createRestartStrategy)
+            .filter(p => p != null) match {
             case Some(strategy) => strategy
             case None => restartStrategyFactory.createRestartStrategy()
           }
@@ -1534,7 +1535,7 @@ class JobManager(
       case _ => unhandled(actorMsg)
     }
   }
-  
+
   /**
    * Handle unmatched messages with an exception.
    */
@@ -2607,7 +2608,7 @@ object JobManager {
       case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName)
       case None => actorSystem.actorOf(jobManagerProps)
     }
-    
+
     metricsRegistry match {
       case Some(registry) =>
         registry.startQueryService(actorSystem, null)

Reply via email to