This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5e62da0f95d9abe35997e45dc9b0df3a9c7495cd
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Tue Aug 21 00:14:32 2018 +0200

    [FLINK-10011] Release JobGraph after losing leadership in JobManager
    
    The JobManager now releases its lock on all JobGraphs it has stored in
    the SubmittedJobGraphStore when it loses leadership. This ensures
    that a different JobManager can delete these jobs after it has recovered
    them and they have reached a globally terminal state. This is especially
    important when using stand-by JobManagers, because a former leader might
    still be connected to ZooKeeper and thus keep holding all of its
    ephemeral nodes/locks.
---
 .../org/apache/flink/runtime/akka/ActorUtils.java  |  10 ++
 .../flink/runtime/jobmanager/JobManager.scala      |  39 +++--
 .../flink/runtime/dispatcher/DispatcherHATest.java |   9 +-
 .../flink/runtime/jobmanager/JobManagerTest.java   |   3 +-
 .../jobmanager/ZooKeeperHAJobManagerTest.java      | 180 +++++++++++++++++++++
 .../testingUtils/TestingJobManagerLike.scala       |   8 +
 .../testingUtils/TestingJobManagerMessages.scala   |   3 +
 7 files changed, 231 insertions(+), 21 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
index f2f9059..9a99281 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
@@ -19,9 +19,11 @@
 package org.apache.flink.runtime.akka;
 
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
 
 import akka.actor.ActorRef;
 import akka.actor.Kill;
+import akka.actor.PoisonPill;
 import akka.pattern.Patterns;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -85,5 +87,13 @@ public class ActorUtils {
                return FutureUtils.completeAll(terminationFutures);
        }
 
+       /**
+        * Stops the actor behind the given {@link AkkaActorGateway}.
+        *
+        * <p>Equivalent to calling {@code stopActor(akkaActorGateway.actor())}.
+        *
+        * @param akkaActorGateway gateway whose underlying actor shall be stopped
+        */
+       public static void stopActor(AkkaActorGateway akkaActorGateway) {
+               final ActorRef target = akkaActorGateway.actor();
+               stopActor(target);
+       }
+
+       /**
+        * Stops the given actor by sending it a {@link PoisonPill} without a sender.
+        *
+        * <p>The message is sent fire-and-forget: this method returns right away and does
+        * not wait for the actor to terminate.
+        *
+        * @param actorRef actor which shall be stopped
+        */
+       public static void stopActor(ActorRef actorRef) {
+               final Object stopMessage = PoisonPill.getInstance();
+               actorRef.tell(stopMessage, ActorRef.noSender());
+       }
+
        private ActorUtils() {}
 }
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 4f0709e..c588ecc 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1728,21 +1728,22 @@ class JobManager(
     val futureOption = currentJobs.remove(jobID) match {
       case Some((eg, _)) =>
         val cleanUpFuture: Future[Unit] = Future {
-          val cleanupHABlobs = if (removeJobFromStateBackend) {
-            try {
+          val cleanupHABlobs = try {
+            if (removeJobFromStateBackend) {
               // ...otherwise, we can have lingering resources when there is a concurrent shutdown
               // and the ZooKeeper client is closed. Not removing the job immediately allow the
               // shutdown to release all resources.
               submittedJobGraphs.removeJobGraph(jobID)
               true
-            } catch {
-              case t: Throwable => {
-                log.warn(s"Could not remove submitted job graph $jobID.", t)
-                false
-              }
+            } else {
+              // Keep the job graph in the store but give up this JobManager's lock on it,
+              // so that another JobManager can recover the job and remove it later on.
+              submittedJobGraphs.releaseJobGraph(jobID)
+              false
+            }
+          } catch {
+            case t: Throwable => {
+              // both store operations end up here; report the one that actually failed
+              val operation = if (removeJobFromStateBackend) "remove" else "release"
+              log.warn(s"Could not $operation submitted job graph $jobID.", t)
+              false
             }
-          } else {
-            false
           }
 
           blobServer.cleanupJob(jobID, cleanupHABlobs)
@@ -1777,19 +1778,23 @@ class JobManager(
     */
   private def cancelAndClearEverything(cause: Throwable)
     : Seq[Future[Unit]] = {
-    val futures = for ((jobID, (eg, jobInfo)) <- currentJobs) yield {
-      future {
-        eg.suspend(cause)
-        jobManagerMetricGroup.removeJob(eg.getJobID)
+
+    // removeJob(jobId, ...) removes the job from currentJobs. Iterate over an immutable
+    // snapshot of the values so that the map is not modified while it is being traversed.
+    val futures = currentJobs.values.toList.flatMap {
+      case (executionGraph, jobInfo) =>
+        executionGraph.suspend(cause)
+
+        val jobId = executionGraph.getJobID

         jobInfo.notifyNonDetachedClients(
           decorateMessage(
             Failure(
-              new JobExecutionException(jobID, "All jobs are cancelled and cleared.", cause))))
-      }(context.dispatcher)
-    }
+              new JobExecutionException(jobId, "All jobs are cancelled and cleared.", cause))))

-    currentJobs.clear()
+        // Do not delete the job graph from the store; only release this JobManager's lock
+        // on it so that the next leader can recover the job.
+        removeJob(jobId, false)
+    }

     futures.toSeq
   }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
index 5876c5f..adf7618 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
@@ -162,9 +163,13 @@ public class DispatcherHATest extends TestLogger {
        }
 
        @Nonnull
-       private JobGraph createNonEmptyJobGraph() {
+       public static JobGraph createNonEmptyJobGraph() {
                final JobVertex noOpVertex = new JobVertex("NoOp vertex");
-               return new JobGraph(noOpVertex);
+               noOpVertex.setInvokableClass(NoOpInvokable.class);
+               final JobGraph jobGraph = new JobGraph(noOpVertex);
+               jobGraph.setAllowQueuedScheduling(true);
+
+               return jobGraph;
        }
 
        private static class HATestingDispatcher extends TestingDispatcher {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 873a4f1..c499f26 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -32,8 +32,8 @@ import org.apache.flink.runtime.checkpoint.CheckpointDeclineReason;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import 
org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
 import 
org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
 import 
org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
@@ -151,7 +151,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import static org.mockito.Mockito.mock;
 
 public class JobManagerTest extends TestLogger {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java
new file mode 100644
index 0000000..8e5b1b9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.akka.ActorUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.dispatcher.DispatcherHATest;
+import org.apache.flink.runtime.dispatcher.NoOpSubmittedJobGraphListener;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.testingUtils.TestingJobManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.ExtendedActorSystem;
+import akka.actor.Identify;
+import akka.actor.Terminated;
+import akka.pattern.Patterns;
+import org.apache.curator.framework.CuratorFramework;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the ZooKeeper HA service and {@link JobManager} interaction.
+ */
+public class ZooKeeperHAJobManagerTest extends TestLogger {
+
+       @ClassRule
+       public static final ZooKeeperResource ZOO_KEEPER_RESOURCE = new ZooKeeperResource();
+
+       @ClassRule
+       public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+       private static final FiniteDuration TIMEOUT = FiniteDuration.apply(10L, TimeUnit.SECONDS);
+
+       private static ActorSystem system;
+
+       @BeforeClass
+       public static void setup() {
+               system = AkkaUtils.createLocalActorSystem(new Configuration());
+       }
+
+       @AfterClass
+       public static void teardown() throws Exception {
+               final Future<Terminated> terminationFuture = system.terminate();
+               Await.ready(terminationFuture, TIMEOUT);
+       }
+
+       /**
+        * Tests that the {@link JobManager} releases its locks on all of its {@link JobGraph JobGraphs}
+        * when it loses leadership, so that a job graph store running on a different ZooKeeper
+        * client can recover and remove them.
+        */
+       @Test
+       public void testJobGraphReleaseWhenLosingLeadership() throws Exception {
+               final Configuration configuration = new Configuration();
+               configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, ZOO_KEEPER_RESOURCE.getConnectString());
+               configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+
+               try (TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices()) {
+
+                       final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
+                       final TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
+                       highAvailabilityServices.setJobMasterLeaderElectionService(HighAvailabilityServices.DEFAULT_JOB_ID, leaderElectionService);
+                       highAvailabilityServices.setSubmittedJobGraphStore(ZooKeeperUtils.createSubmittedJobGraphs(client, configuration));
+                       highAvailabilityServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory());
+
+                       // independent store on a separate client, acting like a second JobManager's view
+                       final CuratorFramework otherClient = ZooKeeperUtils.startCuratorFramework(configuration);
+                       final ZooKeeperSubmittedJobGraphStore otherSubmittedJobGraphStore = ZooKeeperUtils.createSubmittedJobGraphs(otherClient, configuration);
+                       otherSubmittedJobGraphStore.start(NoOpSubmittedJobGraphListener.INSTANCE);
+
+                       ActorRef jobManagerActorRef = null;
+                       try {
+                               jobManagerActorRef = JobManager.startJobManagerActors(
+                                       configuration,
+                                       system,
+                                       TestingUtils.defaultExecutor(),
+                                       TestingUtils.defaultExecutor(),
+                                       highAvailabilityServices,
+                                       NoOpMetricRegistry.INSTANCE,
+                                       Option.empty(),
+                                       TestingJobManager.class,
+                                       MemoryArchivist.class)._1();
+
+                               waitForActorToBeStarted(jobManagerActorRef, TIMEOUT);
+
+                               final ActorGateway jobManager = new AkkaActorGateway(jobManagerActorRef, HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+                               leaderElectionService.isLeader(HighAvailabilityServices.DEFAULT_LEADER_ID).get();
+
+                               final JobGraph nonEmptyJobGraph = DispatcherHATest.createNonEmptyJobGraph();
+
+                               final JobManagerMessages.SubmitJob submitJobMessage = new JobManagerMessages.SubmitJob(nonEmptyJobGraph, ListeningBehaviour.DETACHED);
+
+                               Await.result(jobManager.ask(submitJobMessage, TIMEOUT), TIMEOUT);
+
+                               Collection<JobID> jobIds = otherSubmittedJobGraphStore.getJobIds();
+
+                               final JobID jobId = nonEmptyJobGraph.getJobID();
+                               assertThat(jobIds, contains(jobId));
+
+                               // revoke the leadership
+                               leaderElectionService.notLeader();
+
+                               Await.result(jobManager.ask(TestingJobManagerMessages.getWaitForBackgroundTasksToFinish(), TIMEOUT), TIMEOUT);
+
+                               final SubmittedJobGraph recoveredJobGraph = akka.serialization.JavaSerializer.currentSystem().withValue(
+                                       ((ExtendedActorSystem) system),
+                                       () -> otherSubmittedJobGraphStore.recoverJobGraph(jobId));
+
+                               assertThat(recoveredJobGraph, is(notNullValue()));
+
+                               // the removal can only take effect once the former leader released its lock
+                               otherSubmittedJobGraphStore.removeJobGraph(jobId);
+
+                               jobIds = otherSubmittedJobGraphStore.getJobIds();
+
+                               // hasItem checks membership; not(contains(jobId)) would already pass if
+                               // jobIds held any other element besides jobId
+                               assertThat(jobIds, not(org.hamcrest.Matchers.hasItem(jobId)));
+                       } finally {
+                               // Stop the JobManager before closing the ZooKeeper clients, because its
+                               // SubmittedJobGraphStore is backed by 'client'. ActorUtils.stopActor only
+                               // sends a PoisonPill and does not wait for the actor to terminate.
+                               if (jobManagerActorRef != null) {
+                                       ActorUtils.stopActor(jobManagerActorRef);
+                               }
+
+                               try {
+                                       otherSubmittedJobGraphStore.stop();
+                               } finally {
+                                       otherClient.close();
+                                       client.close();
+                               }
+                       }
+               }
+       }
+
+       private void waitForActorToBeStarted(ActorRef jobManagerActorRef, FiniteDuration timeout) throws InterruptedException, java.util.concurrent.TimeoutException {
+               Await.ready(Patterns.ask(jobManagerActorRef, new Identify(42), timeout.toMillis()), timeout);
+       }
+}
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index 0640f39..ebe4639 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -454,6 +454,14 @@ trait TestingJobManagerLike extends FlinkActor {
         val receiver = waitForNumRegisteredTaskManagers.dequeue()._2
         receiver ! Acknowledge.get()
       }
+
+    case WaitForBackgroundTasksToFinish =>
+      // Pipe the sequenced results of futuresToComplete back to the sender once all of them
+      // have completed; if there are none, the sender receives an empty Seq.
+      val future = futuresToComplete match {
+        case Some(futures) => Future.sequence(futures)
+        case None => Future.successful(Seq())
+      }
+
+      future.pipeTo(sender())
   }
 
   def checkIfAllVerticesRunning(jobID: JobID): Boolean = {
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index c8529a9..64af056 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -59,6 +59,8 @@ object TestingJobManagerMessages {
 
   case object NotifyListeners

+  /**
+   * Asks the testing JobManager to reply once all futures it tracks in `futuresToComplete`
+   * have completed (with an empty Seq if it tracks none).
+   */
+  case object WaitForBackgroundTasksToFinish
+
   case class NotifyWhenTaskManagerTerminated(taskManager: ActorRef)
   case class TaskManagerTerminated(taskManager: ActorRef)
 
@@ -164,4 +166,5 @@ object TestingJobManagerMessages {
   def getClientConnected(): AnyRef = ClientConnected
   def getClassLoadingPropsDelivered(): AnyRef = ClassLoadingPropsDelivered

+  /** Returns the [[WaitForBackgroundTasksToFinish]] message; used from Java test code. */
+  def getWaitForBackgroundTasksToFinish(): AnyRef = WaitForBackgroundTasksToFinish
 }

Reply via email to