[FLINK-9143] Use cluster strategy if none was set on client side

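Added NoOrFixedIfCheckpointingEnabledRestartStrategyFactory, the server-side default that resolves to a fixed-delay restart strategy when checkpointing is enabled and to no restarts otherwise.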

This closes #6283.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/57872d53
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/57872d53
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/57872d53

Branch: refs/heads/master
Commit: 57872d53c4584faace6dc8e4038ad1f2d068a453
Parents: c9ad0a0
Author: Dawid Wysakowicz <dwysakow...@apache.org>
Authored: Thu Jul 5 13:48:23 2018 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Fri Jul 13 18:32:57 2018 +0200

----------------------------------------------------------------------
 .../flink/api/common/ExecutionConfig.java       | 10 +--
 .../restartstrategy/RestartStrategies.java      | 48 ++++++++++++-
 .../runtime/webmonitor/WebFrontendITCase.java   |  2 +-
 ...ckpointingEnabledRestartStrategyFactory.java | 42 ++++++++++++
 .../restart/NoRestartStrategy.java              |  4 +-
 .../restart/RestartStrategyFactory.java         | 63 ++++++++---------
 .../restart/RestartStrategyResolving.java       | 66 ++++++++++++++++++
 .../apache/flink/runtime/jobgraph/JobGraph.java | 20 +++++-
 .../flink/runtime/jobmaster/JobMaster.java      | 16 +++--
 .../flink/runtime/jobmanager/JobManager.scala   | 20 +++---
 .../checkpoint/CoordinatorShutdownTest.java     | 19 ++++--
 .../restart/RestartStrategyResolvingTest.java   | 71 ++++++++++++++++++++
 .../flink/runtime/jobmaster/JobMasterTest.java  | 41 +++++++++++
 .../TestingJobManagerSharedServicesBuilder.java |  4 +-
 .../api/graph/StreamingJobGraphGenerator.java   | 15 -----
 .../streaming/api/RestartStrategyTest.java      | 10 ++-
 16 files changed, 367 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java 
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 7a0a574..59fa803 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -23,6 +23,7 @@ import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.Preconditions;
 
 import com.esotericsoftware.kryo.Serializer;
 
@@ -138,7 +139,8 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
        @Deprecated
        private long executionRetryDelay = DEFAULT_RESTART_DELAY;
 
-       private RestartStrategies.RestartStrategyConfiguration 
restartStrategyConfiguration;
+       private RestartStrategies.RestartStrategyConfiguration 
restartStrategyConfiguration =
+               new RestartStrategies.FallbackRestartStrategyConfiguration();
        
        private long taskCancellationIntervalMillis = -1;
 
@@ -390,7 +392,7 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
         */
        @PublicEvolving
        public void 
setRestartStrategy(RestartStrategies.RestartStrategyConfiguration 
restartStrategyConfiguration) {
-               this.restartStrategyConfiguration = 
restartStrategyConfiguration;
+               this.restartStrategyConfiguration = 
Preconditions.checkNotNull(restartStrategyConfiguration);
        }
 
        /**
@@ -401,14 +403,14 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
        @PublicEvolving
        @SuppressWarnings("deprecation")
        public RestartStrategies.RestartStrategyConfiguration 
getRestartStrategy() {
-               if (restartStrategyConfiguration == null) {
+               if (restartStrategyConfiguration instanceof 
RestartStrategies.FallbackRestartStrategyConfiguration) {
                        // support the old API calls by creating a restart 
strategy from them
                        if (getNumberOfExecutionRetries() > 0 && 
getExecutionRetryDelay() >= 0) {
                                return 
RestartStrategies.fixedDelayRestart(getNumberOfExecutionRetries(), 
getExecutionRetryDelay());
                        } else if (getNumberOfExecutionRetries() == 0) {
                                return RestartStrategies.noRestart();
                        } else {
-                               return null;
+                               return restartStrategyConfiguration;
                        }
                } else {
                        return restartStrategyConfiguration;

http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
index f3eb3a5..4f67290 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.time.Time;
 
 import java.io.Serializable;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -106,6 +107,19 @@ public class RestartStrategies {
                public String getDescription() {
                        return "Restart deactivated.";
                }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       return o instanceof NoRestartStrategyConfiguration;
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hash();
+               }
        }
 
        /**
@@ -188,6 +202,25 @@ public class RestartStrategies {
                        return "Failure rate restart with maximum of " + 
maxFailureRate + " failures within interval " + failureInterval.toString()
                                        + " and fixed delay " + 
delayBetweenAttemptsInterval.toString();
                }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+                       FailureRateRestartStrategyConfiguration that = 
(FailureRateRestartStrategyConfiguration) o;
+                       return maxFailureRate == that.maxFailureRate &&
+                               Objects.equals(failureInterval, 
that.failureInterval) &&
+                               Objects.equals(delayBetweenAttemptsInterval, 
that.delayBetweenAttemptsInterval);
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hash(maxFailureRate, failureInterval, 
delayBetweenAttemptsInterval);
+               }
        }
 
        /**
@@ -195,12 +228,25 @@ public class RestartStrategies {
         * strategy. Useful especially when one has a custom implementation of 
restart strategy set via
         * flink-conf.yaml.
         */
-       public static final class FallbackRestartStrategyConfiguration extends 
RestartStrategyConfiguration{
+       public static final class FallbackRestartStrategyConfiguration extends 
RestartStrategyConfiguration {
                private static final long serialVersionUID = 
-4441787204284085544L;
 
                @Override
                public String getDescription() {
                        return "Cluster level default restart strategy";
                }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       return o instanceof 
FallbackRestartStrategyConfiguration;
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hash();
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
index fb8258a..b90277f 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -313,7 +313,7 @@ public class WebFrontendITCase extends TestLogger {
                        assertEquals(HttpResponseStatus.OK, 
response.getStatus());
                        assertEquals("application/json; charset=UTF-8", 
response.getType());
                        assertEquals("{\"jid\":\"" + jid + 
"\",\"name\":\"Stoppable streaming test job\"," +
-                               
"\"execution-config\":{\"execution-mode\":\"PIPELINED\",\"restart-strategy\":\"default\","
 +
+                               
"\"execution-config\":{\"execution-mode\":\"PIPELINED\",\"restart-strategy\":\"Cluster
 level default restart strategy\"," +
                                
"\"job-parallelism\":-1,\"object-reuse-mode\":false,\"user-config\":{}}}", 
response.getContent());
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoOrFixedIfCheckpointingEnabledRestartStrategyFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoOrFixedIfCheckpointingEnabledRestartStrategyFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoOrFixedIfCheckpointingEnabledRestartStrategyFactory.java
new file mode 100644
index 0000000..7b5c1a7
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoOrFixedIfCheckpointingEnabledRestartStrategyFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+/**
+ * Default server-side restart strategy factory that resolves either to {@link NoRestartStrategy}
+ * or to {@link FixedDelayRestartStrategy}, depending on whether checkpointing is enabled for the job.
+ *
+ * <p>With checkpointing enabled, the fixed-delay strategy is configured with
+ * {@link Integer#MAX_VALUE} restart attempts and a delay of {@code DEFAULT_RESTART_DELAY} (0)
+ * between attempts; with checkpointing disabled, the job is not restarted.
+ */
+public class NoOrFixedIfCheckpointingEnabledRestartStrategyFactory extends 
RestartStrategyFactory {
+       // Delay handed to FixedDelayRestartStrategy; presumably milliseconds, matching the
+       // deprecated-config path of RestartStrategyFactory -- TODO confirm.
+       private static final long DEFAULT_RESTART_DELAY = 0;
+
+       private static final long serialVersionUID = -1809462525812787862L;
+
+       /**
+        * Creates a restart strategy without knowledge of the job's checkpointing settings. This is
+        * treated as "checkpointing disabled" and therefore always yields a {@link NoRestartStrategy}.
+        * Use {@link #createRestartStrategy(boolean)} when the checkpointing state is known.
+        *
+        * @return a {@link NoRestartStrategy}
+        */
+       @Override
+       public RestartStrategy createRestartStrategy() {
+               return createRestartStrategy(false);
+       }
+
+       /**
+        * Creates the restart strategy matching the job's checkpointing state.
+        *
+        * @param isCheckpointingEnabled whether checkpointing is enabled for the job
+        * @return a {@link FixedDelayRestartStrategy} with {@link Integer#MAX_VALUE} attempts and a
+        *     delay of {@code DEFAULT_RESTART_DELAY} if checkpointing is enabled, otherwise a
+        *     {@link NoRestartStrategy}
+        */
+       RestartStrategy createRestartStrategy(boolean isCheckpointingEnabled) {
+               if (isCheckpointingEnabled) {
+                       return new FixedDelayRestartStrategy(Integer.MAX_VALUE, 
DEFAULT_RESTART_DELAY);
+               } else {
+                       return new NoRestartStrategy();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
index 5502d2d..b639614 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
@@ -38,10 +38,10 @@ public class NoRestartStrategy implements RestartStrategy {
        }
 
        /**
-        * Creates a NoRestartStrategy instance.
+        * Creates a NoRestartStrategyFactory instance.
         *
         * @param configuration Configuration object which is ignored
-        * @return NoRestartStrategy instance
+        * @return NoRestartStrategyFactory instance
         */
        public static NoRestartStrategyFactory createFactory(Configuration 
configuration) {
                return new NoRestartStrategyFactory();

http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
index 717e1d2..f15ee0b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
@@ -82,38 +82,41 @@ public abstract class RestartStrategyFactory implements 
Serializable {
         * @throws Exception which indicates that the RestartStrategy could not 
be instantiated.
         */
        public static RestartStrategyFactory 
createRestartStrategyFactory(Configuration configuration) throws Exception {
-               String restartStrategyName = 
configuration.getString(ConfigConstants.RESTART_STRATEGY, "none");
+               String restartStrategyName = 
configuration.getString(ConfigConstants.RESTART_STRATEGY, null);
+
+               if (restartStrategyName == null) {
+                       // support deprecated ConfigConstants values
+                       final int numberExecutionRetries = 
configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS,
+                               ConfigConstants.DEFAULT_EXECUTION_RETRIES);
+                       String pauseString = 
configuration.getString(AkkaOptions.WATCH_HEARTBEAT_PAUSE);
+                       String delayString = 
configuration.getString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY,
+                               pauseString);
+
+                       long delay;
+
+                       try {
+                               delay = Duration.apply(delayString).toMillis();
+                       } catch (NumberFormatException nfe) {
+                               if (delayString.equals(pauseString)) {
+                                       throw new Exception("Invalid config 
value for " +
+                                               
AkkaOptions.WATCH_HEARTBEAT_PAUSE.key() + ": " + pauseString +
+                                               ". Value must be a valid 
duration (such as '10 s' or '1 min')");
+                               } else {
+                                       throw new Exception("Invalid config 
value for " +
+                                               
ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY + ": " + delayString +
+                                               ". Value must be a valid 
duration (such as '100 milli' or '10 s')");
+                               }
+                       }
+
+                       if (numberExecutionRetries > 0 && delay >= 0) {
+                               return new 
FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(numberExecutionRetries,
 delay);
+                       } else {
+                               return new 
NoOrFixedIfCheckpointingEnabledRestartStrategyFactory();
+                       }
+               }
 
                switch (restartStrategyName.toLowerCase()) {
                        case "none":
-                               // support deprecated ConfigConstants values
-                               final int numberExecutionRetries = 
configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS,
-                                       
ConfigConstants.DEFAULT_EXECUTION_RETRIES);
-                               String pauseString = 
configuration.getString(AkkaOptions.WATCH_HEARTBEAT_PAUSE);
-                               String delayString = 
configuration.getString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY,
-                                       pauseString);
-
-                               long delay;
-
-                               try {
-                                       delay = 
Duration.apply(delayString).toMillis();
-                               } catch (NumberFormatException nfe) {
-                                       if (delayString.equals(pauseString)) {
-                                               throw new Exception("Invalid 
config value for " +
-                                                       
AkkaOptions.WATCH_HEARTBEAT_PAUSE.key() + ": " + pauseString +
-                                                       ". Value must be a 
valid duration (such as '10 s' or '1 min')");
-                                       } else {
-                                               throw new Exception("Invalid 
config value for " +
-                                                       
ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY + ": " + delayString +
-                                                       ". Value must be a 
valid duration (such as '100 milli' or '10 s')");
-                                       }
-                               }
-
-                               if (numberExecutionRetries > 0 && delay >= 0) {
-                                       return new 
FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(numberExecutionRetries,
 delay);
-                               } else {
-                                       return 
NoRestartStrategy.createFactory(configuration);
-                               }
                        case "off":
                        case "disable":
                                return 
NoRestartStrategy.createFactory(configuration);
@@ -149,7 +152,7 @@ public abstract class RestartStrategyFactory implements 
Serializable {
                                }
 
                                // fallback in case of an error
-                               return 
NoRestartStrategy.createFactory(configuration);
+                               return new 
NoOrFixedIfCheckpointingEnabledRestartStrategyFactory();
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java
new file mode 100644
index 0000000..ad7aa93
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+
+/**
+ * Utility class for resolving the {@link RestartStrategy} of a job.
+ */
+public final class RestartStrategyResolving {
+
+       /**
+        * Resolves which {@link RestartStrategy} to use. It should be used only on the server side.
+        * The resolution order is as follows:
+        * <ol>
+        * <li>If the restart configuration set within the job graph yields a strategy, that strategy
+        * is used.</li>
+        * <li>Otherwise, if the server-side factory is a
+        * {@link NoOrFixedIfCheckpointingEnabledRestartStrategyFactory} (the server-side default), a
+        * {@link FixedDelayRestartStrategy} is used when checkpointing is enabled and a
+        * {@link NoRestartStrategy} when it is not.</li>
+        * <li>Otherwise, the strategy created by the server-side factory (configured in
+        * flink-conf.yaml) is used. In particular, an explicitly configured no-restart strategy is
+        * not overridden by enabling checkpointing.</li>
+        * </ol>
+        *
+        * @param clientConfiguration restart configuration given within the job graph
+        * @param serverStrategyFactory default server-side strategy factory
+        * @param isCheckpointingEnabled whether checkpointing is enabled for the job
+        * @return resolved strategy
+        */
+       public static RestartStrategy resolve(
+                       RestartStrategies.RestartStrategyConfiguration 
clientConfiguration,
+                       RestartStrategyFactory serverStrategyFactory,
+                       boolean isCheckpointingEnabled) {
+
+               final RestartStrategy clientSideRestartStrategy =
+                       
RestartStrategyFactory.createRestartStrategy(clientConfiguration);
+
+               // A null result means the client configuration does not name a concrete strategy
+               // (presumably the cluster-level fallback configuration) -- TODO confirm.
+               if (clientSideRestartStrategy != null) {
+                       return clientSideRestartStrategy;
+               } else {
+                       // The server-side default needs the job's checkpointing flag to choose
+                       // between no restarts and fixed-delay restarts.
+                       if (serverStrategyFactory instanceof 
NoOrFixedIfCheckpointingEnabledRestartStrategyFactory) {
+                               return 
((NoOrFixedIfCheckpointingEnabledRestartStrategyFactory) serverStrategyFactory)
+                                       
.createRestartStrategy(isCheckpointingEnabled);
+                       } else {
+                               return 
serverStrategyFactory.createRestartStrategy();
+                       }
+               }
+       }
+
+       private RestartStrategyResolving() {
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index b3e03de..377f870 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -329,7 +329,7 @@ public class JobGraph implements Serializable {
         * Sets the settings for asynchronous snapshots. A value of {@code 
null} means that
         * snapshotting is not enabled.
         *
-        * @param settings The snapshot settings, or null, to disable 
snapshotting.
+        * @param settings The snapshot settings
         */
        public void setSnapshotSettings(JobCheckpointingSettings settings) {
                this.snapshotSettings = settings;
@@ -339,13 +339,29 @@ public class JobGraph implements Serializable {
         * Gets the settings for asynchronous snapshots. This method returns 
null, when
         * checkpointing is not enabled.
         *
-        * @return The snapshot settings, or null, if checkpointing is not 
enabled.
+        * @return The snapshot settings
         */
        public JobCheckpointingSettings getCheckpointingSettings() {
                return snapshotSettings;
        }
 
        /**
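+        * Checks whether checkpointing is enabled for this job graph. Checkpointing counts as
+        * enabled only if snapshot settings are present and the configured checkpoint interval is
+        * positive and smaller than {@link Long#MAX_VALUE}.
+        *
+        * @return true if checkpointing is enabled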
+        */
+       public boolean isCheckpointingEnabled() {
+
+               if (snapshotSettings == null) {
+                       return false;
+               }
+
+               long checkpointInterval = 
snapshotSettings.getCheckpointCoordinatorConfiguration().getCheckpointInterval();
+               return checkpointInterval > 0 &&
+                       checkpointInterval < Long.MAX_VALUE;
+       }
+
+       /**
         * Searches for a vertex with a matching ID and returns it.
         *
         * @param id

http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 7557bc3..1660f95 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.jobmaster;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
@@ -51,7 +52,7 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.IntermediateResult;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
+import 
org.apache.flink.runtime.executiongraph.restart.RestartStrategyResolving;
 import org.apache.flink.runtime.heartbeat.HeartbeatListener;
 import org.apache.flink.runtime.heartbeat.HeartbeatManager;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -273,11 +274,11 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                                                
.deserializeValue(userCodeLoader)
                                                .getRestartStrategy();
 
-               this.restartStrategy = (restartStrategyConfiguration != null) ?
-                               
RestartStrategyFactory.createRestartStrategy(restartStrategyConfiguration) :
-                               
jobManagerSharedServices.getRestartStrategyFactory().createRestartStrategy();
+               this.restartStrategy = 
RestartStrategyResolving.resolve(restartStrategyConfiguration,
+                       jobManagerSharedServices.getRestartStrategyFactory(),
+                       jobGraph.isCheckpointingEnabled());
 
-               log.info("Using restart strategy {} for {} ({}).", 
restartStrategy, jobName, jid);
+               log.info("Using restart strategy {} for {} ({}).", 
this.restartStrategy, jobName, jid);
 
                resourceManagerLeaderRetriever = 
highAvailabilityServices.getResourceManagerLeaderRetriever();
 
@@ -1649,4 +1650,9 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                        return CompletableFuture.completedFuture(null);
                }
        }
+
+       @VisibleForTesting
+       RestartStrategy getRestartStrategy() {
+               return restartStrategy;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index cebff58..1c8174f 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -49,7 +49,7 @@ import 
org.apache.flink.runtime.execution.SuppressRestartsException
 import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders.ResolveOrder
 import 
org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, 
LibraryCacheManager}
 import org.apache.flink.runtime.executiongraph._
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
+import 
org.apache.flink.runtime.executiongraph.restart.{RestartStrategyFactory, 
RestartStrategyResolving}
 import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution
 import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, 
HighAvailabilityServicesUtils}
 import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceID, 
InstanceManager}
@@ -1250,15 +1250,15 @@ class JobManager(
           throw new JobSubmissionException(jobId, "The given job is empty")
         }
 
-        val restartStrategy =
-          Option(jobGraph.getSerializedExecutionConfig()
-            .deserializeValue(userCodeLoader)
-            .getRestartStrategy())
-            .map(RestartStrategyFactory.createRestartStrategy)
-            .filter(p => p != null) match {
-            case Some(strategy) => strategy
-            case None => restartStrategyFactory.createRestartStrategy()
-          }
+        val restartStrategyConfiguration = jobGraph
+          .getSerializedExecutionConfig
+          .deserializeValue(userCodeLoader)
+          .getRestartStrategy
+
+        val restartStrategy = RestartStrategyResolving
+          .resolve(restartStrategyConfiguration,
+            restartStrategyFactory,
+            jobGraph.isCheckpointingEnabled)
 
         log.info(s"Using restart strategy $restartStrategy for $jobId.")
 

http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index 8a6a9d8..f6b7730 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -40,14 +42,14 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -68,7 +70,10 @@ public class CoordinatorShutdownTest extends TestLogger {
                        JobVertex vertex = new JobVertex("Test Vertex");
                        
vertex.setInvokableClass(FailingBlockingInvokable.class);
                        List<JobVertexID> vertexIdList = 
Collections.singletonList(vertex.getID());
-                       
+
+                       final ExecutionConfig executionConfig = new 
ExecutionConfig();
+                       
executionConfig.setRestartStrategy(RestartStrategies.noRestart());
+
                        JobGraph testGraph = new JobGraph("test job", vertex);
                        testGraph.setSnapshotSettings(
                                new JobCheckpointingSettings(
@@ -83,7 +88,9 @@ public class CoordinatorShutdownTest extends TestLogger {
                                                
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                                                true),
                                        null));
-                       
+                       testGraph.setExecutionConfig(executionConfig);
+
+
                        ActorGateway jmGateway = 
cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
 
                        FiniteDuration timeout = new FiniteDuration(60, 
TimeUnit.SECONDS);

http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolvingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolvingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolvingTest.java
new file mode 100644
index 0000000..4194e97
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolvingTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static 
org.apache.flink.api.common.restartstrategy.RestartStrategies.fallBackRestart;
+import static 
org.apache.flink.api.common.restartstrategy.RestartStrategies.noRestart;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+
+/**
+ * Tests for {@link RestartStrategyResolving}.
+ */
+public class RestartStrategyResolvingTest extends TestLogger {
+
+       @Test
+       public void testClientSideHighestPriority() {
+               RestartStrategy resolvedStrategy = 
RestartStrategyResolving.resolve(noRestart(),
+                       new 
FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(2, 1000L),
+                       true);
+
+               assertThat(resolvedStrategy, 
instanceOf(NoRestartStrategy.class));
+       }
+
+       @Test
+       public void testFixedStrategySetWhenCheckpointingEnabled() {
+               RestartStrategy resolvedStrategy = 
RestartStrategyResolving.resolve(fallBackRestart(),
+                       new 
NoOrFixedIfCheckpointingEnabledRestartStrategyFactory(),
+                       true);
+
+               assertThat(resolvedStrategy, 
instanceOf(FixedDelayRestartStrategy.class));
+       }
+
+       @Test
+       public void testServerStrategyIsUsedSetWhenCheckpointingEnabled() {
+               RestartStrategy resolvedStrategy = 
RestartStrategyResolving.resolve(fallBackRestart(),
+                       new 
FailureRateRestartStrategy.FailureRateRestartStrategyFactory(5, 
Time.seconds(5), Time.seconds(2)),
+                       true);
+
+               assertThat(resolvedStrategy, 
instanceOf(FailureRateRestartStrategy.class));
+       }
+
+       @Test
+       public void testServerStrategyIsUsedSetWhenCheckpointingDisabled() {
+               RestartStrategy resolvedStrategy = 
RestartStrategyResolving.resolve(fallBackRestart(),
+                       new 
FailureRateRestartStrategy.FailureRateRestartStrategyFactory(5, 
Time.seconds(5), Time.seconds(2)),
+                       false);
+
+               assertThat(resolvedStrategy, 
instanceOf(FailureRateRestartStrategy.class));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index d7dc017..82fdc94 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -46,6 +46,9 @@ import 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -110,6 +113,8 @@ import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests for {@link JobMaster}.
@@ -358,6 +363,42 @@ public class JobMasterTest extends TestLogger {
        }
 
        /**
+        * Tests that in a streaming use case where checkpointing is enabled, a
+        * fixed delay with Integer.MAX_VALUE retries is instantiated if no 
other restart
+        * strategy has been specified.
+        */
+       @Test
+       public void testAutomaticRestartingWhenCheckpointing() throws Exception 
{
+               // create savepoint data
+               final long savepointId = 42L;
+               final File savepointFile = createSavepoint(savepointId);
+
+               // set savepoint settings
+               final SavepointRestoreSettings savepointRestoreSettings = 
SavepointRestoreSettings.forPath(
+                       savepointFile.getAbsolutePath(),
+                       true);
+               final JobGraph jobGraph = 
createJobGraphWithCheckpointing(savepointRestoreSettings);
+
+               final StandaloneCompletedCheckpointStore 
completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
+               final TestingCheckpointRecoveryFactory 
testingCheckpointRecoveryFactory = new TestingCheckpointRecoveryFactory(
+                       completedCheckpointStore,
+                       new StandaloneCheckpointIDCounter());
+               
haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory);
+               final JobMaster jobMaster = createJobMaster(
+                       new Configuration(),
+                       jobGraph,
+                       haServices,
+                       new TestingJobManagerSharedServicesBuilder()
+                               
.setRestartStrategyFactory(RestartStrategyFactory.createRestartStrategyFactory(configuration))
+                               .build());
+
+               RestartStrategy restartStrategy = 
jobMaster.getRestartStrategy();
+
+               assertNotNull(restartStrategy);
+               assertTrue(restartStrategy instanceof 
FixedDelayRestartStrategy);
+       }
+
+       /**
         * Tests that an existing checkpoint will have precedence over an 
savepoint
         */
        @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java
index f0b232a..030e4e6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
-import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import 
org.apache.flink.runtime.executiongraph.restart.NoOrFixedIfCheckpointingEnabledRestartStrategyFactory;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
 import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
@@ -48,7 +48,7 @@ public class TestingJobManagerSharedServicesBuilder {
        public TestingJobManagerSharedServicesBuilder() {
                scheduledExecutorService = TestingUtils.defaultExecutor();
                libraryCacheManager = mock(LibraryCacheManager.class);
-               restartStrategyFactory = new 
NoRestartStrategy.NoRestartStrategyFactory();
+               restartStrategyFactory = new 
NoOrFixedIfCheckpointingEnabledRestartStrategyFactory();
                stackTraceSampleCoordinator = 
mock(StackTraceSampleCoordinator.class);
                backPressureStatsTracker = 
VoidBackPressureStatsTracker.INSTANCE;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 603b9e4..e905eac 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
@@ -81,12 +80,6 @@ public class StreamingJobGraphGenerator {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(StreamingJobGraphGenerator.class);
 
-       /**
-        * Restart delay used for the FixedDelayRestartStrategy in case 
checkpointing was enabled but
-        * no restart strategy has been specified.
-        */
-       private static final long DEFAULT_RESTART_DELAY = 0L;
-
        // 
------------------------------------------------------------------------
 
        public static JobGraph createJobGraph(StreamGraph streamGraph) {
@@ -590,17 +583,9 @@ public class StreamingJobGraphGenerator {
 
                long interval = cfg.getCheckpointInterval();
                if (interval > 0) {
-
                        ExecutionConfig executionConfig = 
streamGraph.getExecutionConfig();
                        // propagate the expected behaviour for checkpoint 
errors to task.
                        
executionConfig.setFailTaskOnCheckpointError(cfg.isFailOnCheckpointingErrors());
-
-                       // check if a restart strategy has been set, if not 
then set the FixedDelayRestartStrategy
-                       if (executionConfig.getRestartStrategy() == null) {
-                               // if the user enabled checkpointing, the 
default number of exec retries is infinite.
-                               executionConfig.setRestartStrategy(
-                                       
RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY));
-                       }
                } else {
                        // interval of max value means disable periodic 
checkpoint
                        interval = Long.MAX_VALUE;

http://git-wip-us.apache.org/repos/asf/flink/blob/57872d53/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
index b231bea..03b5a53 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
@@ -33,12 +33,11 @@ import org.junit.Test;
 public class RestartStrategyTest extends TestLogger {
 
        /**
-        * Tests that in a streaming use case where checkpointing is enabled, a
-        * fixed delay with Integer.MAX_VALUE retries is instantiated if no 
other restart
-        * strategy has been specified.
+        * Tests that in a streaming use case where checkpointing is enabled, the client side only sets the
+        * fallback restart strategy configuration, leaving the actual choice of restart strategy to the cluster.
         */
        @Test
-       public void testAutomaticRestartingWhenCheckpointing() throws Exception 
{
+       public void testFallbackStrategyOnClientSideWhenCheckpointingEnabled() 
throws Exception {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.enableCheckpointing(500);
 
@@ -51,8 +50,7 @@ public class RestartStrategyTest extends TestLogger {
                        
jobGraph.getSerializedExecutionConfig().deserializeValue(getClass().getClassLoader()).getRestartStrategy();
 
                Assert.assertNotNull(restartStrategy);
-               Assert.assertTrue(restartStrategy instanceof 
RestartStrategies.FixedDelayRestartStrategyConfiguration);
-               Assert.assertEquals(Integer.MAX_VALUE, 
((RestartStrategies.FixedDelayRestartStrategyConfiguration) 
restartStrategy).getRestartAttempts());
+               Assert.assertTrue(restartStrategy instanceof 
RestartStrategies.FallbackRestartStrategyConfiguration);
        }
 
        /**

Reply via email to