[runtime tests] Fix buggy integration test case for recovering from a task manager failure

[runtime tests] Fix TaskManagerFailsITCase and 
TaskManagerFailsWithSlotSharingITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/517289dc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/517289dc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/517289dc

Branch: refs/heads/master
Commit: 517289dc599e697e7a14fae175421e93a16b955b
Parents: 730e056
Author: Till Rohrmann <[email protected]>
Authored: Sun Jan 25 19:33:03 2015 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Tue Jan 27 14:33:49 2015 +0100

----------------------------------------------------------------------
 .../flink/runtime/taskmanager/TaskManager.scala |  23 ++--
 .../runtime/jobmanager/RecoveryITCase.scala     |  25 +++-
 .../jobmanager/TaskManagerFailsITCase.scala     |  10 +-
 .../TaskManagerFailsWithSlotSharingITCase.scala |   3 +-
 .../TaskManagerRegistrationITCase.scala         |  39 ++++--
 .../apache/flink/runtime/jobmanager/Tasks.scala |  12 ++
 .../testingUtils/TestingJobManager.scala        | 126 ++++++++++++++-----
 .../TestingJobManagerMessages.scala             |  11 +-
 .../runtime/testingUtils/TestingUtils.scala     |   4 +-
 9 files changed, 188 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/517289dc/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 17bc0ac..5d2d21d 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -178,6 +178,7 @@ import scala.collection.JavaConverters._
     registrationDuration = 0 seconds
 
     registered = false
+    currentJobManager = ActorRef.noSender
 
     context.system.scheduler.scheduleOnce(registrationDelay, self, 
RegisterAtJobManager)
   }
@@ -190,8 +191,8 @@ import scala.collection.JavaConverters._
         registrationDelay *= 2
 
         if (registrationDuration > maxRegistrationDuration) {
-          log.warning("TaskManager could not register at JobManager {} after 
{}.", jobManagerAkkaURL,
-
+          log.warning("TaskManager could not register at JobManager {} after 
{}.",
+            jobManagerAkkaURL,
             maxRegistrationDuration)
 
           self ! PoisonPill
@@ -212,10 +213,8 @@ import scala.collection.JavaConverters._
         finishRegistration(id, blobPort)
         registered = true
       } else {
-        if (log.isDebugEnabled) {
-          log.debug("The TaskManager {} is already registered at the 
JobManager {}, but received " +
-            "another AcknowledgeRegistration message.", self.path, 
currentJobManager.path)
-        }
+        log.info("The TaskManager {} is already registered at the JobManager 
{}, but received " +
+          "another AcknowledgeRegistration message.", self.path, 
currentJobManager.path)
       }
     }
 
@@ -228,10 +227,8 @@ import scala.collection.JavaConverters._
         registered = true
       } else {
         // ignore AlreadyRegistered messages which arrived after 
AcknowledgeRegistration
-        if(log.isDebugEnabled){
-          log.debug("The TaskManager {} has already been registered at the 
JobManager {}.",
-            self.path, sender.path)
-        }
+        log.info("The TaskManager {} has already been registered at the 
JobManager {}.",
+          self.path, sender.path)
       }
 
     case RefuseRegistration(reason) =>
@@ -243,10 +240,8 @@ import scala.collection.JavaConverters._
         self ! PoisonPill
       } else {
         // ignore RefuseRegistration messages which arrived after 
AcknowledgeRegistration
-        if(log.isDebugEnabled) {
-          log.debug("Received RefuseRegistration from the JobManager even 
though being already " +
-            "registered")
-        }
+        log.info("Received RefuseRegistration from the JobManager even though 
being already " +
+          "registered")
       }
 
     case SubmitTask(tdd) => {

http://git-wip-us.apache.org/repos/asf/flink/blob/517289dc/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
index 6ed7620..e20471d 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
@@ -18,13 +18,15 @@
 
 package org.apache.flink.runtime.jobmanager
 
-import akka.actor.{PoisonPill, ActorSystem}
+import akka.actor.{ActorRef, PoisonPill, ActorSystem}
 import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.runtime.jobgraph.{JobGraph, DistributionPattern, 
AbstractJobVertex}
+import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, 
DistributionPattern,
+AbstractJobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks.{BlockingOnceReceiver, 
FailingOnceReceiver}
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
 import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, 
SubmissionSuccess,
 SubmitJob}
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
 import org.apache.flink.runtime.testingUtils.TestingUtils
 import org.junit.runner.RunWith
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
@@ -148,7 +150,6 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
       val cluster = TestingUtils.startTestingCluster(NUM_TASKS, 2)
 
       val jm = cluster.getJobManager
-      val taskManagers = cluster.getTaskManagers
 
       try {
         within(TestingUtils.TESTING_DURATION){
@@ -156,9 +157,23 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
 
           expectMsg(SubmissionSuccess(jobGraph.getJobID))
 
-          Thread.sleep(500)
+          jm ! WaitForAllVerticesToBeRunningOrFinished(jobGraph.getJobID)
+
+          expectMsg(AllVerticesRunning(jobGraph.getJobID))
+
           BlockingOnceReceiver.blocking = false
-          taskManagers(0) ! PoisonPill
+          jm ! NotifyWhenJobStatus(jobGraph.getJobID, JobStatus.RESTARTING)
+          jm ! RequestWorkingTaskManager(jobGraph.getJobID)
+
+          val WorkingTaskManager(tm) = expectMsgType[WorkingTaskManager]
+
+          tm match {
+            case ActorRef.noSender => fail("There has to be at least one task 
manager on which" +
+              "the tasks are running.")
+            case t => t ! PoisonPill
+          }
+
+          expectMsg(JobStatusIs(jobGraph.getJobID, JobStatus.RESTARTING))
 
           val result = expectMsgType[JobResultSuccess]
 

http://git-wip-us.apache.org/repos/asf/flink/blob/517289dc/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala
index 912ed95..0f6eeca 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala
@@ -23,7 +23,8 @@ import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, 
DistributionPattern, JobGraph}
 import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender}
 import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultFailed, 
SubmissionSuccess, SubmitJob}
-import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{AllVerticesRunning,
 WaitForAllVerticesToBeRunningOrFinished}
+import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{WorkingTaskManager,
+RequestWorkingTaskManager, AllVerticesRunning, 
WaitForAllVerticesToBeRunningOrFinished}
 import org.apache.flink.runtime.testingUtils.TestingUtils
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
@@ -55,7 +56,6 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
 
       val cluster = TestingUtils.startTestingCluster(num_tasks, 2)
 
-      val taskManagers = cluster.getTaskManagers
       val jm = cluster.getJobManager
 
       try {
@@ -64,10 +64,14 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
           expectMsg(SubmissionSuccess(jobGraph.getJobID))
 
           jm ! WaitForAllVerticesToBeRunningOrFinished(jobID)
+
           expectMsg(AllVerticesRunning(jobID))
 
+          jm ! RequestWorkingTaskManager(jobID)
+
+          val tm = expectMsgType[WorkingTaskManager].taskManager
           // kill one task manager
-          taskManagers(0) ! PoisonPill
+          tm ! PoisonPill
           expectMsgType[JobResultFailed]
         }
       }finally{

http://git-wip-us.apache.org/repos/asf/flink/blob/517289dc/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
index cede7f2..fba7c76 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, 
DistributionPattern
 import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender}
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
 import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultFailed, 
SubmissionSuccess, SubmitJob}
-import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{AllVerticesRunning,
 WaitForAllVerticesToBeRunningOrFinished}
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
 import org.apache.flink.runtime.testingUtils.TestingUtils
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
@@ -71,6 +71,7 @@ ImplicitSender with WordSpecLike with Matchers with 
BeforeAndAfterAll {
           expectMsg(SubmissionSuccess(jobGraph.getJobID))
 
           jm ! WaitForAllVerticesToBeRunningOrFinished(jobID)
+
           expectMsg(AllVerticesRunning(jobID))
 
           //kill task manager

http://git-wip-us.apache.org/repos/asf/flink/blob/517289dc/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala
index e4c1efb..bd542b7 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala
@@ -22,12 +22,14 @@ import java.net.InetAddress
 
 import akka.actor._
 import akka.testkit.{TestKit, ImplicitSender}
+import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.instance.{InstanceID, HardwareDescription, 
InstanceConnectionInfo}
 import 
org.apache.flink.runtime.messages.RegistrationMessages.{AlreadyRegistered,
 RefuseRegistration, AcknowledgeRegistration, RegisterTaskManager}
 import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
 import org.apache.flink.runtime.testingUtils.TestingUtils
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+import scala.concurrent.duration._
 
 class TaskManagerRegistrationITCase(_system: ActorSystem) extends 
TestKit(_system) with
 ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {
@@ -82,6 +84,9 @@ ImplicitSender with WordSpecLike with Matchers with 
BeforeAndAfterAll {
       val tm = TestingUtils.startTestingTaskManager(self)
 
       try {
+        ignoreMsg{
+          case _: Heartbeat => true
+        }
         within(TestingUtils.TESTING_DURATION) {
           expectMsgType[RegisterTaskManager]
 
@@ -90,14 +95,32 @@ ImplicitSender with WordSpecLike with Matchers with 
BeforeAndAfterAll {
           tm ! RefuseRegistration("Should be ignored")
 
           // Check if the TaskManager is still alive
-          tm ! Identify
-
-          expectMsgPF() {
-            // wait for actor identity
-            case x: ActorIdentity => true
-            // ignore heartbeats
-            case h: Heartbeat => false
-          }
+          tm ! Identify(1)
+
+          expectMsgType[ActorIdentity]
+
+        }
+      } finally {
+        tm ! Kill
+      }
+    }
+
+    "shutdown after the maximum registration duration has been exceeded" in {
+
+      val config = new Configuration()
+      config.setString(ConfigConstants.JOB_MANAGER_AKKA_URL, 
self.path.toString)
+      config.setString(ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, 
"1 second")
+
+      val tm = 
TestingUtils.startTestingTaskManagerWithConfiguration("LOCALHOST", config)
+
+      watch(tm)
+
+      try {
+        ignoreMsg{
+          case _: RegisterTaskManager => true
+        }
+        within(2 seconds) {
+          expectTerminated(tm)
         }
       } finally {
         tm ! Kill

http://git-wip-us.apache.org/repos/asf/flink/blob/517289dc/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
index 42832b4..334de11 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
@@ -133,6 +133,18 @@ object Tasks {
   }
 
   class BlockingOnceReceiver extends Receiver {
+    import BlockingOnceReceiver.blocking
+
+    override def invoke(): Unit = {
+      if(blocking) {
+        val o = new Object
+        o.synchronized{
+          o.wait()
+        }
+      } else {
+        super.invoke()
+      }
+    }
 
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/517289dc/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
index bd5bc50..ad85547 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala
@@ -18,27 +18,35 @@
 
 package org.apache.flink.runtime.testingUtils
 
-import akka.actor.{ActorRef, Props}
+import akka.actor.{Cancellable, ActorRef, Props}
 import akka.pattern.{ask, pipe}
 import org.apache.flink.runtime.ActorLogMessages
 import org.apache.flink.runtime.execution.ExecutionState
-import org.apache.flink.runtime.jobgraph.JobID
+import org.apache.flink.runtime.jobgraph.{JobStatus, JobID}
 import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist}
-import 
org.apache.flink.runtime.messages.ExecutionGraphMessages.ExecutionStateChanged
+import 
org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
 
 import scala.collection.convert.WrapAsScala
 import scala.concurrent.Future
+import scala.concurrent.duration._
 
 
 trait TestingJobManager extends ActorLogMessages with WrapAsScala {
   that: JobManager =>
 
+  import context._
+
   val waitForAllVerticesToBeRunning = scala.collection.mutable.HashMap[JobID, 
Set[ActorRef]]()
 
   val waitForAllVerticesToBeRunningOrFinished =
     scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
 
+  var periodicCheck: Option[Cancellable] = None
+
+  val waitForJobStatus = scala.collection.mutable.HashMap[JobID,
+    collection.mutable.HashMap[JobStatus, Set[ActorRef]]]()
+
   override def archiveProps = Props(new MemoryArchivist(archiveCount) with 
TestingMemoryArchivist)
 
   abstract override def receiveWithLogMessages: Receive = {
@@ -52,54 +60,42 @@ trait TestingJobManager extends ActorLogMessages with 
WrapAsScala {
           executionGraph)
         case None => archive.tell(RequestExecutionGraph(jobID), sender)
       }
+
     case WaitForAllVerticesToBeRunning(jobID) =>
       if(checkIfAllVerticesRunning(jobID)){
         sender ! AllVerticesRunning(jobID)
       }else{
-        currentJobs.get(jobID) match {
-          case Some((eg, _)) => eg.registerExecutionListener(self)
-          case None =>
-        }
         val waiting = waitForAllVerticesToBeRunning.getOrElse(jobID, 
Set[ActorRef]())
         waitForAllVerticesToBeRunning += jobID -> (waiting + sender)
+
+        if(periodicCheck.isEmpty){
+          periodicCheck =
+            Some(context.system.scheduler.schedule(0 seconds, 200 millis, 
self, NotifyListeners))
+        }
       }
     case WaitForAllVerticesToBeRunningOrFinished(jobID) =>
       if(checkIfAllVerticesRunningOrFinished(jobID)){
         sender ! AllVerticesRunning(jobID)
       }else{
-        currentJobs.get(jobID) match {
-          case Some((eg, _)) => eg.registerExecutionListener(self)
-          case None =>
-        }
         val waiting = waitForAllVerticesToBeRunningOrFinished.getOrElse(jobID, 
Set[ActorRef]())
         waitForAllVerticesToBeRunningOrFinished += jobID -> (waiting + sender)
-      }
-    case ExecutionStateChanged(jobID, _, _, _, _, _, _, _, _) =>
-      val cleanupRunning = waitForAllVerticesToBeRunning.get(jobID) match {
-        case Some(listeners) if checkIfAllVerticesRunning(jobID) =>
-          for(listener <- listeners){
-            listener ! AllVerticesRunning(jobID)
-          }
-          true
-        case _ => false
-      }
 
-      if(cleanupRunning){
-        waitForAllVerticesToBeRunning.remove(jobID)
+        if(periodicCheck.isEmpty){
+          periodicCheck =
+            Some(context.system.scheduler.schedule(0 seconds, 200 millis, 
self, NotifyListeners))
+        }
       }
 
-      val cleanupRunningOrFinished = 
waitForAllVerticesToBeRunningOrFinished.get(jobID) match {
-        case Some(listeners) if checkIfAllVerticesRunningOrFinished(jobID) =>
-          for(listener <- listeners){
-            listener ! AllVerticesRunning(jobID)
-          }
-          true
-        case _ => false
+    case NotifyListeners =>
+      for(jobID <- currentJobs.keySet){
+        notifyListeners(jobID)
       }
 
-      if (cleanupRunningOrFinished) {
-        waitForAllVerticesToBeRunningOrFinished.remove(jobID)
+      if(waitForAllVerticesToBeRunning.isEmpty && 
waitForAllVerticesToBeRunningOrFinished.isEmpty) {
+        periodicCheck foreach { _.cancel() }
+        periodicCheck = None
       }
+
     case NotifyWhenJobRemoved(jobID) => {
       val tms = instanceManager.getAllRegisteredInstances.map(_.getTaskManager)
 
@@ -113,6 +109,44 @@ trait TestingJobManager extends ActorLogMessages with 
WrapAsScala {
       Future.fold(responses)(true)(_ & _) pipeTo sender
     }
 
+    case RequestWorkingTaskManager(jobID) =>
+      currentJobs.get(jobID) match {
+        case Some((eg, _)) =>
+          if(eg.getAllExecutionVertices.isEmpty){
+            sender ! WorkingTaskManager(ActorRef.noSender)
+          } else {
+            val resource = 
eg.getAllExecutionVertices.head.getCurrentAssignedResource
+
+            if(resource == null){
+              sender ! WorkingTaskManager(ActorRef.noSender)
+            } else {
+              sender ! 
WorkingTaskManager(resource.getInstance().getTaskManager)
+            }
+          }
+        case None => sender ! WorkingTaskManager(ActorRef.noSender)
+      }
+
+    case NotifyWhenJobStatus(jobID, state) =>
+      val jobStatusListener = waitForJobStatus.getOrElseUpdate(jobID,
+        scala.collection.mutable.HashMap[JobStatus, Set[ActorRef]]())
+
+      val listener = jobStatusListener.getOrElse(state, Set[ActorRef]())
+
+      jobStatusListener += state -> (listener + sender)
+
+    case msg@JobStatusChanged(jobID, newJobStatus, _, _) =>
+      super.receiveWithLogMessages(msg)
+      waitForJobStatus.get(jobID) match {
+        case Some(stateListener) =>
+          stateListener.get(newJobStatus) match {
+            case Some(listeners) =>
+              listeners foreach {
+                _ ! JobStatusIs(jobID, newJobStatus)
+              }
+            case _ =>
+          }
+        case _ =>
+      }
   }
 
   def checkIfAllVerticesRunning(jobID: JobID): Boolean = {
@@ -134,4 +168,32 @@ trait TestingJobManager extends ActorLogMessages with 
WrapAsScala {
       case None => false
     }
   }
+
+  def notifyListeners(jobID: JobID): Unit = {
+    val cleanupRunning = waitForAllVerticesToBeRunning.get(jobID) match {
+      case Some(listeners) if checkIfAllVerticesRunning(jobID) =>
+        for(listener <- listeners){
+          listener ! AllVerticesRunning(jobID)
+        }
+        true
+      case _ => false
+    }
+
+    if(cleanupRunning){
+      waitForAllVerticesToBeRunning.remove(jobID)
+    }
+
+    val cleanupRunningOrFinished = 
waitForAllVerticesToBeRunningOrFinished.get(jobID) match {
+      case Some(listeners) if checkIfAllVerticesRunningOrFinished(jobID) =>
+        for(listener <- listeners){
+          listener ! AllVerticesRunning(jobID)
+        }
+        true
+      case _ => false
+    }
+
+    if (cleanupRunningOrFinished) {
+      waitForAllVerticesToBeRunningOrFinished.remove(jobID)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/517289dc/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index cdd81cd..c59efdc 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -18,8 +18,9 @@
 
 package org.apache.flink.runtime.testingUtils
 
+import akka.actor.ActorRef
 import org.apache.flink.runtime.executiongraph.ExecutionGraph
-import org.apache.flink.runtime.jobgraph.JobID
+import org.apache.flink.runtime.jobgraph.{JobStatus, JobID}
 
 object TestingJobManagerMessages {
 
@@ -39,4 +40,12 @@ object TestingJobManagerMessages {
   case class AllVerticesRunning(jobID: JobID)
 
   case class NotifyWhenJobRemoved(jobID: JobID)
+
+  case class RequestWorkingTaskManager(jobID: JobID)
+  case class WorkingTaskManager(taskManager: ActorRef)
+
+  case class NotifyWhenJobStatus(jobID: JobID, state: JobStatus)
+  case class JobStatusIs(jobID: JobID, state: JobStatus)
+
+  case object NotifyListeners
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/517289dc/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index b658279..5fd133f 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -44,10 +44,12 @@ object TestingUtils {
     val ioRWSerializerClass = 
classOf[IOReadableWritableSerializer].getCanonicalName
     val ioRWClass = classOf[IOReadableWritable].getCanonicalName
 
+    val logLevel = AkkaUtils.getLogLevel
+
     s"""akka.daemonic = on
       |akka.test.timefactor = 10
       |akka.loggers = ["akka.event.slf4j.Slf4jLogger"]
-      |akka.loglevel = "DEBUG"
+      |akka.loglevel = $logLevel
       |akka.stdout-loglevel = "OFF"
       |akka.jvm-exit-on-fata-error = off
       |akka.log-config-on-start = off

Reply via email to