Repository: flink
Updated Branches:
  refs/heads/release-1.0 f1d34b17b -> 0708dd08b


Revert "[FLINK-3800] [jobmanager] Terminate ExecutionGraphs properly"

This reverts commit 014a686ec5ea0e8809a5235fe988f828e5e70833.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0708dd08
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0708dd08
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0708dd08

Branch: refs/heads/release-1.0
Commit: 0708dd08b57d8cfca20c470ab4e909ea56cb9a38
Parents: f1d34b1
Author: Ufuk Celebi <[email protected]>
Authored: Fri Apr 29 11:22:33 2016 +0200
Committer: Ufuk Celebi <[email protected]>
Committed: Fri Apr 29 11:22:33 2016 +0200

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  |  28 +--
 .../restart/FixedDelayRestartStrategy.java      |  30 +--
 .../restart/NoRestartStrategy.java              |  17 +-
 .../executiongraph/restart/RestartStrategy.java |   5 -
 .../restart/RestartStrategyFactory.java         |  28 +--
 .../flink/runtime/jobmanager/JobManager.scala   |  16 +-
 .../JobManagerLeaderElectionTest.java           |   2 +-
 .../LeaderChangeJobRecoveryTest.java            | 198 -------------------
 .../LeaderChangeStateCleanupTest.java           |   2 +-
 .../LeaderElectionRetrievalTestingCluster.java  |  23 +--
 .../runtime/testingUtils/TestingCluster.scala   |  10 +-
 .../testingUtils/TestingJobManager.scala        |   6 +-
 .../flink/yarn/TestingYarnJobManager.scala      |   8 +-
 .../org/apache/flink/yarn/YarnJobManager.scala  |   8 +-
 14 files changed, 49 insertions(+), 332 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 8cb1ded..7cb83cd 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -790,14 +790,6 @@ public class ExecutionGraph implements Serializable {
        }
 
        public void fail(Throwable t) {
-               if (t instanceof UnrecoverableException) {
-                       if (restartStrategy != null) {
-                               // disable the restart strategy in case that we 
have seen a SuppressRestartsException
-                               // it basically overrides the restart behaviour 
of a the root cause
-                               restartStrategy.disable();
-                       }
-               }
-
                while (true) {
                        JobStatus current = state;
                        if (current == JobStatus.FAILING || 
current.isTerminalState()) {
@@ -1021,17 +1013,15 @@ public class ExecutionGraph implements Serializable {
                                                }
                                        }
                                        else if (current == JobStatus.FAILING) {
-                                               if 
(restartStrategy.canRestart() && transitionState(current, 
JobStatus.RESTARTING)) {
-                                                       // double check in case 
that in the meantime a SuppressRestartsException was thrown
-                                                       if 
(restartStrategy.canRestart()) {
-                                                               
restartStrategy.restart(this);
-                                                               break;
-                                                       } else {
-                                                               fail(new 
Exception("ExecutionGraph went into RESTARTING state but " +
-                                                                       "then 
the restart strategy was disabled."));
-                                                       }
-
-                                               } else if 
(!restartStrategy.canRestart() && transitionState(current, JobStatus.FAILED, 
failureCause)) {
+                                               boolean isRecoverable = 
!(failureCause instanceof UnrecoverableException);
+
+                                               if (isRecoverable && 
restartStrategy.canRestart() &&
+                                                               
transitionState(current, JobStatus.RESTARTING)) {
+                                                       
restartStrategy.restart(this);
+                                                       break;
+
+                                               } else if ((!isRecoverable || 
!restartStrategy.canRestart()) &&
+                                                       
transitionState(current, JobStatus.FAILED, failureCause)) {
                                                        postRunCleanup();
                                                        break;
                                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
index 464b48e..d3c7eba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
@@ -41,7 +41,6 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
        private final int maxNumberRestartAttempts;
        private final long delayBetweenRestartAttempts;
        private int currentRestartAttempt;
-       private boolean disabled = false;
 
        public FixedDelayRestartStrategy(
                int maxNumberRestartAttempts,
@@ -61,7 +60,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
 
        @Override
        public boolean canRestart() {
-               return !disabled && currentRestartAttempt < 
maxNumberRestartAttempts;
+               return currentRestartAttempt < maxNumberRestartAttempts;
        }
 
        @Override
@@ -84,11 +83,6 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
                }, executionGraph.getExecutionContext());
        }
 
-       @Override
-       public void disable() {
-               disabled = true;
-       }
-
        /**
         * Creates a FixedDelayRestartStrategy from the given Configuration.
         *
@@ -96,7 +90,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
         * @return Initialized instance of FixedDelayRestartStrategy
         * @throws Exception
         */
-       public static FixedDelayRestartStrategyFactory 
createFactory(Configuration configuration) throws Exception {
+       public static FixedDelayRestartStrategy create(Configuration 
configuration) throws Exception {
                int maxAttempts = 
configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 
1);
 
                String timeoutString = configuration.getString(
@@ -124,7 +118,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
                        }
                }
 
-               return new FixedDelayRestartStrategyFactory(maxAttempts, delay);
+               return new FixedDelayRestartStrategy(maxAttempts, delay);
        }
 
        @Override
@@ -134,22 +128,4 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
                                ", delayBetweenRestartAttempts=" + 
delayBetweenRestartAttempts +
                                ')';
        }
-
-       public static class FixedDelayRestartStrategyFactory extends 
RestartStrategyFactory {
-
-               private static final long serialVersionUID = 
6642934067762271950L;
-
-               private final int maxAttempts;
-               private final long delay;
-
-               public FixedDelayRestartStrategyFactory(int maxAttempts, long 
delay) {
-                       this.maxAttempts = maxAttempts;
-                       this.delay = delay;
-               }
-
-               @Override
-               public RestartStrategy createRestartStrategy() {
-                       return new FixedDelayRestartStrategy(maxAttempts, 
delay);
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
index 6cc5ee4..8911a98 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
@@ -36,31 +36,18 @@ public class NoRestartStrategy implements RestartStrategy {
                throw new RuntimeException("NoRestartStrategy does not support 
restart.");
        }
 
-       @Override
-       public void disable() {}
-
        /**
         * Creates a NoRestartStrategy instance.
         *
         * @param configuration Configuration object which is ignored
         * @return NoRestartStrategy instance
         */
-       public static NoRestartStrategyFactory createFactory(Configuration 
configuration) {
-               return new NoRestartStrategyFactory();
+       public static NoRestartStrategy create(Configuration configuration) {
+               return new NoRestartStrategy();
        }
 
        @Override
        public String toString() {
                return "NoRestartStrategy";
        }
-
-       public static class NoRestartStrategyFactory extends 
RestartStrategyFactory {
-
-               private static final long serialVersionUID = 
-1809462525812787862L;
-
-               @Override
-               public RestartStrategy createRestartStrategy() {
-                       return new NoRestartStrategy();
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
index c9e6277..2880c01 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
@@ -38,9 +38,4 @@ public interface RestartStrategy {
         * @param executionGraph The ExecutionGraph to be restarted
         */
        void restart(ExecutionGraph executionGraph);
-
-       /**
-        * Disables the restart strategy.
-        */
-       void disable();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
index e58d775..68d114e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
@@ -25,21 +25,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.Duration;
 
-import java.io.Serializable;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 
-public abstract class RestartStrategyFactory implements Serializable {
-       private static final long serialVersionUID = 7320252552640522191L;
-
+public class RestartStrategyFactory {
        private static final Logger LOG = 
LoggerFactory.getLogger(RestartStrategyFactory.class);
-       private static final String CREATE_METHOD = "createFactory";
-
-       /**
-        * Factory method to create a restart strategy
-        * @return The created restart strategy
-        */
-       public abstract RestartStrategy createRestartStrategy();
+       private static final String CREATE_METHOD = "create";
 
        /**
         * Creates a {@link RestartStrategy} instance from the given {@link 
org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration}.
@@ -67,10 +58,11 @@ public abstract class RestartStrategyFactory implements Serializable {
        /**
         * Creates a {@link RestartStrategy} instance from the given {@link 
Configuration}.
         *
+        * @param configuration Configuration object containing the 
configuration values.
         * @return RestartStrategy instance
         * @throws Exception which indicates that the RestartStrategy could not 
be instantiated.
         */
-       public static RestartStrategyFactory 
createRestartStrategyFactory(Configuration configuration) throws Exception {
+       public static RestartStrategy createFromConfig(Configuration 
configuration) throws Exception {
                String restartStrategyName = 
configuration.getString(ConfigConstants.RESTART_STRATEGY, "none").toLowerCase();
 
                switch (restartStrategyName) {
@@ -100,16 +92,16 @@ public abstract class RestartStrategyFactory implements Serializable {
                                }
 
                                if (numberExecutionRetries > 0 && delay >= 0) {
-                                       return new 
FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(numberExecutionRetries,
 delay);
+                                       return new 
FixedDelayRestartStrategy(numberExecutionRetries, delay);
                                } else {
-                                       return 
NoRestartStrategy.createFactory(configuration);
+                                       return 
NoRestartStrategy.create(configuration);
                                }
                        case "off":
                        case "disable":
-                               return 
NoRestartStrategy.createFactory(configuration);
+                               return NoRestartStrategy.create(configuration);
                        case "fixeddelay":
                        case "fixed-delay":
-                               return 
FixedDelayRestartStrategy.createFactory(configuration);
+                               return 
FixedDelayRestartStrategy.create(configuration);
                        default:
                                try {
                                        Class<?> clazz = 
Class.forName(restartStrategyName);
@@ -121,7 +113,7 @@ public abstract class RestartStrategyFactory implements Serializable {
                                                        Object result = 
method.invoke(null, configuration);
 
                                                        if (result != null) {
-                                                               return 
(RestartStrategyFactory) result;
+                                                               return 
(RestartStrategy) result;
                                                        }
                                                }
                                        }
@@ -136,7 +128,7 @@ public abstract class RestartStrategyFactory implements Serializable {
                                }
 
                                // fallback in case of an error
-                               return 
NoRestartStrategy.createFactory(configuration);
+                               return NoRestartStrategy.create(configuration);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 6391c82..cee5606 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -28,7 +28,6 @@ import akka.actor._
 import akka.pattern.ask
 
 import grizzled.slf4j.Logger
-import 
org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration
 
 import org.apache.flink.api.common.{ExecutionConfig, JobID}
 import org.apache.flink.configuration.{ConfigConstants, Configuration, 
GlobalConfiguration}
@@ -112,7 +111,7 @@ class JobManager(
     protected val scheduler: FlinkScheduler,
     protected val libraryCacheManager: BlobLibraryCacheManager,
     protected val archive: ActorRef,
-    protected val restartStrategyFactory: RestartStrategyFactory,
+    protected val defaultRestartStrategy: RestartStrategy,
     protected val timeout: FiniteDuration,
     protected val leaderElectionService: LeaderElectionService,
     protected val submittedJobGraphs : SubmittedJobGraphStore,
@@ -201,7 +200,7 @@ class JobManager(
     log.info(s"Stopping JobManager $getAddress.")
 
     val newFuturesToComplete = cancelAndClearEverything(
-      new UnrecoverableException(new Exception("The JobManager is shutting 
down.")),
+      new Exception("The JobManager is shutting down."),
       removeJobFromStateBackend = true)
 
     implicit val executionContext = context.dispatcher
@@ -298,7 +297,7 @@ class JobManager(
       log.info(s"JobManager ${self.path.toSerializationFormat} was revoked 
leadership.")
 
       val newFuturesToComplete = cancelAndClearEverything(
-        new UnrecoverableException(new Exception("JobManager is no longer the 
leader.")),
+        new Exception("JobManager is no longer the leader."),
         removeJobFromStateBackend = false)
 
       futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) ++ 
newFuturesToComplete)
@@ -951,7 +950,7 @@ class JobManager(
         val restartStrategy = 
Option(jobGraph.getRestartStrategyConfiguration())
           .map(RestartStrategyFactory.createRestartStrategy(_)) match {
             case Some(strategy) => strategy
-            case None => restartStrategyFactory.createRestartStrategy()
+            case None => defaultRestartStrategy
           }
 
         log.info(s"Using restart strategy $restartStrategy for $jobId.")
@@ -1495,7 +1494,7 @@ class JobManager(
     * @param cause Cause for the cancelling.
     */
   private def cancelAndClearEverything(
-      cause: UnrecoverableException,
+      cause: Throwable,
       removeJobFromStateBackend: Boolean)
     : Seq[Future[Unit]] = {
     val futures = for ((jobID, (eg, jobInfo)) <- currentJobs) yield {
@@ -2097,7 +2096,7 @@ object JobManager {
     InstanceManager,
     FlinkScheduler,
     BlobLibraryCacheManager,
-    RestartStrategyFactory,
+    RestartStrategy,
     FiniteDuration, // timeout
     Int, // number of archived jobs
     LeaderElectionService,
@@ -2113,7 +2112,8 @@ object JobManager {
       ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
       ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000
 
-    val restartStrategy = 
RestartStrategyFactory.createRestartStrategyFactory(configuration)
+    val restartStrategy = RestartStrategyFactory
+      .createFromConfig(configuration)
 
     val archiveCount = 
configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_ARCHIVE_COUNT,
       ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT)

http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index afc46a7..fe35c0d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -192,7 +192,7 @@ public class JobManagerLeaderElectionTest extends TestLogger {
                                new 
Scheduler(TestingUtils.defaultExecutionContext()),
                                new BlobLibraryCacheManager(new 
BlobServer(configuration), 10L),
                                ActorRef.noSender(),
-                               new 
NoRestartStrategy.NoRestartStrategyFactory(),
+                               new NoRestartStrategy(),
                                AkkaUtils.getDefaultTimeout(),
                                leaderElectionService,
                                submittedJobGraphStore,

http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
deleted file mode 100644
index b13ae81..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.leaderelection;
-
-import akka.actor.ActorRef;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.Tasks;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.messages.ExecutionGraphMessages;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.util.TestLogger;
-import org.junit.Before;
-import org.junit.Test;
-import scala.concurrent.Await;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.junit.Assert.assertTrue;
-
-public class LeaderChangeJobRecoveryTest extends TestLogger {
-
-       private static FiniteDuration timeout = FiniteDuration.apply(30, 
TimeUnit.SECONDS);
-
-       private int numTMs = 1;
-       private int numSlotsPerTM = 1;
-       private int parallelism = numTMs * numSlotsPerTM;
-
-       private Configuration configuration;
-       private LeaderElectionRetrievalTestingCluster cluster = null;
-       private JobGraph job = createBlockingJob(parallelism);
-
-       @Before
-       public void before() throws TimeoutException, InterruptedException {
-               Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(true);
-
-               configuration = new Configuration();
-
-               
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1);
-               
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
-               
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
numSlotsPerTM);
-
-               cluster = new 
LeaderElectionRetrievalTestingCluster(configuration, true, false, new 
FixedDelayRestartStrategy(9999, 100));
-               cluster.start(false);
-
-               // wait for actors to be alive so that they have started their 
leader retrieval service
-               cluster.waitForActorsToBeAlive();
-       }
-
-       /**
-        * Tests that the job is not restarted or at least terminates 
eventually in case that the
-        * JobManager loses its leadership.
-        *
-        * @throws Exception
-        */
-       @Test
-       public void testNotRestartedWhenLosingLeadership() throws Exception {
-               UUID leaderSessionID = UUID.randomUUID();
-
-               cluster.grantLeadership(0, leaderSessionID);
-               cluster.notifyRetrievalListeners(0, leaderSessionID);
-
-               cluster.waitForTaskManagersToBeRegistered(timeout);
-
-               cluster.submitJobDetached(job);
-
-               ActorGateway jm = cluster.getLeaderGateway(timeout);
-
-               Future<Object> wait = jm.ask(new 
TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished(job.getJobID()),
 timeout);
-
-               Await.ready(wait, timeout);
-
-               Future<Object> futureExecutionGraph = jm.ask(new 
TestingJobManagerMessages.RequestExecutionGraph(job.getJobID()), timeout);
-
-               TestingJobManagerMessages.ResponseExecutionGraph 
responseExecutionGraph =
-                       (TestingJobManagerMessages.ResponseExecutionGraph) 
Await.result(futureExecutionGraph, timeout);
-
-               assertTrue(responseExecutionGraph instanceof 
TestingJobManagerMessages.ExecutionGraphFound);
-
-               ExecutionGraph executionGraph = 
((TestingJobManagerMessages.ExecutionGraphFound) 
responseExecutionGraph).executionGraph();
-
-               TestActorGateway testActorGateway = new TestActorGateway();
-
-               executionGraph.registerJobStatusListener(testActorGateway);
-
-               cluster.revokeLeadership();
-
-               Future<Boolean> hasReachedTerminalState = 
testActorGateway.hasReachedTerminalState();
-
-               assertTrue("The job should have reached a terminal state.", 
Await.result(hasReachedTerminalState, timeout));
-       }
-
-       public JobGraph createBlockingJob(int parallelism) {
-               Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(true);
-
-               JobVertex sender = new JobVertex("sender");
-               JobVertex receiver = new JobVertex("receiver");
-
-               sender.setInvokableClass(Tasks.Sender.class);
-               receiver.setInvokableClass(Tasks.BlockingOnceReceiver.class);
-
-               sender.setParallelism(parallelism);
-               receiver.setParallelism(parallelism);
-
-               receiver.connectNewDataSetAsInput(sender, 
DistributionPattern.POINTWISE);
-
-               SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
-               sender.setSlotSharingGroup(slotSharingGroup);
-               receiver.setSlotSharingGroup(slotSharingGroup);
-
-               return new JobGraph("Blocking test job", sender, receiver);
-       }
-
-       public static class TestActorGateway implements ActorGateway {
-
-               private static final long serialVersionUID = 
-736146686160538227L;
-               private transient Promise<Boolean> terminalState = new 
scala.concurrent.impl.Promise.DefaultPromise<>();
-
-               public Future<Boolean> hasReachedTerminalState() {
-                       return terminalState.future();
-               }
-
-               @Override
-               public Future<Object> ask(Object message, FiniteDuration 
timeout) {
-                       return null;
-               }
-
-               @Override
-               public void tell(Object message) {
-                       this.tell(message, new 
AkkaActorGateway(ActorRef.noSender(), null));
-               }
-
-               @Override
-               public void tell(Object message, ActorGateway sender) {
-                       if (message instanceof 
ExecutionGraphMessages.JobStatusChanged) {
-                               ExecutionGraphMessages.JobStatusChanged 
jobStatusChanged = (ExecutionGraphMessages.JobStatusChanged) message;
-
-                               if 
(jobStatusChanged.newJobStatus().isTerminalState()) {
-                                       terminalState.success(true);
-                               }
-                       }
-               }
-
-               @Override
-               public void forward(Object message, ActorGateway sender) {
-
-               }
-
-               @Override
-               public Future<Object> retry(Object message, int numberRetries, 
FiniteDuration timeout, ExecutionContext executionContext) {
-                       return null;
-               }
-
-               @Override
-               public String path() {
-                       return null;
-               }
-
-               @Override
-               public ActorRef actor() {
-                       return null;
-               }
-
-               @Override
-               public UUID leaderSessionID() {
-                       return null;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
index 7876ff7..c490a64 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java
@@ -67,7 +67,7 @@ public class LeaderChangeStateCleanupTest extends TestLogger {
                
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
                
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
numSlotsPerTM);
 
-               cluster = new 
LeaderElectionRetrievalTestingCluster(configuration, true, false, null);
+               cluster = new 
LeaderElectionRetrievalTestingCluster(configuration, true, false);
                cluster.start(false); // TaskManagers don't have to register at 
the JobManager
 
                cluster.waitForActorsToBeAlive(); // we only wait until all 
actors are alive

http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
index cd89fa6..c8cf868 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.leaderelection;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import scala.Option;
@@ -39,7 +38,6 @@ public class LeaderElectionRetrievalTestingCluster extends 
TestingCluster {
 
        private final Configuration userConfiguration;
        private final boolean useSingleActorSystem;
-       private final RestartStrategy restartStrategy;
 
        public List<TestingLeaderElectionService> leaderElectionServices;
        public List<TestingLeaderRetrievalService> leaderRetrievalServices;
@@ -49,8 +47,7 @@ public class LeaderElectionRetrievalTestingCluster extends 
TestingCluster {
        public LeaderElectionRetrievalTestingCluster(
                        Configuration userConfiguration,
                        boolean singleActorSystem,
-                       boolean synchronousDispatcher,
-                       RestartStrategy restartStrategy) {
+                       boolean synchronousDispatcher) {
                super(userConfiguration, singleActorSystem, 
synchronousDispatcher);
 
                this.userConfiguration = userConfiguration;
@@ -58,8 +55,6 @@ public class LeaderElectionRetrievalTestingCluster extends 
TestingCluster {
 
                leaderElectionServices = new 
ArrayList<TestingLeaderElectionService>();
                leaderRetrievalServices = new 
ArrayList<TestingLeaderRetrievalService>();
-
-               this.restartStrategy = restartStrategy;
        }
 
        @Override
@@ -95,15 +90,6 @@ public class LeaderElectionRetrievalTestingCluster extends 
TestingCluster {
                                
ConfigConstants.DEFAULT_LOCAL_NUMBER_JOB_MANAGER);
        }
 
-       @Override
-       public RestartStrategy getRestartStrategy(RestartStrategy other) {
-               if (this.restartStrategy != null) {
-                       return this.restartStrategy;
-               } else {
-                       return other;
-               }
-       }
-
        public void grantLeadership(int index, UUID leaderSessionID) {
                if(leaderIndex >= 0) {
                        // first revoke leadership
@@ -123,11 +109,4 @@ public class LeaderElectionRetrievalTestingCluster extends 
TestingCluster {
                        service.notifyListener(address, leaderSessionID);
                }
        }
-
-       public void revokeLeadership() {
-               if (leaderIndex >= 0) {
-                       leaderElectionServices.get(leaderIndex).notLeader();
-                       leaderIndex = -1;
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 8f321bb..c72eb50 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -24,10 +24,10 @@ import akka.pattern.ask
 import akka.actor.{ActorRef, Props, ActorSystem}
 import akka.testkit.CallingThreadDispatcher
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategy
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
 import org.apache.flink.runtime.minicluster.FlinkMiniCluster
+import org.apache.flink.util.NetUtils
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.testingUtils.TestingMessages.Alive
 
@@ -96,7 +96,7 @@ class TestingCluster(
     instanceManager,
     scheduler,
     libraryCacheManager,
-    restartStrategyFactory,
+    restartStrategy,
     timeout,
     archiveCount,
     leaderElectionService,
@@ -118,7 +118,7 @@ class TestingCluster(
         scheduler,
         libraryCacheManager,
         archive,
-        restartStrategyFactory,
+        restartStrategy,
         timeout,
         leaderElectionService,
         submittedJobsGraphs,
@@ -155,10 +155,6 @@ class TestingCluster(
     None
   }
 
-  def getRestartStrategy(restartStrategy: RestartStrategy) = {
-    restartStrategy
-  }
-
   @throws(classOf[TimeoutException])
   @throws(classOf[InterruptedException])
   def waitForTaskManagersToBeAlive(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index e854b13..53867e0 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -23,7 +23,7 @@ import akka.actor.ActorRef
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.checkpoint.{SavepointStore, 
CheckpointRecoveryFactory}
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy
 import org.apache.flink.runtime.instance.InstanceManager
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
 import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore}
@@ -44,7 +44,7 @@ class TestingJobManager(
     scheduler: Scheduler,
     libraryCacheManager: BlobLibraryCacheManager,
     archive: ActorRef,
-    restartStrategyFactory: RestartStrategyFactory,
+    restartStrategy: RestartStrategy,
     timeout: FiniteDuration,
     leaderElectionService: LeaderElectionService,
     submittedJobGraphs : SubmittedJobGraphStore,
@@ -58,7 +58,7 @@ class TestingJobManager(
     scheduler,
     libraryCacheManager,
     archive,
-    restartStrategyFactory,
+    restartStrategy,
     timeout,
     leaderElectionService,
     submittedJobGraphs,

http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
 
b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
index 717d631..4e6b745 100644
--- 
a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
+++ 
b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
@@ -24,7 +24,7 @@ import akka.actor.ActorRef
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.checkpoint.{SavepointStore, 
CheckpointRecoveryFactory}
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy
 import org.apache.flink.runtime.instance.InstanceManager
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
@@ -46,7 +46,7 @@ import scala.concurrent.duration.FiniteDuration
   * @param scheduler Scheduler to schedule Flink jobs
   * @param libraryCacheManager Manager to manage uploaded jar files
   * @param archive Archive for finished Flink jobs
-  * @param restartStrategyFactory Default restart strategy for job restarts
+  * @param restartStrategy Default restart strategy for job restarts
   * @param timeout Timeout for futures
   * @param leaderElectionService LeaderElectionService to participate in the 
leader election
   */
@@ -57,7 +57,7 @@ class TestingYarnJobManager(
     scheduler: Scheduler,
     libraryCacheManager: BlobLibraryCacheManager,
     archive: ActorRef,
-    restartStrategyFactory: RestartStrategyFactory,
+    restartStrategy: RestartStrategy,
     timeout: FiniteDuration,
     leaderElectionService: LeaderElectionService,
     submittedJobGraphs : SubmittedJobGraphStore,
@@ -71,7 +71,7 @@ class TestingYarnJobManager(
     scheduler,
     libraryCacheManager,
     archive,
-    restartStrategyFactory,
+    restartStrategy,
     timeout,
     leaderElectionService,
     submittedJobGraphs,

http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index a6d587b..314c5bd 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -33,7 +33,7 @@ import org.apache.flink.api.common.JobID
 import org.apache.flink.configuration.{Configuration => FlinkConfiguration, 
ConfigConstants}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.checkpoint.{SavepointStore, 
CheckpointRecoveryFactory}
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy
 import org.apache.flink.runtime.jobgraph.JobStatus
 import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, JobManager}
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
@@ -75,7 +75,7 @@ import scala.util.Try
   * @param scheduler Scheduler to schedule Flink jobs
   * @param libraryCacheManager Manager to manage uploaded jar files
   * @param archive Archive for finished Flink jobs
-  * @param restartStrategyFactory Restart strategy to be used in case of a job 
recovery
+  * @param restartStrategy Restart strategy to be used in case of a job 
recovery
   * @param timeout Timeout for futures
   * @param leaderElectionService LeaderElectionService to participate in the 
leader election
   */
@@ -86,7 +86,7 @@ class YarnJobManager(
     scheduler: FlinkScheduler,
     libraryCacheManager: BlobLibraryCacheManager,
     archive: ActorRef,
-    restartStrategyFactory: RestartStrategyFactory,
+    restartStrategy: RestartStrategy,
     timeout: FiniteDuration,
     leaderElectionService: LeaderElectionService,
     submittedJobGraphs : SubmittedJobGraphStore,
@@ -100,7 +100,7 @@ class YarnJobManager(
     scheduler,
     libraryCacheManager,
     archive,
-    restartStrategyFactory,
+    restartStrategy,
     timeout,
     leaderElectionService,
     submittedJobGraphs,

Reply via email to