[runtime tests] Fix the buggy "recover a task manager failure" integration test case
[runtime tests] Fix TaskManagerFailsITCase and TaskManagerFailsWithSlotSharingITCase Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/517289dc Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/517289dc Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/517289dc Branch: refs/heads/master Commit: 517289dc599e697e7a14fae175421e93a16b955b Parents: 730e056 Author: Till Rohrmann <[email protected]> Authored: Sun Jan 25 19:33:03 2015 +0100 Committer: Till Rohrmann <[email protected]> Committed: Tue Jan 27 14:33:49 2015 +0100 ---------------------------------------------------------------------- .../flink/runtime/taskmanager/TaskManager.scala | 23 ++-- .../runtime/jobmanager/RecoveryITCase.scala | 25 +++- .../jobmanager/TaskManagerFailsITCase.scala | 10 +- .../TaskManagerFailsWithSlotSharingITCase.scala | 3 +- .../TaskManagerRegistrationITCase.scala | 39 ++++-- .../apache/flink/runtime/jobmanager/Tasks.scala | 12 ++ .../testingUtils/TestingJobManager.scala | 126 ++++++++++++++----- .../TestingJobManagerMessages.scala | 11 +- .../runtime/testingUtils/TestingUtils.scala | 4 +- 9 files changed, 188 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/517289dc/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 17bc0ac..5d2d21d 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -178,6 +178,7 @@ import scala.collection.JavaConverters._ registrationDuration = 0 seconds registered 
= false + currentJobManager = ActorRef.noSender context.system.scheduler.scheduleOnce(registrationDelay, self, RegisterAtJobManager) } @@ -190,8 +191,8 @@ import scala.collection.JavaConverters._ registrationDelay *= 2 if (registrationDuration > maxRegistrationDuration) { - log.warning("TaskManager could not register at JobManager {} after {}.", jobManagerAkkaURL, - + log.warning("TaskManager could not register at JobManager {} after {}.", + jobManagerAkkaURL, maxRegistrationDuration) self ! PoisonPill @@ -212,10 +213,8 @@ import scala.collection.JavaConverters._ finishRegistration(id, blobPort) registered = true } else { - if (log.isDebugEnabled) { - log.debug("The TaskManager {} is already registered at the JobManager {}, but received " + - "another AcknowledgeRegistration message.", self.path, currentJobManager.path) - } + log.info("The TaskManager {} is already registered at the JobManager {}, but received " + + "another AcknowledgeRegistration message.", self.path, currentJobManager.path) } } @@ -228,10 +227,8 @@ import scala.collection.JavaConverters._ registered = true } else { // ignore AlreadyRegistered messages which arrived after AcknowledgeRegistration - if(log.isDebugEnabled){ - log.debug("The TaskManager {} has already been registered at the JobManager {}.", - self.path, sender.path) - } + log.info("The TaskManager {} has already been registered at the JobManager {}.", + self.path, sender.path) } case RefuseRegistration(reason) => @@ -243,10 +240,8 @@ import scala.collection.JavaConverters._ self ! 
PoisonPill } else { // ignore RefuseRegistration messages which arrived after AcknowledgeRegistration - if(log.isDebugEnabled) { - log.debug("Received RefuseRegistration from the JobManager even though being already " + - "registered") - } + log.info("Received RefuseRegistration from the JobManager even though being already " + + "registered") } case SubmitTask(tdd) => { http://git-wip-us.apache.org/repos/asf/flink/blob/517289dc/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala index 6ed7620..e20471d 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala @@ -18,13 +18,15 @@ package org.apache.flink.runtime.jobmanager -import akka.actor.{PoisonPill, ActorSystem} +import akka.actor.{ActorRef, PoisonPill, ActorSystem} import akka.testkit.{ImplicitSender, TestKit} -import org.apache.flink.runtime.jobgraph.{JobGraph, DistributionPattern, AbstractJobVertex} +import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, DistributionPattern, +AbstractJobVertex} import org.apache.flink.runtime.jobmanager.Tasks.{BlockingOnceReceiver, FailingOnceReceiver} import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, SubmissionSuccess, SubmitJob} +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._ import org.apache.flink.runtime.testingUtils.TestingUtils import org.junit.runner.RunWith import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} @@ -148,7 +150,6 @@ WordSpecLike with Matchers with BeforeAndAfterAll { val cluster = 
TestingUtils.startTestingCluster(NUM_TASKS, 2) val jm = cluster.getJobManager - val taskManagers = cluster.getTaskManagers try { within(TestingUtils.TESTING_DURATION){ @@ -156,9 +157,23 @@ WordSpecLike with Matchers with BeforeAndAfterAll { expectMsg(SubmissionSuccess(jobGraph.getJobID)) - Thread.sleep(500) + jm ! WaitForAllVerticesToBeRunningOrFinished(jobGraph.getJobID) + + expectMsg(AllVerticesRunning(jobGraph.getJobID)) + BlockingOnceReceiver.blocking = false - taskManagers(0) ! PoisonPill + jm ! NotifyWhenJobStatus(jobGraph.getJobID, JobStatus.RESTARTING) + jm ! RequestWorkingTaskManager(jobGraph.getJobID) + + val WorkingTaskManager(tm) = expectMsgType[WorkingTaskManager] + + tm match { + case ActorRef.noSender => fail("There has to be at least one task manager on which" + + "the tasks are running.") + case t => t ! PoisonPill + } + + expectMsg(JobStatusIs(jobGraph.getJobID, JobStatus.RESTARTING)) val result = expectMsgType[JobResultSuccess] http://git-wip-us.apache.org/repos/asf/flink/blob/517289dc/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala index 912ed95..0f6eeca 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsITCase.scala @@ -23,7 +23,8 @@ import akka.testkit.{ImplicitSender, TestKit} import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, DistributionPattern, JobGraph} import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender} import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultFailed, SubmissionSuccess, SubmitJob} -import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{AllVerticesRunning, WaitForAllVerticesToBeRunningOrFinished} +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{WorkingTaskManager, +RequestWorkingTaskManager, AllVerticesRunning, WaitForAllVerticesToBeRunningOrFinished} import org.apache.flink.runtime.testingUtils.TestingUtils import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner @@ -55,7 +56,6 @@ with WordSpecLike with Matchers with BeforeAndAfterAll { val cluster = TestingUtils.startTestingCluster(num_tasks, 2) - val taskManagers = cluster.getTaskManagers val jm = cluster.getJobManager try { @@ -64,10 +64,14 @@ with WordSpecLike with Matchers with BeforeAndAfterAll { expectMsg(SubmissionSuccess(jobGraph.getJobID)) jm ! WaitForAllVerticesToBeRunningOrFinished(jobID) + expectMsg(AllVerticesRunning(jobID)) + jm ! RequestWorkingTaskManager(jobID) + + val tm = expectMsgType[WorkingTaskManager].taskManager // kill one task manager - taskManagers(0) ! PoisonPill + tm ! 
PoisonPill expectMsgType[JobResultFailed] } }finally{ http://git-wip-us.apache.org/repos/asf/flink/blob/517289dc/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala index cede7f2..fba7c76 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerFailsWithSlotSharingITCase.scala @@ -24,7 +24,7 @@ import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, DistributionPattern import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender} import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultFailed, SubmissionSuccess, SubmitJob} -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{AllVerticesRunning, WaitForAllVerticesToBeRunningOrFinished} +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._ import org.apache.flink.runtime.testingUtils.TestingUtils import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner @@ -71,6 +71,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { expectMsg(SubmissionSuccess(jobGraph.getJobID)) jm ! 
WaitForAllVerticesToBeRunningOrFinished(jobID) + expectMsg(AllVerticesRunning(jobID)) //kill task manager http://git-wip-us.apache.org/repos/asf/flink/blob/517289dc/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala index e4c1efb..bd542b7 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/TaskManagerRegistrationITCase.scala @@ -22,12 +22,14 @@ import java.net.InetAddress import akka.actor._ import akka.testkit.{TestKit, ImplicitSender} +import org.apache.flink.configuration.{ConfigConstants, Configuration} import org.apache.flink.runtime.instance.{InstanceID, HardwareDescription, InstanceConnectionInfo} import org.apache.flink.runtime.messages.RegistrationMessages.{AlreadyRegistered, RefuseRegistration, AcknowledgeRegistration, RegisterTaskManager} import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat import org.apache.flink.runtime.testingUtils.TestingUtils import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} +import scala.concurrent.duration._ class TaskManagerRegistrationITCase(_system: ActorSystem) extends TestKit(_system) with ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { @@ -82,6 +84,9 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { val tm = TestingUtils.startTestingTaskManager(self) try { + ignoreMsg{ + case _: Heartbeat => true + } within(TestingUtils.TESTING_DURATION) { expectMsgType[RegisterTaskManager] @@ -90,14 +95,32 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { tm ! 
RefuseRegistration("Should be ignored") // Check if the TaskManager is still alive - tm ! Identify - - expectMsgPF() { - // wait for actor identity - case x: ActorIdentity => true - // ignore heartbeats - case h: Heartbeat => false - } + tm ! Identify(1) + + expectMsgType[ActorIdentity] + + } + } finally { + tm ! Kill + } + } + + "shutdown after the maximum registration duration has been exceeded" in { + + val config = new Configuration() + config.setString(ConfigConstants.JOB_MANAGER_AKKA_URL, self.path.toString) + config.setString(ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, "1 second") + + val tm = TestingUtils.startTestingTaskManagerWithConfiguration("LOCALHOST", config) + + watch(tm) + + try { + ignoreMsg{ + case _: RegisterTaskManager => true + } + within(2 seconds) { + expectTerminated(tm) } } finally { tm ! Kill http://git-wip-us.apache.org/repos/asf/flink/blob/517289dc/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala index 42832b4..334de11 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala @@ -133,6 +133,18 @@ object Tasks { } class BlockingOnceReceiver extends Receiver { + import BlockingOnceReceiver.blocking + + override def invoke(): Unit = { + if(blocking) { + val o = new Object + o.synchronized{ + o.wait() + } + } else { + super.invoke() + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/517289dc/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala index bd5bc50..ad85547 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala @@ -18,27 +18,35 @@ package org.apache.flink.runtime.testingUtils -import akka.actor.{ActorRef, Props} +import akka.actor.{Cancellable, ActorRef, Props} import akka.pattern.{ask, pipe} import org.apache.flink.runtime.ActorLogMessages import org.apache.flink.runtime.execution.ExecutionState -import org.apache.flink.runtime.jobgraph.JobID +import org.apache.flink.runtime.jobgraph.{JobStatus, JobID} import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist} -import org.apache.flink.runtime.messages.ExecutionGraphMessages.ExecutionStateChanged +import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._ import scala.collection.convert.WrapAsScala import scala.concurrent.Future +import scala.concurrent.duration._ trait TestingJobManager extends ActorLogMessages with WrapAsScala { that: JobManager => + import context._ + val waitForAllVerticesToBeRunning = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]() val waitForAllVerticesToBeRunningOrFinished = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]() + var periodicCheck: Option[Cancellable] = None + + val waitForJobStatus = scala.collection.mutable.HashMap[JobID, + collection.mutable.HashMap[JobStatus, Set[ActorRef]]]() + override def archiveProps = Props(new MemoryArchivist(archiveCount) with TestingMemoryArchivist) abstract override def receiveWithLogMessages: Receive = { @@ -52,54 +60,42 @@ trait TestingJobManager extends ActorLogMessages with WrapAsScala { executionGraph) case None => 
archive.tell(RequestExecutionGraph(jobID), sender) } + case WaitForAllVerticesToBeRunning(jobID) => if(checkIfAllVerticesRunning(jobID)){ sender ! AllVerticesRunning(jobID) }else{ - currentJobs.get(jobID) match { - case Some((eg, _)) => eg.registerExecutionListener(self) - case None => - } val waiting = waitForAllVerticesToBeRunning.getOrElse(jobID, Set[ActorRef]()) waitForAllVerticesToBeRunning += jobID -> (waiting + sender) + + if(periodicCheck.isEmpty){ + periodicCheck = + Some(context.system.scheduler.schedule(0 seconds, 200 millis, self, NotifyListeners)) + } } case WaitForAllVerticesToBeRunningOrFinished(jobID) => if(checkIfAllVerticesRunningOrFinished(jobID)){ sender ! AllVerticesRunning(jobID) }else{ - currentJobs.get(jobID) match { - case Some((eg, _)) => eg.registerExecutionListener(self) - case None => - } val waiting = waitForAllVerticesToBeRunningOrFinished.getOrElse(jobID, Set[ActorRef]()) waitForAllVerticesToBeRunningOrFinished += jobID -> (waiting + sender) - } - case ExecutionStateChanged(jobID, _, _, _, _, _, _, _, _) => - val cleanupRunning = waitForAllVerticesToBeRunning.get(jobID) match { - case Some(listeners) if checkIfAllVerticesRunning(jobID) => - for(listener <- listeners){ - listener ! AllVerticesRunning(jobID) - } - true - case _ => false - } - if(cleanupRunning){ - waitForAllVerticesToBeRunning.remove(jobID) + if(periodicCheck.isEmpty){ + periodicCheck = + Some(context.system.scheduler.schedule(0 seconds, 200 millis, self, NotifyListeners)) + } } - val cleanupRunningOrFinished = waitForAllVerticesToBeRunningOrFinished.get(jobID) match { - case Some(listeners) if checkIfAllVerticesRunningOrFinished(jobID) => - for(listener <- listeners){ - listener ! 
AllVerticesRunning(jobID) - } - true - case _ => false + case NotifyListeners => + for(jobID <- currentJobs.keySet){ + notifyListeners(jobID) } - if (cleanupRunningOrFinished) { - waitForAllVerticesToBeRunningOrFinished.remove(jobID) + if(waitForAllVerticesToBeRunning.isEmpty && waitForAllVerticesToBeRunningOrFinished.isEmpty) { + periodicCheck foreach { _.cancel() } + periodicCheck = None } + case NotifyWhenJobRemoved(jobID) => { val tms = instanceManager.getAllRegisteredInstances.map(_.getTaskManager) @@ -113,6 +109,44 @@ trait TestingJobManager extends ActorLogMessages with WrapAsScala { Future.fold(responses)(true)(_ & _) pipeTo sender } + case RequestWorkingTaskManager(jobID) => + currentJobs.get(jobID) match { + case Some((eg, _)) => + if(eg.getAllExecutionVertices.isEmpty){ + sender ! WorkingTaskManager(ActorRef.noSender) + } else { + val resource = eg.getAllExecutionVertices.head.getCurrentAssignedResource + + if(resource == null){ + sender ! WorkingTaskManager(ActorRef.noSender) + } else { + sender ! WorkingTaskManager(resource.getInstance().getTaskManager) + } + } + case None => sender ! WorkingTaskManager(ActorRef.noSender) + } + + case NotifyWhenJobStatus(jobID, state) => + val jobStatusListener = waitForJobStatus.getOrElseUpdate(jobID, + scala.collection.mutable.HashMap[JobStatus, Set[ActorRef]]()) + + val listener = jobStatusListener.getOrElse(state, Set[ActorRef]()) + + jobStatusListener += state -> (listener + sender) + + case msg@JobStatusChanged(jobID, newJobStatus, _, _) => + super.receiveWithLogMessages(msg) + waitForJobStatus.get(jobID) match { + case Some(stateListener) => + stateListener.get(newJobStatus) match { + case Some(listeners) => + listeners foreach { + _ ! 
JobStatusIs(jobID, newJobStatus) + } + case _ => + } + case _ => + } } def checkIfAllVerticesRunning(jobID: JobID): Boolean = { @@ -134,4 +168,32 @@ trait TestingJobManager extends ActorLogMessages with WrapAsScala { case None => false } } + + def notifyListeners(jobID: JobID): Unit = { + val cleanupRunning = waitForAllVerticesToBeRunning.get(jobID) match { + case Some(listeners) if checkIfAllVerticesRunning(jobID) => + for(listener <- listeners){ + listener ! AllVerticesRunning(jobID) + } + true + case _ => false + } + + if(cleanupRunning){ + waitForAllVerticesToBeRunning.remove(jobID) + } + + val cleanupRunningOrFinished = waitForAllVerticesToBeRunningOrFinished.get(jobID) match { + case Some(listeners) if checkIfAllVerticesRunningOrFinished(jobID) => + for(listener <- listeners){ + listener ! AllVerticesRunning(jobID) + } + true + case _ => false + } + + if (cleanupRunningOrFinished) { + waitForAllVerticesToBeRunningOrFinished.remove(jobID) + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/517289dc/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala index cdd81cd..c59efdc 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala @@ -18,8 +18,9 @@ package org.apache.flink.runtime.testingUtils +import akka.actor.ActorRef import org.apache.flink.runtime.executiongraph.ExecutionGraph -import org.apache.flink.runtime.jobgraph.JobID +import org.apache.flink.runtime.jobgraph.{JobStatus, JobID} object TestingJobManagerMessages { @@ -39,4 +40,12 @@ object TestingJobManagerMessages 
{ case class AllVerticesRunning(jobID: JobID) case class NotifyWhenJobRemoved(jobID: JobID) + + case class RequestWorkingTaskManager(jobID: JobID) + case class WorkingTaskManager(taskManager: ActorRef) + + case class NotifyWhenJobStatus(jobID: JobID, state: JobStatus) + case class JobStatusIs(jobID: JobID, state: JobStatus) + + case object NotifyListeners } http://git-wip-us.apache.org/repos/asf/flink/blob/517289dc/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala index b658279..5fd133f 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -44,10 +44,12 @@ object TestingUtils { val ioRWSerializerClass = classOf[IOReadableWritableSerializer].getCanonicalName val ioRWClass = classOf[IOReadableWritable].getCanonicalName + val logLevel = AkkaUtils.getLogLevel + s"""akka.daemonic = on |akka.test.timefactor = 10 |akka.loggers = ["akka.event.slf4j.Slf4jLogger"] - |akka.loglevel = "DEBUG" + |akka.loglevel = $logLevel |akka.stdout-loglevel = "OFF" |akka.jvm-exit-on-fata-error = off |akka.log-config-on-start = off
