Repository: flink Updated Branches: refs/heads/release-1.0 f1d34b17b -> 0708dd08b
Revert "[FLINK-3800] [jobmanager] Terminate ExecutionGraphs properly" This reverts commit 014a686ec5ea0e8809a5235fe988f828e5e70833. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0708dd08 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0708dd08 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0708dd08 Branch: refs/heads/release-1.0 Commit: 0708dd08b57d8cfca20c470ab4e909ea56cb9a38 Parents: f1d34b1 Author: Ufuk Celebi <[email protected]> Authored: Fri Apr 29 11:22:33 2016 +0200 Committer: Ufuk Celebi <[email protected]> Committed: Fri Apr 29 11:22:33 2016 +0200 ---------------------------------------------------------------------- .../runtime/executiongraph/ExecutionGraph.java | 28 +-- .../restart/FixedDelayRestartStrategy.java | 30 +-- .../restart/NoRestartStrategy.java | 17 +- .../executiongraph/restart/RestartStrategy.java | 5 - .../restart/RestartStrategyFactory.java | 28 +-- .../flink/runtime/jobmanager/JobManager.scala | 16 +- .../JobManagerLeaderElectionTest.java | 2 +- .../LeaderChangeJobRecoveryTest.java | 198 ------------------- .../LeaderChangeStateCleanupTest.java | 2 +- .../LeaderElectionRetrievalTestingCluster.java | 23 +-- .../runtime/testingUtils/TestingCluster.scala | 10 +- .../testingUtils/TestingJobManager.scala | 6 +- .../flink/yarn/TestingYarnJobManager.scala | 8 +- .../org/apache/flink/yarn/YarnJobManager.scala | 8 +- 14 files changed, 49 insertions(+), 332 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 8cb1ded..7cb83cd 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -790,14 +790,6 @@ public class ExecutionGraph implements Serializable { } public void fail(Throwable t) { - if (t instanceof UnrecoverableException) { - if (restartStrategy != null) { - // disable the restart strategy in case that we have seen a SuppressRestartsException - // it basically overrides the restart behaviour of a the root cause - restartStrategy.disable(); - } - } - while (true) { JobStatus current = state; if (current == JobStatus.FAILING || current.isTerminalState()) { @@ -1021,17 +1013,15 @@ public class ExecutionGraph implements Serializable { } } else if (current == JobStatus.FAILING) { - if (restartStrategy.canRestart() && transitionState(current, JobStatus.RESTARTING)) { - // double check in case that in the meantime a SuppressRestartsException was thrown - if (restartStrategy.canRestart()) { - restartStrategy.restart(this); - break; - } else { - fail(new Exception("ExecutionGraph went into RESTARTING state but " + - "then the restart strategy was disabled.")); - } - - } else if (!restartStrategy.canRestart() && transitionState(current, JobStatus.FAILED, failureCause)) { + boolean isRecoverable = !(failureCause instanceof UnrecoverableException); + + if (isRecoverable && restartStrategy.canRestart() && + transitionState(current, JobStatus.RESTARTING)) { + restartStrategy.restart(this); + break; + + } else if ((!isRecoverable || !restartStrategy.canRestart()) && + transitionState(current, JobStatus.FAILED, failureCause)) { postRunCleanup(); break; } http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java index 464b48e..d3c7eba 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java @@ -41,7 +41,6 @@ public class FixedDelayRestartStrategy implements RestartStrategy { private final int maxNumberRestartAttempts; private final long delayBetweenRestartAttempts; private int currentRestartAttempt; - private boolean disabled = false; public FixedDelayRestartStrategy( int maxNumberRestartAttempts, @@ -61,7 +60,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy { @Override public boolean canRestart() { - return !disabled && currentRestartAttempt < maxNumberRestartAttempts; + return currentRestartAttempt < maxNumberRestartAttempts; } @Override @@ -84,11 +83,6 @@ public class FixedDelayRestartStrategy implements RestartStrategy { }, executionGraph.getExecutionContext()); } - @Override - public void disable() { - disabled = true; - } - /** * Creates a FixedDelayRestartStrategy from the given Configuration. * @@ -96,7 +90,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy { * @return Initialized instance of FixedDelayRestartStrategy * @throws Exception */ - public static FixedDelayRestartStrategyFactory createFactory(Configuration configuration) throws Exception { + public static FixedDelayRestartStrategy create(Configuration configuration) throws Exception { int maxAttempts = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1); String timeoutString = configuration.getString( @@ -124,7 +118,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy { } } - return new FixedDelayRestartStrategyFactory(maxAttempts, delay); + return new FixedDelayRestartStrategy(maxAttempts, delay); } @Override @@ -134,22 +128,4 @@ public class FixedDelayRestartStrategy implements RestartStrategy { ", delayBetweenRestartAttempts=" + delayBetweenRestartAttempts + ')'; } - - public static class FixedDelayRestartStrategyFactory extends RestartStrategyFactory { - - private static final long serialVersionUID = 6642934067762271950L; - - private final int maxAttempts; - private final long delay; - - public FixedDelayRestartStrategyFactory(int maxAttempts, long delay) { - this.maxAttempts = maxAttempts; - this.delay = delay; - } - - @Override - public RestartStrategy createRestartStrategy() { - return new FixedDelayRestartStrategy(maxAttempts, delay); - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java index 6cc5ee4..8911a98 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java @@ -36,31 +36,18 @@ public class NoRestartStrategy implements RestartStrategy { throw new RuntimeException("NoRestartStrategy does not support restart."); } - @Override - public void disable() {} - /** * Creates a NoRestartStrategy instance. * * @param configuration Configuration object which is ignored * @return NoRestartStrategy instance */ - public static NoRestartStrategyFactory createFactory(Configuration configuration) { - return new NoRestartStrategyFactory(); + public static NoRestartStrategy create(Configuration configuration) { + return new NoRestartStrategy(); } @Override public String toString() { return "NoRestartStrategy"; } - - public static class NoRestartStrategyFactory extends RestartStrategyFactory { - - private static final long serialVersionUID = -1809462525812787862L; - - @Override - public RestartStrategy createRestartStrategy() { - return new NoRestartStrategy(); - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java index c9e6277..2880c01 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java @@ -38,9 +38,4 @@ public interface RestartStrategy { * @param executionGraph The ExecutionGraph to be restarted */ void restart(ExecutionGraph executionGraph); - - /** - * Disables the restart strategy. - */ - void disable(); } http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java index e58d775..68d114e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java @@ -25,21 +25,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.Duration; -import java.io.Serializable; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -public abstract class RestartStrategyFactory implements Serializable { - private static final long serialVersionUID = 7320252552640522191L; - +public class RestartStrategyFactory { private static final Logger LOG = LoggerFactory.getLogger(RestartStrategyFactory.class); - private static final String CREATE_METHOD = "createFactory"; - - /** - * Factory method to create a restart strategy - * @return The created restart strategy - */ - public abstract RestartStrategy createRestartStrategy(); + private static final String CREATE_METHOD = "create"; /** * Creates a {@link RestartStrategy} instance from the given {@link org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration}. @@ -67,10 +58,11 @@ public abstract class RestartStrategyFactory implements Serializable { /** * Creates a {@link RestartStrategy} instance from the given {@link Configuration}. * + * @param configuration Configuration object containing the configuration values. * @return RestartStrategy instance * @throws Exception which indicates that the RestartStrategy could not be instantiated. */ - public static RestartStrategyFactory createRestartStrategyFactory(Configuration configuration) throws Exception { + public static RestartStrategy createFromConfig(Configuration configuration) throws Exception { String restartStrategyName = configuration.getString(ConfigConstants.RESTART_STRATEGY, "none").toLowerCase(); switch (restartStrategyName) { @@ -100,16 +92,16 @@ public abstract class RestartStrategyFactory implements Serializable { } if (numberExecutionRetries > 0 && delay >= 0) { - return new FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(numberExecutionRetries, delay); + return new FixedDelayRestartStrategy(numberExecutionRetries, delay); } else { - return NoRestartStrategy.createFactory(configuration); + return NoRestartStrategy.create(configuration); } case "off": case "disable": - return NoRestartStrategy.createFactory(configuration); + return NoRestartStrategy.create(configuration); case "fixeddelay": case "fixed-delay": - return FixedDelayRestartStrategy.createFactory(configuration); + return FixedDelayRestartStrategy.create(configuration); default: try { Class<?> clazz = Class.forName(restartStrategyName); @@ -121,7 +113,7 @@ public abstract class RestartStrategyFactory implements Serializable { Object result = method.invoke(null, configuration); if (result != null) { - return (RestartStrategyFactory) result; + return (RestartStrategy) result; } } } @@ -136,7 +128,7 @@ public abstract class RestartStrategyFactory implements Serializable { } // fallback in case of an error - return NoRestartStrategy.createFactory(configuration); + return NoRestartStrategy.create(configuration); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 6391c82..cee5606 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -28,7 +28,6 @@ import akka.actor._ import akka.pattern.ask import grizzled.slf4j.Logger -import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration import org.apache.flink.api.common.{ExecutionConfig, JobID} import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration} @@ -112,7 +111,7 @@ class JobManager( protected val scheduler: FlinkScheduler, protected val libraryCacheManager: BlobLibraryCacheManager, protected val archive: ActorRef, - protected val restartStrategyFactory: RestartStrategyFactory, + protected val defaultRestartStrategy: RestartStrategy, protected val timeout: FiniteDuration, protected val leaderElectionService: LeaderElectionService, protected val submittedJobGraphs : SubmittedJobGraphStore, @@ -201,7 +200,7 @@ class JobManager( log.info(s"Stopping JobManager $getAddress.") val newFuturesToComplete = cancelAndClearEverything( - new UnrecoverableException(new Exception("The JobManager is shutting down.")), + new Exception("The JobManager is shutting down."), removeJobFromStateBackend = true) implicit val executionContext = context.dispatcher @@ -298,7 +297,7 @@ class JobManager( log.info(s"JobManager ${self.path.toSerializationFormat} was revoked leadership.") val newFuturesToComplete = cancelAndClearEverything( - new UnrecoverableException(new Exception("JobManager is no longer the leader.")), + new Exception("JobManager is no longer the leader."), removeJobFromStateBackend = false) futuresToComplete = Some(futuresToComplete.getOrElse(Seq()) ++ newFuturesToComplete) @@ -951,7 +950,7 @@ class JobManager( val restartStrategy = Option(jobGraph.getRestartStrategyConfiguration()) .map(RestartStrategyFactory.createRestartStrategy(_)) match { case Some(strategy) => strategy - case None => restartStrategyFactory.createRestartStrategy() + case None => defaultRestartStrategy } log.info(s"Using restart strategy $restartStrategy for $jobId.") @@ -1495,7 +1494,7 @@ class JobManager( * @param cause Cause for the cancelling. */ private def cancelAndClearEverything( - cause: UnrecoverableException, + cause: Throwable, removeJobFromStateBackend: Boolean) : Seq[Future[Unit]] = { val futures = for ((jobID, (eg, jobInfo)) <- currentJobs) yield { @@ -2097,7 +2096,7 @@ object JobManager { InstanceManager, FlinkScheduler, BlobLibraryCacheManager, - RestartStrategyFactory, + RestartStrategy, FiniteDuration, // timeout Int, // number of archived jobs LeaderElectionService, @@ -2113,7 +2112,8 @@ object JobManager { ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL, ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000 - val restartStrategy = RestartStrategyFactory.createRestartStrategyFactory(configuration) + val restartStrategy = RestartStrategyFactory + .createFromConfig(configuration) val archiveCount = configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_ARCHIVE_COUNT, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT) http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java index afc46a7..fe35c0d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java @@ -192,7 +192,7 @@ public class JobManagerLeaderElectionTest extends TestLogger { new Scheduler(TestingUtils.defaultExecutionContext()), new BlobLibraryCacheManager(new BlobServer(configuration), 10L), ActorRef.noSender(), - new NoRestartStrategy.NoRestartStrategyFactory(), + new NoRestartStrategy(), AkkaUtils.getDefaultTimeout(), leaderElectionService, submittedJobGraphStore, http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java deleted file mode 100644 index b13ae81..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.leaderelection; - -import akka.actor.ActorRef; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.instance.AkkaActorGateway; -import org.apache.flink.runtime.jobgraph.DistributionPattern; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.jobmanager.Tasks; -import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; -import org.apache.flink.runtime.messages.ExecutionGraphMessages; -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; -import org.apache.flink.util.TestLogger; -import org.junit.Before; -import org.junit.Test; -import scala.concurrent.Await; -import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; -import scala.concurrent.Promise; -import scala.concurrent.duration.FiniteDuration; - -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static org.junit.Assert.assertTrue; - -public class LeaderChangeJobRecoveryTest extends TestLogger { - - private static FiniteDuration timeout = FiniteDuration.apply(30, TimeUnit.SECONDS); - - private int numTMs = 1; - private int numSlotsPerTM = 1; - private int parallelism = numTMs * numSlotsPerTM; - - private Configuration configuration; - private LeaderElectionRetrievalTestingCluster cluster = null; - private JobGraph job = createBlockingJob(parallelism); - - @Before - public void before() throws TimeoutException, InterruptedException { - Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(true); - - configuration = new Configuration(); - - configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1); - configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs); - configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM); - - cluster = new LeaderElectionRetrievalTestingCluster(configuration, true, false, new FixedDelayRestartStrategy(9999, 100)); - cluster.start(false); - - // wait for actors to be alive so that they have started their leader retrieval service - cluster.waitForActorsToBeAlive(); - } - - /** - * Tests that the job is not restarted or at least terminates eventually in case that the - * JobManager loses its leadership. - * - * @throws Exception - */ - @Test - public void testNotRestartedWhenLosingLeadership() throws Exception { - UUID leaderSessionID = UUID.randomUUID(); - - cluster.grantLeadership(0, leaderSessionID); - cluster.notifyRetrievalListeners(0, leaderSessionID); - - cluster.waitForTaskManagersToBeRegistered(timeout); - - cluster.submitJobDetached(job); - - ActorGateway jm = cluster.getLeaderGateway(timeout); - - Future<Object> wait = jm.ask(new TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished(job.getJobID()), timeout); - - Await.ready(wait, timeout); - - Future<Object> futureExecutionGraph = jm.ask(new TestingJobManagerMessages.RequestExecutionGraph(job.getJobID()), timeout); - - TestingJobManagerMessages.ResponseExecutionGraph responseExecutionGraph = - (TestingJobManagerMessages.ResponseExecutionGraph) Await.result(futureExecutionGraph, timeout); - - assertTrue(responseExecutionGraph instanceof TestingJobManagerMessages.ExecutionGraphFound); - - ExecutionGraph executionGraph = ((TestingJobManagerMessages.ExecutionGraphFound) responseExecutionGraph).executionGraph(); - - TestActorGateway testActorGateway = new TestActorGateway(); - - executionGraph.registerJobStatusListener(testActorGateway); - - cluster.revokeLeadership(); - - Future<Boolean> hasReachedTerminalState = testActorGateway.hasReachedTerminalState(); - - assertTrue("The job should have reached a terminal state.", Await.result(hasReachedTerminalState, timeout)); - } - - public JobGraph createBlockingJob(int parallelism) { - Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(true); - - JobVertex sender = new JobVertex("sender"); - JobVertex receiver = new JobVertex("receiver"); - - sender.setInvokableClass(Tasks.Sender.class); - receiver.setInvokableClass(Tasks.BlockingOnceReceiver.class); - - sender.setParallelism(parallelism); - receiver.setParallelism(parallelism); - - receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE); - - SlotSharingGroup slotSharingGroup = new SlotSharingGroup(); - sender.setSlotSharingGroup(slotSharingGroup); - receiver.setSlotSharingGroup(slotSharingGroup); - - return new JobGraph("Blocking test job", sender, receiver); - } - - public static class TestActorGateway implements ActorGateway { - - private static final long serialVersionUID = -736146686160538227L; - private transient Promise<Boolean> terminalState = new scala.concurrent.impl.Promise.DefaultPromise<>(); - - public Future<Boolean> hasReachedTerminalState() { - return terminalState.future(); - } - - @Override - public Future<Object> ask(Object message, FiniteDuration timeout) { - return null; - } - - @Override - public void tell(Object message) { - this.tell(message, new AkkaActorGateway(ActorRef.noSender(), null)); - } - - @Override - public void tell(Object message, ActorGateway sender) { - if (message instanceof ExecutionGraphMessages.JobStatusChanged) { - ExecutionGraphMessages.JobStatusChanged jobStatusChanged = (ExecutionGraphMessages.JobStatusChanged) message; - - if (jobStatusChanged.newJobStatus().isTerminalState()) { - terminalState.success(true); - } - } - } - - @Override - public void forward(Object message, ActorGateway sender) { - - } - - @Override - public Future<Object> retry(Object message, int numberRetries, FiniteDuration timeout, ExecutionContext executionContext) { - return null; - } - - @Override - public String path() { - return null; - } - - @Override - public ActorRef actor() { - return null; - } - - @Override - public UUID leaderSessionID() { - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java index 7876ff7..c490a64 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeStateCleanupTest.java @@ -67,7 +67,7 @@ public class LeaderChangeStateCleanupTest extends TestLogger { configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs); configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM); - cluster = new LeaderElectionRetrievalTestingCluster(configuration, true, false, null); + cluster = new LeaderElectionRetrievalTestingCluster(configuration, true, false); cluster.start(false); // TaskManagers don't have to register at the JobManager cluster.waitForActorsToBeAlive(); // we only wait until all actors are alive http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java index cd89fa6..c8cf868 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.leaderelection; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.testingUtils.TestingCluster; import scala.Option; @@ -39,7 +38,6 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster { private final Configuration userConfiguration; private final boolean useSingleActorSystem; - private final RestartStrategy restartStrategy; public List<TestingLeaderElectionService> leaderElectionServices; public List<TestingLeaderRetrievalService> leaderRetrievalServices; @@ -49,8 +47,7 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster { public LeaderElectionRetrievalTestingCluster( Configuration userConfiguration, boolean singleActorSystem, - boolean synchronousDispatcher, - RestartStrategy restartStrategy) { + boolean synchronousDispatcher) { super(userConfiguration, singleActorSystem, synchronousDispatcher); this.userConfiguration = userConfiguration; @@ -58,8 +55,6 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster { leaderElectionServices = new ArrayList<TestingLeaderElectionService>(); leaderRetrievalServices = new ArrayList<TestingLeaderRetrievalService>(); - - this.restartStrategy = restartStrategy; } @Override @@ -95,15 +90,6 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster { ConfigConstants.DEFAULT_LOCAL_NUMBER_JOB_MANAGER); } - @Override - public RestartStrategy getRestartStrategy(RestartStrategy other) { - if (this.restartStrategy != null) { - return this.restartStrategy; - } else { - return other; - } - } - public void grantLeadership(int index, UUID leaderSessionID) { if(leaderIndex >= 0) { // first revoke leadership @@ -123,11 +109,4 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster { service.notifyListener(address, leaderSessionID); } } - - public void revokeLeadership() { - if (leaderIndex >= 0) { - leaderElectionServices.get(leaderIndex).notLeader(); - leaderIndex = -1; - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala index 8f321bb..c72eb50 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala @@ -24,10 +24,10 @@ import akka.pattern.ask import akka.actor.{ActorRef, Props, ActorSystem} import akka.testkit.CallingThreadDispatcher import org.apache.flink.configuration.{ConfigConstants, Configuration} -import org.apache.flink.runtime.executiongraph.restart.RestartStrategy import org.apache.flink.runtime.jobmanager.JobManager import org.apache.flink.runtime.leaderelection.LeaderElectionService import org.apache.flink.runtime.minicluster.FlinkMiniCluster +import org.apache.flink.util.NetUtils import org.apache.flink.runtime.taskmanager.TaskManager import org.apache.flink.runtime.testingUtils.TestingMessages.Alive @@ -96,7 +96,7 @@ class TestingCluster( instanceManager, scheduler, libraryCacheManager, - restartStrategyFactory, + restartStrategy, timeout, archiveCount, leaderElectionService, @@ -118,7 +118,7 @@ class TestingCluster( scheduler, libraryCacheManager, archive, - restartStrategyFactory, + restartStrategy, timeout, leaderElectionService, submittedJobsGraphs, @@ -155,10 +155,6 @@ class TestingCluster( None } - def getRestartStrategy(restartStrategy: RestartStrategy) = { - restartStrategy - } - @throws(classOf[TimeoutException]) @throws(classOf[InterruptedException]) def waitForTaskManagersToBeAlive(): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala index e854b13..53867e0 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala @@ -23,7 +23,7 @@ import akka.actor.ActorRef import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory} import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager -import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy import org.apache.flink.runtime.instance.InstanceManager import org.apache.flink.runtime.jobmanager.scheduler.Scheduler import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore} @@ -44,7 +44,7 @@ class TestingJobManager( scheduler: Scheduler, libraryCacheManager: BlobLibraryCacheManager, archive: ActorRef, - restartStrategyFactory: RestartStrategyFactory, + restartStrategy: RestartStrategy, timeout: FiniteDuration, leaderElectionService: LeaderElectionService, submittedJobGraphs : SubmittedJobGraphStore, @@ -58,7 +58,7 @@ class TestingJobManager( scheduler, libraryCacheManager, archive, - restartStrategyFactory, + restartStrategy, timeout, leaderElectionService, submittedJobGraphs, http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala index 717d631..4e6b745 100644 --- a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala +++ b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala @@ -24,7 +24,7 @@ import akka.actor.ActorRef import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory} import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager -import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy import org.apache.flink.runtime.instance.InstanceManager import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore import org.apache.flink.runtime.jobmanager.scheduler.Scheduler @@ -46,7 +46,7 @@ import scala.concurrent.duration.FiniteDuration * @param scheduler Scheduler to schedule Flink jobs * @param libraryCacheManager Manager to manage uploaded jar files * @param archive Archive for finished Flink jobs - * @param restartStrategyFactory Default restart strategy for job restarts + * @param restartStrategy Default restart strategy for job restarts * @param timeout Timeout for futures * @param leaderElectionService LeaderElectionService to participate in the leader election */ @@ -57,7 +57,7 @@ class TestingYarnJobManager( scheduler: Scheduler, libraryCacheManager: BlobLibraryCacheManager, archive: ActorRef, - restartStrategyFactory: RestartStrategyFactory, + restartStrategy: RestartStrategy, timeout: FiniteDuration, leaderElectionService: LeaderElectionService, submittedJobGraphs : SubmittedJobGraphStore, @@ -71,7 +71,7 @@ class TestingYarnJobManager( scheduler, libraryCacheManager, archive, - restartStrategyFactory, + restartStrategy, timeout, leaderElectionService, submittedJobGraphs, http://git-wip-us.apache.org/repos/asf/flink/blob/0708dd08/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala index a6d587b..314c5bd 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala @@ -33,7 +33,7 @@ import org.apache.flink.api.common.JobID import org.apache.flink.configuration.{Configuration => FlinkConfiguration, ConfigConstants} import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.checkpoint.{SavepointStore, CheckpointRecoveryFactory} -import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy import org.apache.flink.runtime.jobgraph.JobStatus import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, JobManager} import org.apache.flink.runtime.leaderelection.LeaderElectionService @@ -75,7 +75,7 @@ import scala.util.Try * @param scheduler Scheduler to schedule Flink jobs * @param libraryCacheManager Manager to manage uploaded jar files * @param archive Archive for finished Flink jobs - * @param restartStrategyFactory Restart strategy to be used in case of a job recovery + * @param restartStrategy Restart strategy to be used in case of a job recovery * @param timeout Timeout for futures * @param leaderElectionService LeaderElectionService to participate in the leader election */ @@ -86,7 +86,7 @@ class YarnJobManager( scheduler: FlinkScheduler, libraryCacheManager: BlobLibraryCacheManager, archive: ActorRef, - restartStrategyFactory: RestartStrategyFactory, + restartStrategy: RestartStrategy, timeout: FiniteDuration, leaderElectionService: LeaderElectionService, submittedJobGraphs : SubmittedJobGraphStore, @@ -100,7 +100,7 @@ class YarnJobManager( scheduler, libraryCacheManager, archive, - restartStrategyFactory, + restartStrategy, timeout, leaderElectionService, submittedJobGraphs,
