Repository: flink Updated Branches: refs/heads/master 444315a12 -> 259a3a556
http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala index 9640fcd..df4f95a 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala @@ -26,7 +26,7 @@ import org.apache.flink.runtime.execution.ExecutionState import org.apache.flink.runtime.jobgraph.JobStatus import org.apache.flink.runtime.jobmanager.JobManager import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged -import org.apache.flink.runtime.messages.JobManagerMessages.GrantLeadership +import org.apache.flink.runtime.messages.JobManagerMessages.{GrantLeadership, RegisterJobClient} import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect} import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat @@ -67,6 +67,8 @@ trait TestingJobManagerLike extends FlinkActor { override def compare(x: (Int, ActorRef), y: (Int, ActorRef)): Int = y._1 - x._1 }) + val waitForClient = scala.collection.mutable.HashSet[ActorRef]() + val waitForShutdown = scala.collection.mutable.HashSet[ActorRef]() var disconnectDisabled = false @@ -328,6 +330,14 @@ trait TestingJobManagerLike extends FlinkActor { waitForLeader.clear() + case NotifyWhenClientConnects => + waitForClient += sender() + sender() ! true + + case msg: RegisterJobClient => + super.handleMessage(msg) + waitForClient.foreach(_ ! true) + case NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager) => if (that.instanceManager.getNumberOfRegisteredTaskManagers >= numRegisteredTaskManager) { // there are already at least numRegisteredTaskManager registered --> send Acknowledge http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala index a411c8b..a88ed43 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala @@ -83,6 +83,11 @@ object TestingJobManagerMessages { case object NotifyWhenLeader /** + * Notifies the sender when the [[TestingJobManager]] receives new clients for jobs + */ + case object NotifyWhenClientConnects + + /** * Registers to be notified by an [[org.apache.flink.runtime.messages.Messages.Acknowledge]] * message when at least numRegisteredTaskManager have registered at the JobManager. * @@ -111,6 +116,7 @@ object TestingJobManagerMessages { case class ResponseSavepoint(savepoint: Savepoint) def getNotifyWhenLeader(): AnyRef = NotifyWhenLeader + def getNotifyWhenClientConnects(): AnyRef = NotifyWhenClientConnects def getDisablePostStop(): AnyRef = DisablePostStop } http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java index 073164c0..2adf7eb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java @@ -25,17 +25,17 @@ import akka.actor.Props; import akka.pattern.Patterns; import akka.testkit.JavaTestKit; import akka.util.Timeout; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.FlinkUntypedActor; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.messages.JobClientMessages; -import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.messages.JobClientMessages.AttachToJobAndWait; import org.apache.flink.runtime.messages.Messages; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import scala.concurrent.Await; @@ -45,6 +45,8 @@ import scala.concurrent.duration.FiniteDuration; import java.util.UUID; import java.util.concurrent.TimeUnit; +import static org.apache.flink.runtime.messages.JobManagerMessages.*; + public class JobClientActorTest extends TestLogger { private static ActorSystem system; @@ -62,8 +64,8 @@ public class JobClientActorTest extends TestLogger { } /** Tests that a {@link JobClientActorSubmissionTimeoutException} is thrown when the job cannot - * be submitted by the JobClientActor. This is here the case, because the started JobManager - * never replies to a SubmitJob message. + * be submitted by the JobSubmissionClientActor. This is here the case, because the started JobManager + * never replies to a {@link SubmitJob} message. * * @throws Exception */ @@ -84,7 +86,7 @@ public class JobClientActorTest extends TestLogger { leaderSessionID ); - Props jobClientActorProps = JobClientActor.createJobClientActorProps( + Props jobClientActorProps = JobSubmissionClientActor.createActorProps( testingLeaderRetrievalService, jobClientActorTimeout, false); @@ -100,19 +102,56 @@ public class JobClientActorTest extends TestLogger { Await.result(jobExecutionResult, timeout); } + + /** Tests that a {@link JobClientActorRegistrationTimeoutException} is thrown when the registration + * cannot be performed at the JobManager by the JobAttachmentClientActor. This is here the case, because the + * started JobManager never replies to a {@link RegisterJobClient} message. + */ + @Test(expected=JobClientActorRegistrationTimeoutException.class) + public void testRegistrationTimeout() throws Exception { + FiniteDuration jobClientActorTimeout = new FiniteDuration(5, TimeUnit.SECONDS); + FiniteDuration timeout = jobClientActorTimeout.$times(2); + + UUID leaderSessionID = UUID.randomUUID(); + + ActorRef jobManager = system.actorOf( + Props.create( + PlainActor.class, + leaderSessionID)); + + TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService( + jobManager.path().toString(), + leaderSessionID + ); + + Props jobClientActorProps = JobAttachmentClientActor.createActorProps( + testingLeaderRetrievalService, + jobClientActorTimeout, + false); + + ActorRef jobClientActor = system.actorOf(jobClientActorProps); + + Future<Object> jobExecutionResult = Patterns.ask( + jobClientActor, + new JobClientMessages.AttachToJobAndWait(testJobGraph.getJobID()), + new Timeout(timeout)); + + Await.result(jobExecutionResult, timeout); + } + /** Tests that a {@link org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException} - * is thrown when the JobClientActor wants to submit a job but has not connected to a JobManager. + * is thrown when the JobSubmissionClientActor wants to submit a job but has not connected to a JobManager. * * @throws Exception */ @Test(expected=JobClientActorConnectionTimeoutException.class) - public void testConnectionTimeoutWithoutJobManager() throws Exception { + public void testConnectionTimeoutWithoutJobManagerForSubmission() throws Exception { FiniteDuration jobClientActorTimeout = new FiniteDuration(5, TimeUnit.SECONDS); FiniteDuration timeout = jobClientActorTimeout.$times(2); TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(); - Props jobClientActorProps = JobClientActor.createJobClientActorProps( + Props jobClientActorProps = JobSubmissionClientActor.createActorProps( testingLeaderRetrievalService, jobClientActorTimeout, false); @@ -128,6 +167,32 @@ public class JobClientActorTest extends TestLogger { } /** Tests that a {@link org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException} + * is thrown when the JobAttachmentClientActor attach to a job at the JobManager + * but has not connected to a JobManager. + */ + @Test(expected=JobClientActorConnectionTimeoutException.class) + public void testConnectionTimeoutWithoutJobManagerForRegistration() throws Exception { + FiniteDuration jobClientActorTimeout = new FiniteDuration(5, TimeUnit.SECONDS); + FiniteDuration timeout = jobClientActorTimeout.$times(2); + + TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(); + + Props jobClientActorProps = JobAttachmentClientActor.createActorProps( + testingLeaderRetrievalService, + jobClientActorTimeout, + false); + + ActorRef jobClientActor = system.actorOf(jobClientActorProps); + + Future<Object> jobExecutionResult = Patterns.ask( + jobClientActor, + new JobClientMessages.AttachToJobAndWait(testJobGraph.getJobID()), + new Timeout(timeout)); + + Await.result(jobExecutionResult, timeout); + } + + /** Tests that a {@link org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException} * is thrown after a successful job submission if the JobManager dies. * * @throws Exception @@ -149,7 +214,7 @@ public class JobClientActorTest extends TestLogger { leaderSessionID ); - Props jobClientActorProps = JobClientActor.createJobClientActorProps( + Props jobClientActorProps = JobSubmissionClientActor.createActorProps( testingLeaderRetrievalService, jobClientActorTimeout, false); @@ -170,6 +235,91 @@ public class JobClientActorTest extends TestLogger { Await.result(jobExecutionResult, timeout); } + /** Tests that a {@link JobClientActorConnectionTimeoutException} + * is thrown after a successful registration of the client at the JobManager. + */ + @Test(expected=JobClientActorConnectionTimeoutException.class) + public void testConnectionTimeoutAfterJobRegistration() throws Exception { + FiniteDuration jobClientActorTimeout = new FiniteDuration(5, TimeUnit.SECONDS); + FiniteDuration timeout = jobClientActorTimeout.$times(2); + + UUID leaderSessionID = UUID.randomUUID(); + + ActorRef jobManager = system.actorOf( + Props.create( + JobAcceptingActor.class, + leaderSessionID)); + + TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService( + jobManager.path().toString(), + leaderSessionID + ); + + Props jobClientActorProps = JobAttachmentClientActor.createActorProps( + testingLeaderRetrievalService, + jobClientActorTimeout, + false); + + ActorRef jobClientActor = system.actorOf(jobClientActorProps); + + Future<Object> jobExecutionResult = Patterns.ask( + jobClientActor, + new AttachToJobAndWait(testJobGraph.getJobID()), + new Timeout(timeout)); + + Future<Object> waitFuture = Patterns.ask(jobManager, new RegisterTest(), new Timeout(timeout)); + + Await.result(waitFuture, timeout); + + jobManager.tell(PoisonPill.getInstance(), ActorRef.noSender()); + + Await.result(jobExecutionResult, timeout); + } + + + /** Tests that JobClient throws an Exception if the JobClientActor dies and can't answer to + * {@link akka.actor.Identify} message anymore. + */ + @Test + public void testGuaranteedAnswerIfJobClientDies() throws Exception { + FiniteDuration timeout = new FiniteDuration(2, TimeUnit.SECONDS); + + UUID leaderSessionID = UUID.randomUUID(); + + ActorRef jobManager = system.actorOf( + Props.create( + JobAcceptingActor.class, + leaderSessionID)); + + TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService( + jobManager.path().toString(), + leaderSessionID + ); + + JobListeningContext jobListeningContext = + JobClient.submitJob( + system, + testingLeaderRetrievalService, + testJobGraph, + timeout, + false, + getClass().getClassLoader()); + + Future<Object> waitFuture = Patterns.ask(jobManager, new RegisterTest(), new Timeout(timeout)); + Await.result(waitFuture, timeout); + + // kill the job client actor which has been registered at the JobManager + jobListeningContext.getJobClientActor().tell(PoisonPill.getInstance(), ActorRef.noSender()); + + try { + // should not block but return an error + JobClient.awaitJobResult(jobListeningContext); + Assert.fail(); + } catch (JobExecutionException e) { + // this is what we want + } + } + public static class PlainActor extends FlinkUntypedActor { private final UUID leaderSessionID; @@ -180,7 +330,6 @@ public class JobClientActorTest extends TestLogger { @Override protected void handleMessage(Object message) throws Exception { - } @Override @@ -200,17 +349,29 @@ public class JobClientActorTest extends TestLogger { @Override protected void handleMessage(Object message) throws Exception { - if (message instanceof JobManagerMessages.SubmitJob) { + if (message instanceof SubmitJob) { getSender().tell( - new JobManagerMessages.JobSubmitSuccess(((JobManagerMessages.SubmitJob) message).jobGraph().getJobID()), + new JobSubmitSuccess(((SubmitJob) message).jobGraph().getJobID()), getSelf()); jobAccepted = true; - if(testFuture != ActorRef.noSender()) { + if (testFuture != ActorRef.noSender()) { testFuture.tell(Messages.getAcknowledge(), getSelf()); } - } else if (message instanceof RegisterTest) { + } + else if (message instanceof RegisterJobClient) { + getSender().tell( + new RegisterJobClientSuccess(((RegisterJobClient) message).jobID()), + getSelf()); + + jobAccepted = true; + + if (testFuture != ActorRef.noSender()) { + testFuture.tell(Messages.getAcknowledge(), getSelf()); + } + } + else if (message instanceof RegisterTest) { testFuture = getSender(); if (jobAccepted) { @@ -226,4 +387,5 @@ public class JobClientActorTest extends TestLogger { } public static class RegisterTest{} + } http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java index c71bd35..426dfba 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java @@ -286,7 +286,6 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger { JobInfo expectedJobInfo = expected.getJobInfo(); JobInfo actualJobInfo = actual.getJobInfo(); - assertEquals(expectedJobInfo.listeningBehaviour(), actualJobInfo.listeningBehaviour()); - assertEquals(expectedJobInfo.start(), actualJobInfo.start()); + assertEquals(expectedJobInfo, actualJobInfo); } } http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java new file mode 100644 index 0000000..db17ee8 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.test.clients.examples; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import org.apache.flink.api.common.JobID; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.StandaloneClusterClient; +import org.apache.flink.runtime.client.JobRetrievalException; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.minicluster.FlinkMiniCluster; +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; +import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.util.TestLogger; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import scala.collection.Seq; + +import java.util.concurrent.Semaphore; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + + +/** + * Tests retrieval of a job from a running Flink cluster + */ +public class JobRetrievalITCase extends TestLogger { + + private static final Semaphore lock = new Semaphore(1); + + private static FlinkMiniCluster cluster; + + @BeforeClass + public static void before() { + cluster = new ForkableFlinkMiniCluster(new Configuration(), false); + cluster.start(); + } + + @AfterClass + public static void after() { + cluster.stop(); + cluster = null; + } + + @Test + public void testJobRetrieval() throws Exception { + final JobID jobID = new JobID(); + + final JobVertex imalock = new JobVertex("imalock"); + imalock.setInvokableClass(SemaphoreInvokable.class); + + final JobGraph jobGraph = new JobGraph(jobID, "testjob", imalock); + + final ClusterClient client = new StandaloneClusterClient(cluster.configuration()); + + // acquire the lock to make sure that the job cannot complete until the job client + // has been attached in resumingThread + lock.acquire(); + client.runDetached(jobGraph, JobRetrievalITCase.class.getClassLoader()); + + final Thread resumingThread = new Thread(new Runnable() { + @Override + public void run() { + try { + assertNotNull(client.retrieveJob(jobID)); + } catch (JobExecutionException e) { + fail(e.getMessage()); + } + } + }); + + final Seq<ActorSystem> actorSystemSeq = cluster.jobManagerActorSystems().get(); + final ActorSystem actorSystem = actorSystemSeq.last(); + JavaTestKit testkit = new JavaTestKit(actorSystem); + + final ActorRef jm = cluster.getJobManagersAsJava().get(0); + // wait until client connects + jm.tell(TestingJobManagerMessages.getNotifyWhenClientConnects(), testkit.getRef()); + // confirm registration + testkit.expectMsgEquals(true); + + // kick off resuming + resumingThread.start(); + + // wait for client to connect + testkit.expectMsgEquals(true); + // client has connected, we can release the lock + lock.release(); + + resumingThread.join(); + } + + @Test + public void testNonExistingJobRetrieval() throws Exception { + final JobID jobID = new JobID(); + ClusterClient client = new StandaloneClusterClient(cluster.configuration()); + + try { + client.retrieveJob(jobID); + fail(); + } catch (JobRetrievalException e) { + // this is what we want + } + } + + + public static class SemaphoreInvokable extends AbstractInvokable { + + @Override + public void invoke() throws Exception { + lock.acquire(); + } + } + +}
