Repository: flink
Updated Branches:
  refs/heads/master 444315a12 -> 259a3a556


http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index 9640fcd..df4f95a 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.jobgraph.JobStatus
 import org.apache.flink.runtime.jobmanager.JobManager
 import 
org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
-import org.apache.flink.runtime.messages.JobManagerMessages.GrantLeadership
+import org.apache.flink.runtime.messages.JobManagerMessages.{GrantLeadership, 
RegisterJobClient}
 import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
 import 
org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager
 import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
@@ -67,6 +67,8 @@ trait TestingJobManagerLike extends FlinkActor {
       override def compare(x: (Int, ActorRef), y: (Int, ActorRef)): Int = y._1 
- x._1
     })
 
+  val waitForClient = scala.collection.mutable.HashSet[ActorRef]()
+
   val waitForShutdown = scala.collection.mutable.HashSet[ActorRef]()
 
   var disconnectDisabled = false
@@ -328,6 +330,14 @@ trait TestingJobManagerLike extends FlinkActor {
 
       waitForLeader.clear()
 
+    case NotifyWhenClientConnects =>
+      waitForClient += sender()
+      sender() ! true
+
+    case msg: RegisterJobClient =>
+      super.handleMessage(msg)
+      waitForClient.foreach(_ ! true)
+
     case 
NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager) =>
       if (that.instanceManager.getNumberOfRegisteredTaskManagers >= 
numRegisteredTaskManager) {
         // there are already at least numRegisteredTaskManager registered --> 
send Acknowledge

http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index a411c8b..a88ed43 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -83,6 +83,11 @@ object TestingJobManagerMessages {
   case object NotifyWhenLeader
 
   /**
+    * Registers the sender to be notified whenever a job client registers at the 
[[TestingJobManager]]. The request itself is acknowledged right away.
+    */
+  case object NotifyWhenClientConnects
+
+  /**
    * Registers to be notified by an 
[[org.apache.flink.runtime.messages.Messages.Acknowledge]]
    * message when at least numRegisteredTaskManager have registered at the 
JobManager.
    *
@@ -111,6 +116,7 @@ object TestingJobManagerMessages {
   case class ResponseSavepoint(savepoint: Savepoint)
 
   def getNotifyWhenLeader(): AnyRef = NotifyWhenLeader
+  def getNotifyWhenClientConnects(): AnyRef = NotifyWhenClientConnects
   def getDisablePostStop(): AnyRef = DisablePostStop
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
index 073164c0..2adf7eb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java
@@ -25,17 +25,17 @@ import akka.actor.Props;
 import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
 import akka.util.Timeout;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.FlinkUntypedActor;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobClientMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.JobClientMessages.AttachToJobAndWait;
 import org.apache.flink.runtime.messages.Messages;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import scala.concurrent.Await;
@@ -45,6 +45,8 @@ import scala.concurrent.duration.FiniteDuration;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.runtime.messages.JobManagerMessages.*;
+
 public class JobClientActorTest extends TestLogger {
 
        private static ActorSystem system;
@@ -62,8 +64,8 @@ public class JobClientActorTest extends TestLogger {
        }
 
        /** Tests that a {@link JobClientActorSubmissionTimeoutException} is 
thrown when the job cannot
-        * be submitted by the JobClientActor. This is here the case, because 
the started JobManager
-        * never replies to a SubmitJob message.
+        * be submitted by the JobSubmissionClientActor. This is the case here 
because the started JobManager
+        * never replies to a {@link SubmitJob} message.
         *
         * @throws Exception
         */
@@ -84,7 +86,7 @@ public class JobClientActorTest extends TestLogger {
                        leaderSessionID
                );
 
-               Props jobClientActorProps = 
JobClientActor.createJobClientActorProps(
+               Props jobClientActorProps = 
JobSubmissionClientActor.createActorProps(
                        testingLeaderRetrievalService,
                        jobClientActorTimeout,
                        false);
@@ -100,19 +102,56 @@ public class JobClientActorTest extends TestLogger {
                Await.result(jobExecutionResult, timeout);
        }
 
+
+       /** Tests that a {@link JobClientActorRegistrationTimeoutException} is 
thrown when the registration
+        * cannot be performed at the JobManager by the 
JobAttachmentClientActor. This is the case here because the
+        * started JobManager never replies to a {@link RegisterJobClient} 
message.
+        */
+       @Test(expected=JobClientActorRegistrationTimeoutException.class)
+       public void testRegistrationTimeout() throws Exception {
+               FiniteDuration jobClientActorTimeout = new FiniteDuration(5, 
TimeUnit.SECONDS);
+               FiniteDuration timeout = jobClientActorTimeout.$times(2);
+
+               UUID leaderSessionID = UUID.randomUUID();
+
+               ActorRef jobManager = system.actorOf(
+                       Props.create(
+                               PlainActor.class,
+                               leaderSessionID));
+
+               TestingLeaderRetrievalService testingLeaderRetrievalService = 
new TestingLeaderRetrievalService(
+                       jobManager.path().toString(),
+                       leaderSessionID
+               );
+
+               Props jobClientActorProps = 
JobAttachmentClientActor.createActorProps(
+                       testingLeaderRetrievalService,
+                       jobClientActorTimeout,
+                       false);
+
+               ActorRef jobClientActor = system.actorOf(jobClientActorProps);
+
+               Future<Object> jobExecutionResult = Patterns.ask(
+                       jobClientActor,
+                       new 
JobClientMessages.AttachToJobAndWait(testJobGraph.getJobID()),
+                       new Timeout(timeout));
+
+               Await.result(jobExecutionResult, timeout);
+       }
+
        /** Tests that a {@link 
org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException}
-        * is thrown when the JobClientActor wants to submit a job but has not 
connected to a JobManager.
+        * is thrown when the JobSubmissionClientActor wants to submit a job 
but has not connected to a JobManager.
         *
         * @throws Exception
         */
        @Test(expected=JobClientActorConnectionTimeoutException.class)
-       public void testConnectionTimeoutWithoutJobManager() throws Exception {
+       public void testConnectionTimeoutWithoutJobManagerForSubmission() 
throws Exception {
                FiniteDuration jobClientActorTimeout = new FiniteDuration(5, 
TimeUnit.SECONDS);
                FiniteDuration timeout = jobClientActorTimeout.$times(2);
 
                TestingLeaderRetrievalService testingLeaderRetrievalService = 
new TestingLeaderRetrievalService();
 
-               Props jobClientActorProps = 
JobClientActor.createJobClientActorProps(
+               Props jobClientActorProps = 
JobSubmissionClientActor.createActorProps(
                        testingLeaderRetrievalService,
                        jobClientActorTimeout,
                        false);
@@ -128,6 +167,32 @@ public class JobClientActorTest extends TestLogger {
        }
 
        /** Tests that a {@link 
org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException}
+        * is thrown when the JobAttachmentClientActor wants to attach to a job at the 
JobManager
+        * but has not connected to a JobManager.
+        */
+       @Test(expected=JobClientActorConnectionTimeoutException.class)
+       public void testConnectionTimeoutWithoutJobManagerForRegistration() 
throws Exception {
+               FiniteDuration jobClientActorTimeout = new FiniteDuration(5, 
TimeUnit.SECONDS);
+               FiniteDuration timeout = jobClientActorTimeout.$times(2);
+
+               TestingLeaderRetrievalService testingLeaderRetrievalService = 
new TestingLeaderRetrievalService();
+
+               Props jobClientActorProps = 
JobAttachmentClientActor.createActorProps(
+                       testingLeaderRetrievalService,
+                       jobClientActorTimeout,
+                       false);
+
+               ActorRef jobClientActor = system.actorOf(jobClientActorProps);
+
+               Future<Object> jobExecutionResult = Patterns.ask(
+                       jobClientActor,
+                       new 
JobClientMessages.AttachToJobAndWait(testJobGraph.getJobID()),
+                       new Timeout(timeout));
+
+               Await.result(jobExecutionResult, timeout);
+       }
+
+       /** Tests that a {@link 
org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException}
         * is thrown after a successful job submission if the JobManager dies.
         *
         * @throws Exception
@@ -149,7 +214,7 @@ public class JobClientActorTest extends TestLogger {
                        leaderSessionID
                );
 
-               Props jobClientActorProps = 
JobClientActor.createJobClientActorProps(
+               Props jobClientActorProps = 
JobSubmissionClientActor.createActorProps(
                        testingLeaderRetrievalService,
                        jobClientActorTimeout,
                        false);
@@ -170,6 +235,91 @@ public class JobClientActorTest extends TestLogger {
                Await.result(jobExecutionResult, timeout);
        }
 
+       /** Tests that a {@link JobClientActorConnectionTimeoutException}
+        * is thrown after a successful registration of the client at the 
JobManager if the JobManager subsequently dies.
+        */
+       @Test(expected=JobClientActorConnectionTimeoutException.class)
+       public void testConnectionTimeoutAfterJobRegistration() throws 
Exception {
+               FiniteDuration jobClientActorTimeout = new FiniteDuration(5, 
TimeUnit.SECONDS);
+               FiniteDuration timeout = jobClientActorTimeout.$times(2);
+
+               UUID leaderSessionID = UUID.randomUUID();
+
+               ActorRef jobManager = system.actorOf(
+                       Props.create(
+                               JobAcceptingActor.class,
+                               leaderSessionID));
+
+               TestingLeaderRetrievalService testingLeaderRetrievalService = 
new TestingLeaderRetrievalService(
+                       jobManager.path().toString(),
+                       leaderSessionID
+               );
+
+               Props jobClientActorProps = 
JobAttachmentClientActor.createActorProps(
+                       testingLeaderRetrievalService,
+                       jobClientActorTimeout,
+                       false);
+
+               ActorRef jobClientActor = system.actorOf(jobClientActorProps);
+
+               Future<Object> jobExecutionResult = Patterns.ask(
+                       jobClientActor,
+                       new AttachToJobAndWait(testJobGraph.getJobID()),
+                       new Timeout(timeout));
+
+               Future<Object> waitFuture = Patterns.ask(jobManager, new 
RegisterTest(), new Timeout(timeout));
+
+               Await.result(waitFuture, timeout);
+
+               jobManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
+
+               Await.result(jobExecutionResult, timeout);
+       }
+
+
+       /** Tests that JobClient throws an Exception if the JobClientActor dies 
and can't answer
+        * {@link akka.actor.Identify} messages anymore.
+        */
+       @Test
+       public void testGuaranteedAnswerIfJobClientDies() throws Exception {
+               FiniteDuration timeout = new FiniteDuration(2, 
TimeUnit.SECONDS);
+
+                       UUID leaderSessionID = UUID.randomUUID();
+
+               ActorRef jobManager = system.actorOf(
+                       Props.create(
+                               JobAcceptingActor.class,
+                               leaderSessionID));
+
+               TestingLeaderRetrievalService testingLeaderRetrievalService = 
new TestingLeaderRetrievalService(
+                       jobManager.path().toString(),
+                       leaderSessionID
+               );
+
+               JobListeningContext jobListeningContext =
+                       JobClient.submitJob(
+                               system,
+                               testingLeaderRetrievalService,
+                               testJobGraph,
+                               timeout,
+                               false,
+                               getClass().getClassLoader());
+
+               Future<Object> waitFuture = Patterns.ask(jobManager, new 
RegisterTest(), new Timeout(timeout));
+               Await.result(waitFuture, timeout);
+
+               // kill the job client actor which has been registered at the 
JobManager
+               
jobListeningContext.getJobClientActor().tell(PoisonPill.getInstance(), 
ActorRef.noSender());
+
+               try {
+                       // should not block but return an error
+                       JobClient.awaitJobResult(jobListeningContext);
+                       Assert.fail();
+               } catch (JobExecutionException e) {
+                       // this is what we want
+               }
+       }
+
        public static class PlainActor extends FlinkUntypedActor {
 
                private final UUID leaderSessionID;
@@ -180,7 +330,6 @@ public class JobClientActorTest extends TestLogger {
 
                @Override
                protected void handleMessage(Object message) throws Exception {
-
                }
 
                @Override
@@ -200,17 +349,29 @@ public class JobClientActorTest extends TestLogger {
 
                @Override
                protected void handleMessage(Object message) throws Exception {
-                       if (message instanceof JobManagerMessages.SubmitJob) {
+                       if (message instanceof SubmitJob) {
                                getSender().tell(
-                                       new 
JobManagerMessages.JobSubmitSuccess(((JobManagerMessages.SubmitJob) 
message).jobGraph().getJobID()),
+                                       new JobSubmitSuccess(((SubmitJob) 
message).jobGraph().getJobID()),
                                        getSelf());
 
                                jobAccepted = true;
 
-                               if(testFuture != ActorRef.noSender()) {
+                               if (testFuture != ActorRef.noSender()) {
                                        
testFuture.tell(Messages.getAcknowledge(), getSelf());
                                }
-                       } else if (message instanceof RegisterTest) {
+                       }
+                       else if (message instanceof RegisterJobClient) {
+                               getSender().tell(
+                                       new 
RegisterJobClientSuccess(((RegisterJobClient) message).jobID()),
+                                       getSelf());
+
+                               jobAccepted = true;
+
+                               if (testFuture != ActorRef.noSender()) {
+                                       
testFuture.tell(Messages.getAcknowledge(), getSelf());
+                               }
+                       }
+                       else if (message instanceof RegisterTest) {
                                testFuture = getSender();
 
                                if (jobAccepted) {
@@ -226,4 +387,5 @@ public class JobClientActorTest extends TestLogger {
        }
 
        public static class RegisterTest{}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
index c71bd35..426dfba 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
@@ -286,7 +286,6 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends 
TestLogger {
                JobInfo expectedJobInfo = expected.getJobInfo();
                JobInfo actualJobInfo = actual.getJobInfo();
 
-               assertEquals(expectedJobInfo.listeningBehaviour(), 
actualJobInfo.listeningBehaviour());
-               assertEquals(expectedJobInfo.start(), actualJobInfo.start());
+               assertEquals(expectedJobInfo, actualJobInfo);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/259a3a55/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
new file mode 100644
index 0000000..db17ee8
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.clients.examples;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.StandaloneClusterClient;
+import org.apache.flink.runtime.client.JobRetrievalException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.collection.Seq;
+
+import java.util.concurrent.Semaphore;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+
+/**
+ * Tests retrieval of a job from a running Flink cluster.
+ */
+public class JobRetrievalITCase extends TestLogger {
+
+       private static final Semaphore lock = new Semaphore(1);
+
+       private static FlinkMiniCluster cluster;
+
+       @BeforeClass
+       public static void before() {
+               cluster = new ForkableFlinkMiniCluster(new Configuration(), 
false);
+               cluster.start();
+       }
+
+       @AfterClass
+       public static void after() {
+               cluster.stop();
+               cluster = null;
+       }
+
+       @Test
+       public void testJobRetrieval() throws Exception {
+               final JobID jobID = new JobID();
+
+               final JobVertex imalock = new JobVertex("imalock");
+               imalock.setInvokableClass(SemaphoreInvokable.class);
+
+               final JobGraph jobGraph = new JobGraph(jobID, "testjob", 
imalock);
+
+               final ClusterClient client = new 
StandaloneClusterClient(cluster.configuration());
+
+               // acquire the lock to make sure that the job cannot complete 
until the job client
+               // has been attached in resumingThread
+               lock.acquire();
+               client.runDetached(jobGraph, 
JobRetrievalITCase.class.getClassLoader());
+
+               final Thread resumingThread = new Thread(new Runnable() {
+                       @Override
+                       public void run() {
+                               try {
+                                       
assertNotNull(client.retrieveJob(jobID));
+                               } catch (JobExecutionException e) {
+                                       fail(e.getMessage());
+                               }
+                       }
+               });
+
+               final Seq<ActorSystem> actorSystemSeq = 
cluster.jobManagerActorSystems().get();
+               final ActorSystem actorSystem = actorSystemSeq.last();
+               JavaTestKit testkit = new JavaTestKit(actorSystem);
+
+               final ActorRef jm = cluster.getJobManagersAsJava().get(0);
+               // wait until client connects
+               
jm.tell(TestingJobManagerMessages.getNotifyWhenClientConnects(), 
testkit.getRef());
+               // confirm registration
+               testkit.expectMsgEquals(true);
+
+               // kick off resuming
+               resumingThread.start();
+
+               // wait for client to connect
+               testkit.expectMsgEquals(true);
+               // client has connected, we can release the lock
+               lock.release();
+
+               resumingThread.join();
+       }
+
+       @Test
+       public void testNonExistingJobRetrieval() throws Exception {
+               final JobID jobID = new JobID();
+               ClusterClient client = new 
StandaloneClusterClient(cluster.configuration());
+
+               try {
+                       client.retrieveJob(jobID);
+                       fail();
+               } catch (JobRetrievalException e) {
+                       // this is what we want
+               }
+       }
+
+
+       public static class SemaphoreInvokable extends AbstractInvokable {
+
+               @Override
+               public void invoke() throws Exception {
+                       lock.acquire();
+               }
+       }
+
+}


Reply via email to