This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 953d42e [FLINK-10405] [tests] Port JobManagerFailsITCase to new code base (#6841)
953d42e is described below
commit 953d42e0851ddc070b1d178c8bac4f2b563247d8
Author: Tzu-Li Chen <[email protected]>
AuthorDate: Wed Oct 17 06:26:24 2018 +0800
[FLINK-10405] [tests] Port JobManagerFailsITCase to new code base (#6841)
* [FLINK-10405] [tests] Port JobManagerFailsITCase to new code base
* revert redundant assertion
---
... JobManagerHAProcessFailureRecoveryITCase.java} | 10 +-
.../runtime/jobmanager/JobManagerFailsITCase.scala | 148 ---------------------
2 files changed, 5 insertions(+), 153 deletions(-)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
similarity index 98%
rename from flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
rename to flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
index 9e9ce07..ec3f1e1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
@@ -93,7 +93,7 @@ import static org.junit.Assert.fail;
*/
@SuppressWarnings("serial")
@RunWith(Parameterized.class)
-public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
+public class JobManagerHAProcessFailureRecoveryITCase extends TestLogger {
private static ZooKeeperTestEnvironment zooKeeper;
@@ -131,7 +131,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
private final ExecutionMode executionMode;
- public JobManagerHAProcessFailureBatchRecoveryITCase(ExecutionMode
executionMode) {
+ public JobManagerHAProcessFailureRecoveryITCase(ExecutionMode
executionMode) {
this.executionMode = executionMode;
}
@@ -369,9 +369,9 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
leaderRetrievalService.stop();
}
- for (DispatcherProcess jmProces : dispatcherProcesses) {
- if (jmProces != null) {
- jmProces.destroy();
+ for (DispatcherProcess dispatcherProcess :
dispatcherProcesses) {
+ if (dispatcherProcess != null) {
+ dispatcherProcess.destroy();
}
}
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
deleted file mode 100644
index 5c9b1fb..0000000
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.runtime.jobmanager
-
-import akka.actor.ActorSystem
-import akka.testkit.{ImplicitSender, TestKit}
-import org.apache.flink.configuration.{ConfigConstants, Configuration,
JobManagerOptions, TaskManagerOptions}
-import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
-import org.apache.flink.runtime.jobgraph.{JobGraph, JobVertex}
-import org.apache.flink.runtime.messages.Acknowledge
-import org.apache.flink.runtime.messages.JobManagerMessages._
-import
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered
-import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect
-import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils,
TestingCluster, TestingUtils}
-import org.apache.flink.runtime.testtasks.{BlockingNoOpInvokable,
NoOpInvokable}
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
-
-@RunWith(classOf[JUnitRunner])
-class JobManagerFailsITCase(_system: ActorSystem)
- extends TestKit(_system)
- with ImplicitSender
- with WordSpecLike
- with Matchers
- with BeforeAndAfterAll
- with ScalaTestingUtils {
-
- def this() = this(ActorSystem("TestingActorSystem",
AkkaUtils.getDefaultAkkaConfig))
-
- override def afterAll(): Unit = {
- TestKit.shutdownActorSystem(system)
- }
-
- "A TaskManager" should {
- "detect a lost connection to the JobManager and try to reconnect to it" in
{
-
- val num_slots = 4
- val cluster = startDeathwatchCluster(num_slots, 1)
-
- try {
- val tm = cluster.getTaskManagers(0)
- val jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
-
- // disable disconnect message to test death watch
- tm ! DisableDisconnect
-
- within(TestingUtils.TESTING_DURATION) {
- jmGateway.tell(RequestNumberRegisteredTaskManager, self)
- expectMsg(1)
-
- // stop the current leader and make sure that he is gone
- TestingUtils.stopActorGracefully(jmGateway)
-
- cluster.restartLeadingJobManager()
-
- cluster.waitForTaskManagersToBeRegistered()
-
- cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
- .tell(RequestNumberRegisteredTaskManager, self)
-
- expectMsg(1)
- }
- } finally {
- cluster.stop()
- }
- }
-
- "go into a clean state in case of a JobManager failure" in {
- val num_slots = 4
-
- val sender = new JobVertex("BlockingSender")
- sender.setParallelism(num_slots)
- sender.setInvokableClass(classOf[BlockingNoOpInvokable])
- val jobGraph = new JobGraph("Blocking Testjob", sender)
-
- val noOp = new JobVertex("NoOpInvokable")
- noOp.setParallelism(num_slots)
- noOp.setInvokableClass(classOf[NoOpInvokable])
- val jobGraph2 = new JobGraph("NoOp Testjob", noOp)
-
- val cluster = startDeathwatchCluster(num_slots / 2, 2)
-
- try {
- var jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
- val tm = cluster.getTaskManagers(0)
-
- within(TestingUtils.TESTING_DURATION) {
- jmGateway.tell(SubmitJob(jobGraph, ListeningBehaviour.DETACHED),
self)
- expectMsg(JobSubmitSuccess(jobGraph.getJobID))
-
- // stop the current leader and make sure that he is gone
- TestingUtils.stopActorGracefully(jmGateway)
-
- cluster.restartLeadingJobManager()
-
- jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
-
- // Ask the job manager for the TMs. Don't ask the TMs, because they
- // can still have state associated with the old job manager.
- jmGateway.tell(NotifyWhenAtLeastNumTaskManagerAreRegistered(2), self)
- expectMsg(Acknowledge.get())
-
- jmGateway.tell(SubmitJob(jobGraph2,
ListeningBehaviour.EXECUTION_RESULT), self)
-
- expectMsg(JobSubmitSuccess(jobGraph2.getJobID()))
-
- val result = expectMsgType[JobResultSuccess]
-
- result.result.getJobId() should equal(jobGraph2.getJobID)
- }
- } finally {
- cluster.stop()
- }
- }
- }
-
- def startDeathwatchCluster(numSlots: Int, numTaskmanagers: Int):
TestingCluster = {
- val config = new Configuration()
- config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots)
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
numTaskmanagers)
- config.setInteger(JobManagerOptions.PORT, 0)
- config.setString(TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF, "50 ms")
- config.setString(TaskManagerOptions.REGISTRATION_MAX_BACKOFF, "100 ms")
-
- val cluster = new TestingCluster(config, singleActorSystem = false)
-
- cluster.start()
-
- cluster
- }
-}