Repository: flink
Updated Branches:
  refs/heads/master abb449678 -> b05ea6939


[FLINK-4273] adapt JobRetrievalITCase to lazy classloader reconstruction


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b05ea693
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b05ea693
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b05ea693

Branch: refs/heads/master
Commit: b05ea693984a5f5bf2e53f89d9fbd531e7be83fd
Parents: abb4496
Author: Maximilian Michels <[email protected]>
Authored: Wed Aug 24 10:11:45 2016 +0200
Committer: Maximilian Michels <[email protected]>
Committed: Fri Aug 26 15:29:46 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/client/JobAttachmentClientActor.java   |  3 ++-
 .../org/apache/flink/runtime/jobmanager/JobManager.scala |  2 +-
 .../runtime/testingUtils/TestingJobManagerLike.scala     |  7 +++++--
 .../runtime/testingUtils/TestingJobManagerMessages.scala | 11 +++++++++++
 .../flink/test/clients/examples/JobRetrievalITCase.java  |  8 +++++---
 5 files changed, 24 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b05ea693/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java
index 5446002..ffab9cc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobAttachmentClientActor.java
@@ -130,7 +130,8 @@ public class JobAttachmentClientActor extends JobClientActor {
        }
 
        private void tryToAttachToJob() {
-               LOG.info("Sending message to JobManager {} to attach to job {} and wait for progress", jobID);
+               LOG.info("Sending message to JobManager {} to attach to job {} and wait for progress",
+                       jobManager, jobID);
 
                Futures.future(new Callable<Object>() {
                        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b05ea693/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index d35fb0a..0e28d98 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -478,7 +478,7 @@ class JobManager(
       val client = sender()
       currentJobs.get(jobID) match {
         case Some((executionGraph, jobInfo)) =>
-          log.info("Registering client for job $jobID")
+          log.info(s"Registering client for job $jobID")
           jobInfo.clients += ((client, listeningBehaviour))
           val listener = new StatusListenerMessenger(client, leaderSessionID.orNull)
           executionGraph.registerJobStatusListener(listener)

http://git-wip-us.apache.org/repos/asf/flink/blob/b05ea693/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index df4f95a..6a9b490 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.jobgraph.JobStatus
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
-import org.apache.flink.runtime.messages.JobManagerMessages.{GrantLeadership, RegisterJobClient}
+import org.apache.flink.runtime.messages.JobManagerMessages.{GrantLeadership, RegisterJobClient, RequestClassloadingProps}
 import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
 import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager
 import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
@@ -336,7 +336,10 @@ trait TestingJobManagerLike extends FlinkActor {
 
     case msg: RegisterJobClient =>
       super.handleMessage(msg)
-      waitForClient.foreach(_ ! true)
+      waitForClient.foreach(_ ! ClientConnected)
+    case msg: RequestClassloadingProps =>
+      super.handleMessage(msg)
+      waitForClient.foreach(_ ! ClassLoadingPropsDelivered)
 
     case NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager) =>
       if (that.instanceManager.getNumberOfRegisteredTaskManagers >= numRegisteredTaskManager) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b05ea693/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index a88ed43..f121305 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -86,6 +86,14 @@ object TestingJobManagerMessages {
     * Notifies the sender when the [[TestingJobManager]] receives new clients for jobs
     */
   case object NotifyWhenClientConnects
+  /**
+    * Notifies of client connect
+    */
+  case object ClientConnected
+  /**
+    * Notifies when the client has requested class loading information
+    */
+  case object ClassLoadingPropsDelivered
 
   /**
    * Registers to be notified by an [[org.apache.flink.runtime.messages.Messages.Acknowledge]]
@@ -119,4 +127,7 @@ object TestingJobManagerMessages {
   def getNotifyWhenClientConnects(): AnyRef = NotifyWhenClientConnects
   def getDisablePostStop(): AnyRef = DisablePostStop
 
+  def getClientConnected(): AnyRef = ClientConnected
+  def getClassLoadingPropsDelivered(): AnyRef = ClassLoadingPropsDelivered
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b05ea693/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
index db17ee8..c9059f1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
@@ -25,7 +25,6 @@ import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.runtime.client.JobRetrievalException;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -86,7 +85,7 @@ public class JobRetrievalITCase extends TestLogger {
                        public void run() {
                                 try {
                                         assertNotNull(client.retrieveJob(jobID));
-                               } catch (JobExecutionException e) {
+                               } catch (Throwable e) {
                                        fail(e.getMessage());
                                }
                        }
@@ -106,7 +105,10 @@ public class JobRetrievalITCase extends TestLogger {
                resumingThread.start();
 
                // wait for client to connect
-               testkit.expectMsgEquals(true);
+               testkit.expectMsgAllOf(
+                       TestingJobManagerMessages.getClientConnected(),
+                       TestingJobManagerMessages.getClassLoadingPropsDelivered());
+
                // client has connected, we can release the lock
                lock.release();
 

Reply via email to