[tests] Cleanup style and timeouts in recovery restart tests

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a51c02f6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a51c02f6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a51c02f6

Branch: refs/heads/master
Commit: a51c02f6e8be948d71a00c492808115d622379a7
Parents: 665e601
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Feb 19 20:31:56 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Feb 19 20:31:56 2015 +0100

----------------------------------------------------------------------
 .../runtime/jobmanager/RecoveryITCase.scala     | 23 +++++++++++----
 .../runtime/testingUtils/TestingUtils.scala     |  3 +-
 flink-tests/src/test/resources/log4j.properties | 27 -----------------
 .../jobmanager/JobManagerFailsITCase.scala      | 24 ++++++++-------
 .../taskmanager/TaskManagerFailsITCase.scala    | 31 +++++++++-----------
 5 files changed, 46 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a51c02f6/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
index 2c1f82f..e7d1d83 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/RecoveryITCase.scala
@@ -21,13 +21,13 @@ package org.apache.flink.runtime.jobmanager
 import akka.actor.Status.Success
 import akka.actor.{ActorRef, PoisonPill, ActorSystem}
 import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, DistributionPattern,
-AbstractJobVertex}
+import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.runtime.jobgraph.{JobStatus, JobGraph, DistributionPattern,AbstractJobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks.{BlockingOnceReceiver, FailingOnceReceiver}
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup
 import org.apache.flink.runtime.messages.JobManagerMessages.{JobResultSuccess, SubmitJob}
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
-import org.apache.flink.runtime.testingUtils.TestingUtils
+import org.apache.flink.runtime.testingUtils.{TestingCluster, TestingUtils}
 import org.junit.runner.RunWith
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
 import org.scalatest.junit.JUnitRunner
@@ -35,12 +35,23 @@ import org.scalatest.junit.JUnitRunner
 @RunWith(classOf[JUnitRunner])
 class RecoveryITCase(_system: ActorSystem) extends TestKit(_system) with ImplicitSender with
 WordSpecLike with Matchers with BeforeAndAfterAll {
+
   def this() = this(ActorSystem("TestingActorSystem", TestingUtils.testConfig))
 
   override def afterAll(): Unit = {
     TestKit.shutdownActorSystem(system)
   }
 
+  def startTestClusterWithHeartbeatTimeout(numSlots: Int,
+                                                numTaskManagers: Int,
+                                                heartbeatTimeout: String): TestingCluster = {
+    val config = new Configuration()
+    config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
+    config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers)
+    config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, heartbeatTimeout)
+    new TestingCluster(config)
+  }
+
   val NUM_TASKS = 31
 
   "The recovery" must {
@@ -61,7 +72,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
       val jobGraph = new JobGraph("Pointwise job", sender, receiver)
       jobGraph.setNumberOfExecutionRetries(1)
 
-      val cluster = TestingUtils.startTestingCluster(2 * NUM_TASKS)
+      val cluster = startTestClusterWithHeartbeatTimeout(2 * NUM_TASKS, 1, "2 s")
       val jm = cluster.getJobManager
 
       try {
@@ -104,7 +115,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
       val jobGraph = new JobGraph("Pointwise job", sender, receiver)
       jobGraph.setNumberOfExecutionRetries(1)
 
-      val cluster = TestingUtils.startTestingCluster(NUM_TASKS)
+      val cluster = startTestClusterWithHeartbeatTimeout(NUM_TASKS, 1, "2 s")
       val jm = cluster.getJobManager
 
       try {
@@ -147,7 +158,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {
       val jobGraph = new JobGraph("Pointwise job", sender, receiver)
       jobGraph.setNumberOfExecutionRetries(1)
 
-      val cluster = TestingUtils.startTestingCluster(NUM_TASKS, 2)
+      val cluster = startTestClusterWithHeartbeatTimeout(NUM_TASKS, 2, "2 s")
 
       val jm = cluster.getJobManager
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a51c02f6/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 72ed9e7..147cc8a 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -96,8 +96,7 @@ object TestingUtils {
   }
 
   def startTestingCluster(numSlots: Int, numTMs: Int = 1,
-                          timeout: String = DEFAULT_AKKA_ASK_TIMEOUT):
-  TestingCluster = {
+                          timeout: String = DEFAULT_AKKA_ASK_TIMEOUT): TestingCluster = {
     val config = new Configuration()
     config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
     config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTMs)

http://git-wip-us.apache.org/repos/asf/flink/blob/a51c02f6/flink-tests/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/log4j.properties b/flink-tests/src/test/resources/log4j.properties
deleted file mode 100644
index 6bf344a..0000000
--- a/flink-tests/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# This file ensures that tests executed from the IDE show log output
-
-log4j.rootLogger=INFO, console
-
-# Log all infos in the given file
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target = System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/a51c02f6/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index ac1864c..416470f 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -48,16 +48,9 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
 
   "A TaskManager" should {
     "detect a lost connection to the JobManager and try to reconnect to it" in {
-      val num_slots = 11
-
-      val config = new Configuration()
-      config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, num_slots)
-      config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1)
-      config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms")
-      config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4000 ms")
-      config.setDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 5)
 
-      val cluster = new ForkableFlinkMiniCluster(config, singleActorSystem = false)
+      val num_slots = 11
+      val cluster = startDeathwatchCluster(num_slots, 1)
 
       val tm = cluster.getTaskManagers(0)
       val jm = cluster.getJobManager
@@ -100,7 +93,7 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
       noOp.setInvokableClass(classOf[NoOpInvokable])
       val jobGraph2 = new JobGraph("NoOp Testjob", noOp)
 
-      val cluster = ForkableFlinkMiniCluster.startClusterDeathWatch(num_slots / 2, 2)
+      val cluster = startDeathwatchCluster(num_slots / 2, 2)
 
       var jm = cluster.getJobManager
       val tm = cluster.getTaskManagers(0)
@@ -135,4 +128,15 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
       }
     }
   }
+
+  def startDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): ForkableFlinkMiniCluster = {
+    val config = new Configuration()
+    config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
+    config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskmanagers)
+    config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms")
+    config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4000 ms")
+    config.setDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 5)
+
+    new ForkableFlinkMiniCluster(config, singleActorSystem = false)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a51c02f6/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
index c81ec88..245bcd9 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
@@ -52,16 +52,9 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
   "The JobManager" should {
 
     "detect a failing task manager" in {
-      val num_slots = 11
-
-      val config = new Configuration()
-      config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, num_slots)
-      config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2)
-      config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms")
-      config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4000 ms")
-      config.setDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 5)
 
-      val cluster = new ForkableFlinkMiniCluster(config, singleActorSystem = false)
+      val num_slots = 11
+      val cluster = startDeathwatchCluster(num_slots, 2)
 
       val taskManagers = cluster.getTaskManagers
       val jm = cluster.getJobManager
@@ -89,6 +82,7 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
     }
 
     "handle gracefully failing task manager" in {
+
       val num_tasks = 31
       val sender = new AbstractJobVertex("Sender")
       val receiver = new AbstractJobVertex("Receiver")
@@ -190,14 +184,7 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
       noOp.setInvokableClass(classOf[NoOpInvokable])
       val jobGraph2 = new JobGraph("NoOp Testjob", noOp)
 
-      val config = new Configuration()
-      config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, num_slots/2)
-      config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2)
-      config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms")
-      config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4000 ms")
-      config.setDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 5)
-
-      val cluster = new ForkableFlinkMiniCluster(config, singleActorSystem = false)
+      val cluster = startDeathwatchCluster(num_slots/2, 2)
 
       var tm = cluster.getTaskManagers(0)
       val jm = cluster.getJobManager
@@ -239,4 +226,14 @@ with WordSpecLike with Matchers with BeforeAndAfterAll {
     }
   }
 
+  def startDeathwatchCluster(numSlots: Int, numTaskmanagers: Int): ForkableFlinkMiniCluster = {
+    val config = new Configuration()
+    config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
+    config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskmanagers)
+    config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms")
+    config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "4000 ms")
+    config.setDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 5)
+
+    new ForkableFlinkMiniCluster(config, singleActorSystem = false)
+  }
 }

Reply via email to