This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.6 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5e62da0f95d9abe35997e45dc9b0df3a9c7495cd Author: Till Rohrmann <trohrm...@apache.org> AuthorDate: Tue Aug 21 00:14:32 2018 +0200 [FLINK-10011] Release JobGraph after losing leadership in JobManager The JobManager now releases its lock on all JobGraphs it has stored in the SubmittedJobGraphStore if the JobManager loses leadership. This ensures that a different JobManager can delete these jobs after it has recovered them and reached a globally terminal state. This is especially important when using stand-by JobManagers where a former leader might still be connected to ZooKeeper and, thus, keep all of its ephemeral nodes/locks. --- .../org/apache/flink/runtime/akka/ActorUtils.java | 10 ++ .../flink/runtime/jobmanager/JobManager.scala | 39 +++-- .../flink/runtime/dispatcher/DispatcherHATest.java | 9 +- .../flink/runtime/jobmanager/JobManagerTest.java | 3 +- .../jobmanager/ZooKeeperHAJobManagerTest.java | 180 +++++++++++++++++++++ .../testingUtils/TestingJobManagerLike.scala | 8 + .../testingUtils/TestingJobManagerMessages.scala | 3 + 7 files changed, 231 insertions(+), 21 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java index f2f9059..9a99281 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java @@ -19,9 +19,11 @@ package org.apache.flink.runtime.akka; import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.instance.AkkaActorGateway; import akka.actor.ActorRef; import akka.actor.Kill; +import akka.actor.PoisonPill; import akka.pattern.Patterns; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,5 +87,13 @@ public class ActorUtils { return FutureUtils.completeAll(terminationFutures); } + public static void stopActor(AkkaActorGateway akkaActorGateway) { +
stopActor(akkaActorGateway.actor()); + } + + public static void stopActor(ActorRef actorRef) { + actorRef.tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + private ActorUtils() {} } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 4f0709e..c588ecc 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -1728,21 +1728,22 @@ class JobManager( val futureOption = currentJobs.remove(jobID) match { case Some((eg, _)) => val cleanUpFuture: Future[Unit] = Future { - val cleanupHABlobs = if (removeJobFromStateBackend) { - try { + val cleanupHABlobs = try { + if (removeJobFromStateBackend) { // ...otherwise, we can have lingering resources when there is a concurrent shutdown // and the ZooKeeper client is closed. Not removing the job immediately allow the // shutdown to release all resources. 
submittedJobGraphs.removeJobGraph(jobID) true - } catch { - case t: Throwable => { - log.warn(s"Could not remove submitted job graph $jobID.", t) - false - } + } else { + submittedJobGraphs.releaseJobGraph(jobID) + false + } + } catch { + case t: Throwable => { + log.warn(s"Could not remove submitted job graph $jobID.", t) + false } - } else { - false } blobServer.cleanupJob(jobID, cleanupHABlobs) @@ -1777,19 +1778,23 @@ class JobManager( */ private def cancelAndClearEverything(cause: Throwable) : Seq[Future[Unit]] = { - val futures = for ((jobID, (eg, jobInfo)) <- currentJobs) yield { - future { - eg.suspend(cause) - jobManagerMetricGroup.removeJob(eg.getJobID) + + val futures = currentJobs.values.flatMap( + egJobInfo => { + val executionGraph = egJobInfo._1 + val jobInfo = egJobInfo._2 + + executionGraph.suspend(cause) + + val jobId = executionGraph.getJobID jobInfo.notifyNonDetachedClients( decorateMessage( Failure( - new JobExecutionException(jobID, "All jobs are cancelled and cleared.", cause)))) - }(context.dispatcher) - } + new JobExecutionException(jobId, "All jobs are cancelled and cleared.", cause)))) - currentJobs.clear() + removeJob(jobId, false) + }) futures.toSeq } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java index 5876c5f..adf7618 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java @@ -41,6 +41,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.apache.flink.util.Preconditions; import 
org.apache.flink.util.TestLogger; @@ -162,9 +163,13 @@ public class DispatcherHATest extends TestLogger { } @Nonnull - private JobGraph createNonEmptyJobGraph() { + public static JobGraph createNonEmptyJobGraph() { final JobVertex noOpVertex = new JobVertex("NoOp vertex"); - return new JobGraph(noOpVertex); + noOpVertex.setInvokableClass(NoOpInvokable.class); + final JobGraph jobGraph = new JobGraph(noOpVertex); + jobGraph.setAllowQueuedScheduling(true); + + return jobGraph; } private static class HATestingDispatcher extends TestingDispatcher { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java index 873a4f1..c499f26 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java @@ -32,8 +32,8 @@ import org.apache.flink.runtime.checkpoint.CheckpointDeclineReason; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.CheckpointOptions; -import org.apache.flink.runtime.checkpoint.CheckpointType; import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; +import org.apache.flink.runtime.checkpoint.CheckpointType; import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted; import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager; import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful; @@ -151,7 +151,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; - import static org.mockito.Mockito.mock; public class JobManagerTest extends TestLogger { diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java new file mode 100644 index 0000000..8e5b1b9 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.akka.ActorUtils; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.akka.ListeningBehaviour; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; +import org.apache.flink.runtime.dispatcher.DispatcherHATest; +import org.apache.flink.runtime.dispatcher.NoOpSubmittedJobGraphListener; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.metrics.NoOpMetricRegistry; +import org.apache.flink.runtime.testingUtils.TestingJobManager; +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.util.ZooKeeperUtils; +import org.apache.flink.runtime.zookeeper.ZooKeeperResource; +import org.apache.flink.util.TestLogger; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.ExtendedActorSystem; +import akka.actor.Identify; +import akka.actor.Terminated; +import akka.pattern.Patterns; +import org.apache.curator.framework.CuratorFramework; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.util.Collection; +import java.util.concurrent.TimeUnit; + +import scala.Option; +import 
scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertThat; + +/** + * Tests for the ZooKeeper HA service and {@link JobManager} interaction. + */ +public class ZooKeeperHAJobManagerTest extends TestLogger { + + @ClassRule + public static final ZooKeeperResource ZOO_KEEPER_RESOURCE = new ZooKeeperResource(); + + @ClassRule + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private static final FiniteDuration TIMEOUT = FiniteDuration.apply(10L, TimeUnit.SECONDS); + + private static ActorSystem system; + + @BeforeClass + public static void setup() { + system = AkkaUtils.createLocalActorSystem(new Configuration()); + } + + @AfterClass + public static void teardown() throws Exception { + final Future<Terminated> terminationFuture = system.terminate(); + Await.ready(terminationFuture, TIMEOUT); + } + + /** + * Tests that the {@link JobManager} releases all locked {@link JobGraph} if it loses + * leadership. 
+ */ + @Test + public void testJobGraphReleaseWhenLosingLeadership() throws Exception { + final Configuration configuration = new Configuration(); + configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, ZOO_KEEPER_RESOURCE.getConnectString()); + configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, TEMPORARY_FOLDER.newFolder().getAbsolutePath()); + + try (TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices()) { + + final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration); + final TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService(); + highAvailabilityServices.setJobMasterLeaderElectionService(HighAvailabilityServices.DEFAULT_JOB_ID, leaderElectionService); + highAvailabilityServices.setSubmittedJobGraphStore(ZooKeeperUtils.createSubmittedJobGraphs(client, configuration)); + highAvailabilityServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory()); + + final CuratorFramework otherClient = ZooKeeperUtils.startCuratorFramework(configuration); + final ZooKeeperSubmittedJobGraphStore otherSubmittedJobGraphStore = ZooKeeperUtils.createSubmittedJobGraphs(otherClient, configuration); + otherSubmittedJobGraphStore.start(NoOpSubmittedJobGraphListener.INSTANCE); + + ActorRef jobManagerActorRef = null; + try { + jobManagerActorRef = JobManager.startJobManagerActors( + configuration, + system, + TestingUtils.defaultExecutor(), + TestingUtils.defaultExecutor(), + highAvailabilityServices, + NoOpMetricRegistry.INSTANCE, + Option.empty(), + TestingJobManager.class, + MemoryArchivist.class)._1(); + + waitForActorToBeStarted(jobManagerActorRef, TIMEOUT); + + final ActorGateway jobManager = new AkkaActorGateway(jobManagerActorRef, HighAvailabilityServices.DEFAULT_LEADER_ID); + + leaderElectionService.isLeader(HighAvailabilityServices.DEFAULT_LEADER_ID).get(); + + final JobGraph nonEmptyJobGraph = 
DispatcherHATest.createNonEmptyJobGraph(); + + final JobManagerMessages.SubmitJob submitJobMessage = new JobManagerMessages.SubmitJob(nonEmptyJobGraph, ListeningBehaviour.DETACHED); + + Await.result(jobManager.ask(submitJobMessage, TIMEOUT), TIMEOUT); + + Collection<JobID> jobIds = otherSubmittedJobGraphStore.getJobIds(); + + final JobID jobId = nonEmptyJobGraph.getJobID(); + assertThat(jobIds, contains(jobId)); + + // revoke the leadership + leaderElectionService.notLeader(); + + Await.result(jobManager.ask(TestingJobManagerMessages.getWaitForBackgroundTasksToFinish(), TIMEOUT), TIMEOUT); + + final SubmittedJobGraph recoveredJobGraph = akka.serialization.JavaSerializer.currentSystem().withValue( + ((ExtendedActorSystem) system), + () -> otherSubmittedJobGraphStore.recoverJobGraph(jobId)); + + assertThat(recoveredJobGraph, is(notNullValue())); + + otherSubmittedJobGraphStore.removeJobGraph(jobId); + + jobIds = otherSubmittedJobGraphStore.getJobIds(); + + assertThat(jobIds, not(contains(jobId))); + } finally { + client.close(); + otherClient.close(); + + if (jobManagerActorRef != null) { + ActorUtils.stopActor(jobManagerActorRef); + } + } + } + } + + private void waitForActorToBeStarted(ActorRef jobManagerActorRef, FiniteDuration timeout) throws InterruptedException, java.util.concurrent.TimeoutException { + Await.ready(Patterns.ask(jobManagerActorRef, new Identify(42), timeout.toMillis()), timeout); + } +} diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala index 0640f39..ebe4639 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala @@ -454,6 +454,14 @@ trait TestingJobManagerLike extends FlinkActor { val receiver = waitForNumRegisteredTaskManagers.dequeue()._2 
receiver ! Acknowledge.get() } + + case WaitForBackgroundTasksToFinish => + val future = futuresToComplete match { + case Some(futures) => Future.sequence(futures) + case None => Future.successful(Seq()) + } + + future.pipeTo(sender()) } def checkIfAllVerticesRunning(jobID: JobID): Boolean = { diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala index c8529a9..64af056 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala @@ -59,6 +59,8 @@ object TestingJobManagerMessages { case object NotifyListeners + case object WaitForBackgroundTasksToFinish + case class NotifyWhenTaskManagerTerminated(taskManager: ActorRef) case class TaskManagerTerminated(taskManager: ActorRef) @@ -164,4 +166,5 @@ object TestingJobManagerMessages { def getClientConnected(): AnyRef = ClientConnected def getClassLoadingPropsDelivered(): AnyRef = ClassLoadingPropsDelivered + def getWaitForBackgroundTasksToFinish(): AnyRef = WaitForBackgroundTasksToFinish }