http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala deleted file mode 100644 index a6963fe..0000000 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala +++ /dev/null @@ -1,248 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.testingUtils - -import akka.actor.{ActorRef, Terminated} -import org.apache.flink.api.common.JobID -import org.apache.flink.runtime.FlinkActor -import org.apache.flink.runtime.execution.ExecutionState -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID -import org.apache.flink.runtime.messages.JobManagerMessages.{RequestLeaderSessionID, ResponseLeaderSessionID} -import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect} -import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, AlreadyRegistered} -import org.apache.flink.runtime.messages.TaskMessages.{SubmitTask, TaskInFinalState, UpdateTaskExecutionState} -import org.apache.flink.runtime.taskmanager.TaskManager -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved -import org.apache.flink.runtime.testingUtils.TestingMessages._ -import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._ - -import scala.concurrent.duration._ -import scala.language.postfixOps - -/** This mixin can be used to decorate a TaskManager with messages for testing purposes. 
*/ -trait TestingTaskManagerLike extends FlinkActor { - that: TaskManager => - - import scala.collection.JavaConverters._ - - val waitForRemoval = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]() - val waitForJobManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]() - val waitForRegisteredAtResourceManager = - scala.collection.mutable.HashMap[ActorRef, Set[ActorRef]]() - val waitForRunning = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]() - val unregisteredTasks = scala.collection.mutable.HashSet[ExecutionAttemptID]() - - /** Map of registered task submit listeners */ - val registeredSubmitTaskListeners = scala.collection.mutable.HashMap[JobID, ActorRef]() - - val waitForShutdown = scala.collection.mutable.HashSet[ActorRef]() - - var disconnectDisabled = false - - /** - * Handler for testing related messages - */ - abstract override def handleMessage: Receive = { - handleTestingMessage orElse super.handleMessage - } - - def handleTestingMessage: Receive = { - case Alive => sender() ! Acknowledge - - case NotifyWhenTaskIsRunning(executionID) => - Option(runningTasks.get(executionID)) match { - case Some(task) if task.getExecutionState == ExecutionState.RUNNING => - sender ! decorateMessage(true) - - case _ => - val listeners = waitForRunning.getOrElse(executionID, Set()) - waitForRunning += (executionID -> (listeners + sender)) - } - - case RequestRunningTasks => - sender ! decorateMessage(ResponseRunningTasks(runningTasks.asScala.toMap)) - - case NotifyWhenTaskRemoved(executionID) => - Option(runningTasks.get(executionID)) match { - case Some(_) => - val set = waitForRemoval.getOrElse(executionID, Set()) - waitForRemoval += (executionID -> (set + sender)) - case None => - if(unregisteredTasks.contains(executionID)) { - sender ! 
decorateMessage(true) - } else { - val set = waitForRemoval.getOrElse(executionID, Set()) - waitForRemoval += (executionID -> (set + sender)) - } - } - - case TaskInFinalState(executionID) => - super.handleMessage(TaskInFinalState(executionID)) - waitForRemoval.remove(executionID) match { - case Some(actors) => for(actor <- actors) actor ! decorateMessage(true) - case None => - } - - unregisteredTasks += executionID - - case RequestBroadcastVariablesWithReferences => - sender ! decorateMessage( - ResponseBroadcastVariablesWithReferences( - bcVarManager.getNumberOfVariablesWithReferences) - ) - - case RequestNumActiveConnections => - val numActive = if (!network.isShutdown) { - network.getConnectionManager.getNumberOfActiveConnections - } else { - 0 - } - sender ! decorateMessage(ResponseNumActiveConnections(numActive)) - - case NotifyWhenJobRemoved(jobID) => - if(runningTasks.values.asScala.exists(_.getJobID == jobID)){ - context.system.scheduler.scheduleOnce( - 200 milliseconds, - self, - decorateMessage(CheckIfJobRemoved(jobID)))( - context.dispatcher, - sender() - ) - }else{ - sender ! decorateMessage(true) - } - - case CheckIfJobRemoved(jobID) => - if(runningTasks.values.asScala.forall(_.getJobID != jobID)){ - sender ! decorateMessage(true) - } else { - context.system.scheduler.scheduleOnce( - 200 milliseconds, - self, - decorateMessage(CheckIfJobRemoved(jobID)))( - context.dispatcher, - sender() - ) - } - - case NotifyWhenJobManagerTerminated(jobManager) => - val waiting = waitForJobManagerToBeTerminated.getOrElse(jobManager.path.name, Set()) - waitForJobManagerToBeTerminated += jobManager.path.name -> (waiting + sender) - - case RegisterSubmitTaskListener(jobId) => - registeredSubmitTaskListeners.put(jobId, sender()) - - case msg@SubmitTask(tdd) => - registeredSubmitTaskListeners.get(tdd.getJobID) match { - case Some(listenerRef) => - listenerRef ! 
ResponseSubmitTaskListener(tdd) - case None => - // Nothing to do - } - - super.handleMessage(msg) - - /** - * Message from task manager that accumulator values changed and need to be reported immediately - * instead of lazily through the - * [[org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat]] message. We forward this - * message to the job manager that it knows it should report to the listeners. - */ - case msg: AccumulatorsChanged => - currentJobManager match { - case Some(jobManager) => - jobManager.forward(msg) - sendHeartbeatToJobManager() - sender ! true - case None => - } - - case msg@Terminated(jobManager) => - super.handleMessage(msg) - - waitForJobManagerToBeTerminated.remove(jobManager.path.name) foreach { - _ foreach { - _ ! decorateMessage(JobManagerTerminated(jobManager)) - } - } - - case msg:Disconnect => - if (!disconnectDisabled) { - super.handleMessage(msg) - - val jobManager = sender() - - waitForJobManagerToBeTerminated.remove(jobManager.path.name) foreach { - _ foreach { - _ ! decorateMessage(JobManagerTerminated(jobManager)) - } - } - } - - case DisableDisconnect => - disconnectDisabled = true - - case NotifyOfComponentShutdown => - waitForShutdown += sender() - - case msg @ UpdateTaskExecutionState(taskExecutionState) => - super.handleMessage(msg) - - if(taskExecutionState.getExecutionState == ExecutionState.RUNNING) { - waitForRunning.get(taskExecutionState.getID) foreach { - _ foreach (_ ! decorateMessage(true)) - } - } - - case RequestLeaderSessionID => - sender() ! ResponseLeaderSessionID(leaderSessionID.orNull) - - case NotifyWhenRegisteredAtJobManager(jobManager: ActorRef) => - if(isConnected && jobManager == currentJobManager.get) { - sender() ! 
true - } else { - val list = waitForRegisteredAtResourceManager.getOrElse( - jobManager, - Set[ActorRef]()) - - waitForRegisteredAtResourceManager += jobManager -> (list + sender()) - } - - case msg @ (_: AcknowledgeRegistration | _: AlreadyRegistered) => - super.handleMessage(msg) - - val jm = sender() - - waitForRegisteredAtResourceManager.remove(jm).foreach { - listeners => listeners.foreach{ - listener => - listener ! true - } - } - } - - /** - * No killing of the VM for testing. - */ - override protected def shutdown(): Unit = { - log.info("Shutting down TestingJobManager.") - waitForShutdown.foreach(_ ! ComponentShutdown(self)) - waitForShutdown.clear() - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala deleted file mode 100644 index 974e4e8..0000000 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.testingUtils - -import akka.actor.ActorRef -import org.apache.flink.api.common.JobID -import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID -import org.apache.flink.runtime.taskmanager.Task - -/** - * Additional messages that the [[TestingTaskManager]] understands. 
- */ -object TestingTaskManagerMessages { - - case class NotifyWhenTaskRemoved(executionID: ExecutionAttemptID) - - case class NotifyWhenTaskIsRunning(executionID: ExecutionAttemptID) - - case class ResponseRunningTasks(tasks: Map[ExecutionAttemptID, Task]){ - import collection.JavaConverters._ - def asJava: java.util.Map[ExecutionAttemptID, Task] = tasks.asJava - } - - case class ResponseBroadcastVariablesWithReferences(number: Int) - - case object RequestNumActiveConnections - case class ResponseNumActiveConnections(number: Int) - - case object RequestRunningTasks - - case object RequestBroadcastVariablesWithReferences - - case class NotifyWhenJobManagerTerminated(jobManager: ActorRef) - - case class JobManagerTerminated(jobManager: ActorRef) - - case class NotifyWhenRegisteredAtJobManager(resourceManager: ActorRef) - - /** - * Message to give a hint to the task manager that accumulator values were updated in the task. - * This message is forwarded to the job manager which knows that it needs to notify listeners - * of accumulator updates. - */ - case class AccumulatorsChanged(jobID: JobID) - - /** - * Registers a listener for all [[org.apache.flink.runtime.messages.TaskMessages.SubmitTask]] - * messages of the given job. - * - * If a task is submitted with the given job ID the task deployment - * descriptor is forwarded to the listener. - * - * @param jobId The job ID to listen for. - */ - case class RegisterSubmitTaskListener(jobId: JobID) - - /** - * A response to a listened job ID containing the submitted task deployment descriptor. - * - * @param tdd The submitted task deployment descriptor. 
- */ - case class ResponseSubmitTaskListener(tdd: TaskDeploymentDescriptor) - - // -------------------------------------------------------------------------- - // Utility methods to allow simpler case object access from Java - // -------------------------------------------------------------------------- - - def getRequestRunningTasksMessage: AnyRef = { - RequestRunningTasks - } - - def getRequestBroadcastVariablesWithReferencesMessage: AnyRef = { - RequestBroadcastVariablesWithReferences - } - -} - http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java index e596166..c143fe2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.leaderelection; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.testingUtils.TestingCluster; import scala.Option; @@ -86,7 +85,7 @@ public class LeaderElectionRetrievalTestingCluster extends TestingCluster { @Override public int getNumberOfJobManagers() { - return this.configuration().getInteger( + return this.originalConfiguration().getInteger( ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, ConfigConstants.DEFAULT_LOCAL_NUMBER_JOB_MANAGER); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java new file mode 100644 index 0000000..495cacd --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.testutils; + +import akka.actor.ActorRef; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful; +import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.messages.Messages; +import org.apache.flink.runtime.testingUtils.TestingMessages; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + + +/** + * A testing resource manager which may alter the default standalone resource master's behavior. + */ +public class TestingResourceManager extends StandaloneResourceManager { + + /** Set of Actors which want to be informed of a connection to the job manager */ + private Set<ActorRef> waitForResourceManagerConnected = new HashSet<>(); + + /** Set of Actors which want to be informed of a shutdown */ + private Set<ActorRef> waitForShutdown = new HashSet<>(); + + /** Flag to signal a connection to the JobManager */ + private boolean isConnected = false; + + public TestingResourceManager(Configuration flinkConfig, LeaderRetrievalService leaderRetriever) { + super(flinkConfig, leaderRetriever); + } + + /** + * Overwrite messages here if desired + */ + @Override + protected void handleMessage(Object message) { + + if (message instanceof GetRegisteredResources) { + sender().tell(new GetRegisteredResourcesReply(getStartedTaskManagers()), self()); + } else if (message instanceof FailResource) { + ResourceID resourceID = ((FailResource) message).resourceID; + notifyWorkerFailed(resourceID, "Failed for test case."); + + } else if (message instanceof NotifyWhenResourceManagerConnected) { + if (isConnected) { + sender().tell( + Messages.getAcknowledge(), + 
self()); + } else { + waitForResourceManagerConnected.add(sender()); + } + } else if (message instanceof RegisterResourceManagerSuccessful) { + super.handleMessage(message); + + isConnected = true; + + for (ActorRef ref : waitForResourceManagerConnected) { + ref.tell( + Messages.getAcknowledge(), + self()); + } + waitForResourceManagerConnected.clear(); + + } else if (message instanceof TestingMessages.NotifyOfComponentShutdown$) { + waitForShutdown.add(sender()); + } else if (message instanceof TestingMessages.Alive$) { + sender().tell(Messages.getAcknowledge(), self()); + } else { + super.handleMessage(message); + } + } + + /** + * Testing messages + */ + public static class GetRegisteredResources {} + + public static class GetRegisteredResourcesReply { + + public Collection<ResourceID> resources; + + public GetRegisteredResourcesReply(Collection<ResourceID> resources) { + this.resources = resources; + } + + } + + /** + * Fails all resources that the resource manager has registered + */ + public static class FailResource { + + public ResourceID resourceID; + + public FailResource(ResourceID resourceID) { + this.resourceID = resourceID; + } + } + + /** + * The sender of this message will be informed of a connection to the Job Manager + */ + public static class NotifyWhenResourceManagerConnected {} + + /** + * Inform registered listeners about a shutdown of the application. 
+ */ + @Override + protected void shutdownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) { + for (ActorRef listener : waitForShutdown) { + listener.tell(new TestingMessages.ComponentShutdown(self()), self()); + } + waitForShutdown.clear(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala index b4ba40b..c01a321 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala @@ -18,22 +18,32 @@ package org.apache.flink.runtime.testingUtils -import java.util.concurrent.TimeoutException +import java.util.concurrent.{ExecutorService, TimeUnit, TimeoutException} import akka.pattern.ask -import akka.actor.{ActorRef, Props, ActorSystem} +import akka.actor.{ActorRef, ActorSystem, Props} +import akka.pattern.Patterns._ import akka.testkit.CallingThreadDispatcher import org.apache.flink.configuration.{ConfigConstants, Configuration} +import org.apache.flink.runtime.akka.AkkaUtils +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory +import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore import org.apache.flink.runtime.clusterframework.FlinkResourceManager -import org.apache.flink.runtime.clusterframework.types.ResourceID -import org.apache.flink.runtime.executiongraph.restart.RestartStrategy -import org.apache.flink.runtime.jobmanager.JobManager +import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager +import 
org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory +import org.apache.flink.runtime.instance.InstanceManager +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler +import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist, SubmittedJobGraphStore} import org.apache.flink.runtime.leaderelection.LeaderElectionService -import org.apache.flink.runtime.minicluster.FlinkMiniCluster +import org.apache.flink.runtime.metrics.MetricRegistry +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster import org.apache.flink.runtime.testutils.TestingResourceManager import org.apache.flink.runtime.taskmanager.TaskManager import org.apache.flink.runtime.testingUtils.TestingMessages.Alive +import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager +import scala.concurrent.duration.FiniteDuration import scala.concurrent.{Await, Future} /** @@ -48,7 +58,7 @@ class TestingCluster( userConfiguration: Configuration, singleActorSystem: Boolean, synchronousDispatcher: Boolean) - extends FlinkMiniCluster( + extends LocalFlinkMiniCluster( userConfiguration, singleActorSystem) { @@ -59,133 +69,54 @@ class TestingCluster( // -------------------------------------------------------------------------- - override def generateConfiguration(userConfig: Configuration): Configuration = { - val cfg = new Configuration() - cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost") - cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0) - cfg.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, 0) - cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10) - cfg.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1) - - setDefaultCiConfig(cfg) - - cfg.addAll(userConfig) - cfg - } - - override def startJobManager(index: Int, actorSystem: ActorSystem): ActorRef = { - val config = configuration.clone() - - val jobManagerName = if(singleActorSystem) { - 
JobManager.JOB_MANAGER_NAME + "_" + (index + 1) - } else { - JobManager.JOB_MANAGER_NAME - } - - val archiveName = if(singleActorSystem) { - JobManager.ARCHIVE_NAME + "_" + (index + 1) - } else { - JobManager.ARCHIVE_NAME - } - - val jobManagerPort = config.getInteger( - ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, - ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) - - if(jobManagerPort > 0) { - config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort + index) - } - - val (executionContext, - instanceManager, - scheduler, - libraryCacheManager, - restartStrategyFactory, - timeout, - archiveCount, - leaderElectionService, - submittedJobsGraphs, - checkpointRecoveryFactory, - savepointStore, - jobRecoveryTimeout, - metricRegistry) = JobManager.createJobManagerComponents( - config, - createLeaderElectionService()) - - val testArchiveProps = Props(new TestingMemoryArchivist(archiveCount)) - val archive = actorSystem.actorOf(testArchiveProps, archiveName) - - val jobManagerProps = Props( - new TestingJobManager( - configuration, - executionContext, - instanceManager, - scheduler, - libraryCacheManager, - archive, - restartStrategyFactory, - timeout, - leaderElectionService, - submittedJobsGraphs, - checkpointRecoveryFactory, - savepointStore, - jobRecoveryTimeout, - metricRegistry)) - - val dispatcherJobManagerProps = if (synchronousDispatcher) { - // disable asynchronous futures (e.g. 
accumulator update in Heartbeat) - jobManagerProps.withDispatcher(CallingThreadDispatcher.Id) - } else { - jobManagerProps - } - - actorSystem.actorOf(dispatcherJobManagerProps, jobManagerName) - } - - override def startResourceManager(index: Int, system: ActorSystem): ActorRef = { - val config = configuration.clone() - - val resourceManagerName = if(singleActorSystem) { - FlinkResourceManager.RESOURCE_MANAGER_NAME + "_" + (index + 1) + override val jobManagerClass: Class[_ <: JobManager] = classOf[TestingJobManager] + + override val resourceManagerClass: Class[_ <: FlinkResourceManager[_ <: ResourceIDRetrievable]] = + classOf[TestingResourceManager] + + override val taskManagerClass: Class[_ <: TaskManager] = classOf[TestingTaskManager] + + override val memoryArchivistClass: Class[_ <: MemoryArchivist] = classOf[TestingMemoryArchivist] + + override def getJobManagerProps( + jobManagerClass: Class[_ <: JobManager], + configuration: Configuration, + executorService: ExecutorService, + instanceManager: InstanceManager, + scheduler: Scheduler, + libraryCacheManager: BlobLibraryCacheManager, + archive: ActorRef, + restartStrategyFactory: RestartStrategyFactory, + timeout: FiniteDuration, + leaderElectionService: LeaderElectionService, + submittedJobGraphStore: SubmittedJobGraphStore, + checkpointRecoveryFactory: CheckpointRecoveryFactory, + savepointStore: SavepointStore, + jobRecoveryTimeout: FiniteDuration, + metricsRegistry: Option[MetricRegistry]): Props = { + + val props = super.getJobManagerProps( + jobManagerClass, + configuration, + executorService, + instanceManager, + scheduler, + libraryCacheManager, + archive, + restartStrategyFactory, + timeout, + leaderElectionService, + submittedJobGraphStore, + checkpointRecoveryFactory, + savepointStore, + jobRecoveryTimeout, + metricsRegistry) + + if (synchronousDispatcher) { + props.withDispatcher(CallingThreadDispatcher.Id) } else { - FlinkResourceManager.RESOURCE_MANAGER_NAME + props } - - val resourceManagerPort = 
config.getInteger( - ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, - ConfigConstants.DEFAULT_RESOURCE_MANAGER_IPC_PORT) - - if(resourceManagerPort > 0) { - config.setInteger(ConfigConstants.RESOURCE_MANAGER_IPC_PORT_KEY, resourceManagerPort + index) - } - - val testResourceManagerProps = Props( - new TestingResourceManager( - config, - createLeaderRetrievalService() - )) - - system.actorOf(testResourceManagerProps, resourceManagerName) - } - - override def startTaskManager(index: Int, system: ActorSystem) = { - - val tmActorName = TaskManager.TASK_MANAGER_NAME + "_" + (index + 1) - - TaskManager.startTaskManagerComponentsAndActor( - configuration, - ResourceID.generate(), - system, - hostname, - Some(tmActorName), - Some(createLeaderRetrievalService()), - numTaskManagers == 1, - classOf[TestingTaskManager]) - } - - - def createLeaderElectionService(): Option[LeaderElectionService] = { - None } @throws(classOf[TimeoutException]) @@ -228,4 +159,131 @@ class TestingCluster( Await.ready(combinedFuture, timeout) } + + def waitForTaskManagersToBeRegisteredAtJobManager(jobManager: ActorRef): Unit = { + val futures = taskManagerActors.map { + _.map { + tm => (tm ? 
NotifyWhenRegisteredAtJobManager(jobManager))(timeout) + } + }.getOrElse(Seq()) + + try { + Await.ready(Future.sequence(futures), timeout) + } catch { + case t: TimeoutException => + throw new Exception("Timeout while waiting for TaskManagers to register at " + + s"${jobManager.path}") + } + + } + + def restartLeadingJobManager(): Unit = { + this.synchronized { + (jobManagerActorSystems, jobManagerActors) match { + case (Some(jmActorSystems), Some(jmActors)) => + val leader = getLeaderGateway(AkkaUtils.getTimeout(originalConfiguration)) + val index = getLeaderIndex(AkkaUtils.getTimeout(originalConfiguration)) + + // restart the leading job manager with the same port + val port = getLeaderRPCPort + val oldPort = originalConfiguration.getInteger( + ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, + 0) + + // we have to set the old port in the configuration file because this is used for startup + originalConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port) + + clearLeader() + + val stopped = gracefulStop(leader.actor(), TestingCluster.MAX_RESTART_DURATION) + Await.result(stopped, TestingCluster.MAX_RESTART_DURATION) + + if(!singleActorSystem) { + jmActorSystems(index).shutdown() + jmActorSystems(index).awaitTermination() + } + + val newJobManagerActorSystem = if(!singleActorSystem) { + startJobManagerActorSystem(index) + } else { + jmActorSystems.head + } + + // reset the original configuration + originalConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, oldPort) + + val newJobManagerActor = startJobManager(index, newJobManagerActorSystem) + + jobManagerActors = Some(jmActors.patch(index, Seq(newJobManagerActor), 1)) + jobManagerActorSystems = Some(jmActorSystems.patch( + index, + Seq(newJobManagerActorSystem), + 1)) + + val lrs = createLeaderRetrievalService() + + jobManagerLeaderRetrievalService = Some(lrs) + lrs.start(this) + + case _ => throw new Exception("The JobManager of the TestingCluster have not " + + "been started properly.") + 
} + } + } + + def restartTaskManager(index: Int): Unit = { + (taskManagerActorSystems, taskManagerActors) match { + case (Some(tmActorSystems), Some(tmActors)) => + val stopped = gracefulStop(tmActors(index), TestingCluster.MAX_RESTART_DURATION) + Await.result(stopped, TestingCluster.MAX_RESTART_DURATION) + + if(!singleActorSystem) { + tmActorSystems(index).shutdown() + tmActorSystems(index).awaitTermination() + } + + val taskManagerActorSystem = if(!singleActorSystem) { + startTaskManagerActorSystem(index) + } else { + tmActorSystems.head + } + + val taskManagerActor = startTaskManager(index, taskManagerActorSystem) + + taskManagerActors = Some(tmActors.patch(index, Seq(taskManagerActor), 1)) + taskManagerActorSystems = Some(tmActorSystems.patch(index, Seq(taskManagerActorSystem), 1)) + + case _ => throw new Exception("The TaskManager of the TestingCluster have not " + + "been started properly.") + } + } + + def addTaskManager(): Unit = { + if (useSingleActorSystem) { + (jobManagerActorSystems, taskManagerActors) match { + case (Some(jmSystems), Some(tmActors)) => + val index = numTaskManagers + taskManagerActors = Some(tmActors :+ startTaskManager(index, jmSystems(0))) + numTaskManagers += 1 + case _ => throw new IllegalStateException("Cluster has not been started properly.") + } + } else { + (taskManagerActorSystems, taskManagerActors) match { + case (Some(tmSystems), Some(tmActors)) => + val index = numTaskManagers + val newTmSystem = startTaskManagerActorSystem(index) + val newTmActor = startTaskManager(index, newTmSystem) + + taskManagerActorSystems = Some(tmSystems :+ newTmSystem) + taskManagerActors = Some(tmActors :+ newTmActor) + + numTaskManagers += 1 + case _ => throw new IllegalStateException("Cluster has not been started properly.") + } + } + } +} + +object TestingCluster { + val MAX_RESTART_DURATION = new FiniteDuration(2, TimeUnit.MINUTES) } 
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala new file mode 100644 index 0000000..62349db --- /dev/null +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManager.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.testingUtils + +import java.util.concurrent.ExecutorService + +import akka.actor.ActorRef +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory +import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory +import org.apache.flink.runtime.instance.InstanceManager +import org.apache.flink.runtime.jobmanager.scheduler.Scheduler +import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore} +import org.apache.flink.runtime.leaderelection.LeaderElectionService +import org.apache.flink.runtime.metrics.MetricRegistry + +import scala.concurrent.duration._ +import scala.language.postfixOps + +/** JobManager implementation extended by testing messages + * + */ +class TestingJobManager( + flinkConfiguration: Configuration, + executorService: ExecutorService, + instanceManager: InstanceManager, + scheduler: Scheduler, + libraryCacheManager: BlobLibraryCacheManager, + archive: ActorRef, + restartStrategyFactory: RestartStrategyFactory, + timeout: FiniteDuration, + leaderElectionService: LeaderElectionService, + submittedJobGraphs : SubmittedJobGraphStore, + checkpointRecoveryFactory : CheckpointRecoveryFactory, + savepointStore : SavepointStore, + jobRecoveryTimeout : FiniteDuration, + metricRegistry : Option[MetricRegistry]) + extends JobManager( + flinkConfiguration, + executorService, + instanceManager, + scheduler, + libraryCacheManager, + archive, + restartStrategyFactory, + timeout, + leaderElectionService, + submittedJobGraphs, + checkpointRecoveryFactory, + savepointStore, + jobRecoveryTimeout, + metricRegistry) + with TestingJobManagerLike {} 
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala new file mode 100644 index 0000000..5ba2790 --- /dev/null +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala @@ -0,0 +1,417 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.testingUtils + +import akka.actor.{ActorRef, Cancellable, Terminated} +import akka.pattern.{ask, pipe} +import org.apache.flink.api.common.JobID +import org.apache.flink.runtime.FlinkActor +import org.apache.flink.runtime.execution.ExecutionState +import org.apache.flink.runtime.jobgraph.JobStatus +import org.apache.flink.runtime.jobmanager.JobManager +import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged +import org.apache.flink.runtime.messages.JobManagerMessages.{GrantLeadership, RegisterJobClient, RequestClassloadingProps} +import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect} +import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager +import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._ +import org.apache.flink.runtime.testingUtils.TestingMessages._ +import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged + +import scala.collection.mutable +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.language.postfixOps + +/** This mixin can be used to decorate a JobManager with messages for testing purpose. 
*/ +trait TestingJobManagerLike extends FlinkActor { + that: JobManager => + + import context._ + + import scala.collection.JavaConverters._ + + val waitForAllVerticesToBeRunning = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]() + val waitForTaskManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]() + + val waitForAllVerticesToBeRunningOrFinished = + scala.collection.mutable.HashMap[JobID, Set[ActorRef]]() + + var periodicCheck: Option[Cancellable] = None + + val waitForJobStatus = scala.collection.mutable.HashMap[JobID, + collection.mutable.HashMap[JobStatus, Set[ActorRef]]]() + + val waitForAccumulatorUpdate = scala.collection.mutable.HashMap[JobID, (Boolean, Set[ActorRef])]() + + val waitForLeader = scala.collection.mutable.HashSet[ActorRef]() + + val waitForNumRegisteredTaskManagers = mutable.PriorityQueue.newBuilder( + new Ordering[(Int, ActorRef)] { + override def compare(x: (Int, ActorRef), y: (Int, ActorRef)): Int = y._1 - x._1 + }) + + val waitForClient = scala.collection.mutable.HashSet[ActorRef]() + + val waitForShutdown = scala.collection.mutable.HashSet[ActorRef]() + + var disconnectDisabled = false + + var postStopEnabled = true + + abstract override def postStop(): Unit = { + if (postStopEnabled) { + super.postStop() + } else { + // only stop leader election service to revoke the leadership of this JM so that a new JM + // can be elected leader + leaderElectionService.stop() + } + } + + abstract override def handleMessage: Receive = { + handleTestingMessage orElse super.handleMessage + } + + def handleTestingMessage: Receive = { + case Alive => sender() ! Acknowledge + + case RequestExecutionGraph(jobID) => + currentJobs.get(jobID) match { + case Some((executionGraph, jobInfo)) => sender() ! 
decorateMessage( + ExecutionGraphFound( + jobID, + executionGraph) + ) + + case None => archive.tell(decorateMessage(RequestExecutionGraph(jobID)), sender()) + } + + case WaitForAllVerticesToBeRunning(jobID) => + if(checkIfAllVerticesRunning(jobID)){ + sender() ! decorateMessage(AllVerticesRunning(jobID)) + }else{ + val waiting = waitForAllVerticesToBeRunning.getOrElse(jobID, Set[ActorRef]()) + waitForAllVerticesToBeRunning += jobID -> (waiting + sender()) + + if(periodicCheck.isEmpty){ + periodicCheck = + Some( + context.system.scheduler.schedule( + 0 seconds, + 200 millis, + self, + decorateMessage(NotifyListeners) + ) + ) + } + } + case WaitForAllVerticesToBeRunningOrFinished(jobID) => + if(checkIfAllVerticesRunningOrFinished(jobID)){ + sender() ! decorateMessage(AllVerticesRunning(jobID)) + }else{ + val waiting = waitForAllVerticesToBeRunningOrFinished.getOrElse(jobID, Set[ActorRef]()) + waitForAllVerticesToBeRunningOrFinished += jobID -> (waiting + sender()) + + if(periodicCheck.isEmpty){ + periodicCheck = + Some( + context.system.scheduler.schedule( + 0 seconds, + 200 millis, + self, + decorateMessage(NotifyListeners) + ) + ) + } + } + + case NotifyListeners => + for(jobID <- currentJobs.keySet){ + notifyListeners(jobID) + } + + if(waitForAllVerticesToBeRunning.isEmpty && waitForAllVerticesToBeRunningOrFinished.isEmpty) { + periodicCheck foreach { _.cancel() } + periodicCheck = None + } + + + case NotifyWhenJobRemoved(jobID) => + val gateways = instanceManager.getAllRegisteredInstances.asScala.map(_.getActorGateway) + + val responses = gateways.map{ + gateway => gateway.ask(NotifyWhenJobRemoved(jobID), timeout).mapTo[Boolean] + } + + val jobRemovedOnJobManager = (self ? 
CheckIfJobRemoved(jobID))(timeout).mapTo[Boolean] + + val allFutures = responses ++ Seq(jobRemovedOnJobManager) + + import context.dispatcher + Future.fold(allFutures)(true)(_ & _) map(decorateMessage(_)) pipeTo sender() + + case CheckIfJobRemoved(jobID) => + if(currentJobs.contains(jobID)) { + context.system.scheduler.scheduleOnce( + 200 milliseconds, + self, + decorateMessage(CheckIfJobRemoved(jobID)) + )(context.dispatcher, sender()) + } else { + sender() ! decorateMessage(true) + } + + case NotifyWhenTaskManagerTerminated(taskManager) => + val waiting = waitForTaskManagerToBeTerminated.getOrElse(taskManager.path.name, Set()) + waitForTaskManagerToBeTerminated += taskManager.path.name -> (waiting + sender) + + case msg@Terminated(taskManager) => + super.handleMessage(msg) + + waitForTaskManagerToBeTerminated.remove(taskManager.path.name) foreach { + _ foreach { + listener => + listener ! decorateMessage(TaskManagerTerminated(taskManager)) + } + } + + // see shutdown method for reply + case NotifyOfComponentShutdown => + waitForShutdown += sender() + + case NotifyWhenAccumulatorChange(jobID) => + + val (updated, registered) = waitForAccumulatorUpdate. + getOrElse(jobID, (false, Set[ActorRef]())) + waitForAccumulatorUpdate += jobID -> (updated, registered + sender) + sender ! true + + /** + * Notification from the task manager that changed accumulators are transferred on the next + * Heartbeat. We need to keep this state to notify the listeners on the next Heartbeat report. + */ + case AccumulatorsChanged(jobID: JobID) => + waitForAccumulatorUpdate.get(jobID) match { + case Some((updated, registered)) => + waitForAccumulatorUpdate.put(jobID, (true, registered)) + case None => + } + + /** + * Disables async processing of accumulator values and sends accumulators to the listeners if + * we previously received an [[AccumulatorsChanged]] message. 
+ */ + case msg : Heartbeat => + super.handleMessage(msg) + + waitForAccumulatorUpdate foreach { + case (jobID, (updated, actors)) if updated => + currentJobs.get(jobID) match { + case Some((graph, jobInfo)) => + val flinkAccumulators = graph.getFlinkAccumulators + val userAccumulators = graph.aggregateUserAccumulators + actors foreach { + actor => actor ! UpdatedAccumulators(jobID, flinkAccumulators, userAccumulators) + } + case None => + } + waitForAccumulatorUpdate.put(jobID, (false, actors)) + case _ => + } + + case RequestWorkingTaskManager(jobID) => + currentJobs.get(jobID) match { + case Some((eg, _)) => + if(eg.getAllExecutionVertices.asScala.isEmpty){ + sender ! decorateMessage(WorkingTaskManager(None)) + } else { + val resource = eg.getAllExecutionVertices.asScala.head.getCurrentAssignedResource + + if(resource == null){ + sender ! decorateMessage(WorkingTaskManager(None)) + } else { + sender ! decorateMessage( + WorkingTaskManager( + Some(resource.getTaskManagerActorGateway()) + ) + ) + } + } + case None => sender ! decorateMessage(WorkingTaskManager(None)) + } + + case NotifyWhenJobStatus(jobID, state) => + val jobStatusListener = waitForJobStatus.getOrElseUpdate(jobID, + scala.collection.mutable.HashMap[JobStatus, Set[ActorRef]]()) + + val listener = jobStatusListener.getOrElse(state, Set[ActorRef]()) + + jobStatusListener += state -> (listener + sender) + + case msg@JobStatusChanged(jobID, newJobStatus, _, _) => + super.handleMessage(msg) + + val cleanup = waitForJobStatus.get(jobID) match { + case Some(stateListener) => + stateListener.remove(newJobStatus) match { + case Some(listeners) => + listeners foreach { + _ ! 
decorateMessage(JobStatusIs(jobID, newJobStatus)) + } + case _ => + } + stateListener.isEmpty + + case _ => false + } + + if (cleanup) { + waitForJobStatus.remove(jobID) + } + + case DisableDisconnect => + disconnectDisabled = true + + case DisablePostStop => + postStopEnabled = false + + case RequestSavepoint(savepointPath) => + try { + val savepoint = savepointStore.loadSavepoint(savepointPath) + sender ! ResponseSavepoint(savepoint) + } + catch { + case e: Exception => + sender ! ResponseSavepoint(null) + } + + case msg: Disconnect => + if (!disconnectDisabled) { + super.handleMessage(msg) + + val taskManager = sender() + + waitForTaskManagerToBeTerminated.remove(taskManager.path.name) foreach { + _ foreach { + listener => + listener ! decorateMessage(TaskManagerTerminated(taskManager)) + } + } + } + + case NotifyWhenLeader => + if (leaderElectionService.hasLeadership) { + sender() ! true + } else { + waitForLeader += sender() + } + + case msg: GrantLeadership => + super.handleMessage(msg) + + waitForLeader.foreach(_ ! true) + + waitForLeader.clear() + + case NotifyWhenClientConnects => + waitForClient += sender() + sender() ! true + + case msg: RegisterJobClient => + super.handleMessage(msg) + waitForClient.foreach(_ ! ClientConnected) + case msg: RequestClassloadingProps => + super.handleMessage(msg) + waitForClient.foreach(_ ! ClassLoadingPropsDelivered) + + case NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager) => + if (that.instanceManager.getNumberOfRegisteredTaskManagers >= numRegisteredTaskManager) { + // there are already at least numRegisteredTaskManager registered --> send Acknowledge + sender() ! 
Acknowledge + } else { + // wait until we see at least numRegisteredTaskManager being registered at the JobManager + waitForNumRegisteredTaskManagers += ((numRegisteredTaskManager, sender())) + } + + // TaskManager may be registered on these two messages + case msg @ (_: RegisterTaskManager) => + super.handleMessage(msg) + + // dequeue all senders which wait for instanceManager.getNumberOfStartedTaskManagers or + // fewer registered TaskManagers + while (waitForNumRegisteredTaskManagers.nonEmpty && + waitForNumRegisteredTaskManagers.head._1 <= + instanceManager.getNumberOfRegisteredTaskManagers) { + val receiver = waitForNumRegisteredTaskManagers.dequeue()._2 + receiver ! Acknowledge + } + } + + def checkIfAllVerticesRunning(jobID: JobID): Boolean = { + currentJobs.get(jobID) match { + case Some((eg, _)) => + eg.getAllExecutionVertices.asScala.forall( _.getExecutionState == ExecutionState.RUNNING) + case None => false + } + } + + def checkIfAllVerticesRunningOrFinished(jobID: JobID): Boolean = { + currentJobs.get(jobID) match { + case Some((eg, _)) => + eg.getAllExecutionVertices.asScala.forall { + case vertex => + (vertex.getExecutionState == ExecutionState.RUNNING + || vertex.getExecutionState == ExecutionState.FINISHED) + } + case None => false + } + } + + def notifyListeners(jobID: JobID): Unit = { + if(checkIfAllVerticesRunning(jobID)) { + waitForAllVerticesToBeRunning.remove(jobID) match { + case Some(listeners) => + for (listener <- listeners) { + listener ! decorateMessage(AllVerticesRunning(jobID)) + } + case _ => + } + } + + if(checkIfAllVerticesRunningOrFinished(jobID)) { + waitForAllVerticesToBeRunningOrFinished.remove(jobID) match { + case Some(listeners) => + for (listener <- listeners) { + listener ! decorateMessage(AllVerticesRunning(jobID)) + } + case _ => + } + } + } + + /** + * No killing of the VM for testing. + */ + override protected def shutdown(): Unit = { + log.info("Shutting down TestingJobManager.") + waitForShutdown.foreach(_ ! 
ComponentShutdown(self)) + waitForShutdown.clear() + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala new file mode 100644 index 0000000..d07c48f --- /dev/null +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.testingUtils + +import java.util.Map + +import akka.actor.ActorRef +import org.apache.flink.api.common.JobID +import org.apache.flink.api.common.accumulators.Accumulator +import org.apache.flink.runtime.accumulators.AccumulatorRegistry +import org.apache.flink.runtime.checkpoint.savepoint.Savepoint +import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph} +import org.apache.flink.runtime.instance.ActorGateway +import org.apache.flink.runtime.jobgraph.JobStatus + +object TestingJobManagerMessages { + + case class RequestExecutionGraph(jobID: JobID) + + sealed trait ResponseExecutionGraph { + def jobID: JobID + } + + case class ExecutionGraphFound(jobID: JobID, executionGraph: ExecutionGraph) extends + ResponseExecutionGraph + + case class ExecutionGraphNotFound(jobID: JobID) extends ResponseExecutionGraph + + case class WaitForAllVerticesToBeRunning(jobID: JobID) + case class WaitForAllVerticesToBeRunningOrFinished(jobID: JobID) + case class AllVerticesRunning(jobID: JobID) + + case class NotifyWhenJobRemoved(jobID: JobID) + + case class RequestWorkingTaskManager(jobID: JobID) + case class WorkingTaskManager(gatewayOption: Option[ActorGateway]) + + case class NotifyWhenJobStatus(jobID: JobID, state: JobStatus) + case class JobStatusIs(jobID: JobID, state: JobStatus) + + case object NotifyListeners + + case class NotifyWhenTaskManagerTerminated(taskManager: ActorRef) + case class TaskManagerTerminated(taskManager: ActorRef) + + /** + * Registers a listener to receive a message when accumulators changed. + * The change must be explicitly triggered by the TestingTaskManager which can receive an + * [[org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged]] + * message by a task that changed the accumulators. This message is then + * forwarded to the JobManager which will send the accumulators in the [[UpdatedAccumulators]] + * message when the next Heartbeat occurs. 
+ */ + case class NotifyWhenAccumulatorChange(jobID: JobID) + + /** + * Reports updated accumulators back to the listener. + */ + case class UpdatedAccumulators(jobID: JobID, + flinkAccumulators: Map[ExecutionAttemptID, Map[AccumulatorRegistry.Metric, Accumulator[_,_]]], + userAccumulators: Map[String, Accumulator[_,_]]) + + /** Notifies the sender when the [[TestingJobManager]] has been elected as the leader + * + */ + case object NotifyWhenLeader + + /** + * Notifies the sender when the [[TestingJobManager]] receives new clients for jobs + */ + case object NotifyWhenClientConnects + /** + * Notifies of client connect + */ + case object ClientConnected + /** + * Notifies when the client has requested class loading information + */ + case object ClassLoadingPropsDelivered + + /** + * Registers to be notified by an [[org.apache.flink.runtime.messages.Messages.Acknowledge]] + * message when at least numRegisteredTaskManager have registered at the JobManager. + * + * @param numRegisteredTaskManager minimum number of registered TMs before the sender is notified + */ + case class NotifyWhenAtLeastNumTaskManagerAreRegistered(numRegisteredTaskManager: Int) + + /** Disables the post stop method of the [[TestingJobManager]]. + * + * Only the leaderElectionService is stopped in the postStop method call to revoke the leadership + */ + case object DisablePostStop + + /** + * Requests a savepoint from the job manager. + * + * @param savepointPath The path of the savepoint to request. + */ + case class RequestSavepoint(savepointPath: String) + + /** + * Response to a savepoint request. + * + * @param savepoint The requested savepoint or null if none available. 
+ */ + case class ResponseSavepoint(savepoint: Savepoint) + + def getNotifyWhenLeader(): AnyRef = NotifyWhenLeader + def getNotifyWhenClientConnects(): AnyRef = NotifyWhenClientConnects + def getDisablePostStop(): AnyRef = DisablePostStop + + def getClientConnected(): AnyRef = ClientConnected + def getClassLoadingPropsDelivered(): AnyRef = ClassLoadingPropsDelivered + +} http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala new file mode 100644 index 0000000..48a1ddd --- /dev/null +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMemoryArchivist.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.testingUtils + +import org.apache.flink.runtime.jobmanager.MemoryArchivist +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{ExecutionGraphFound, ExecutionGraphNotFound, RequestExecutionGraph} + +/** Memory archivist extended by testing messages + * + * @param maxEntries maximum number of archived jobs + */ +class TestingMemoryArchivist(maxEntries: Int) extends MemoryArchivist(maxEntries) { + + override def handleMessage: Receive = { + handleTestingMessage orElse super.handleMessage + } + + def handleTestingMessage: Receive = { + case RequestExecutionGraph(jobID) => + val executionGraph = graphs.get(jobID) + + executionGraph match { + case Some(graph) => sender ! decorateMessage(ExecutionGraphFound(jobID, graph)) + case None => sender ! decorateMessage(ExecutionGraphNotFound(jobID)) + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala new file mode 100644 index 0000000..91d169a --- /dev/null +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingMessages.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.testingUtils + +import akka.actor.ActorRef +import org.apache.flink.api.common.JobID + +object TestingMessages { + + case class CheckIfJobRemoved(jobID: JobID) + + case object DisableDisconnect + + case object Alive + + def getAlive: AnyRef = Alive + + def getDisableDisconnect: AnyRef = DisableDisconnect + + case object NotifyOfComponentShutdown + case class ComponentShutdown(ref: ActorRef) + + def getNotifyOfComponentShutdown(): AnyRef = NotifyOfComponentShutdown +} http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala new file mode 100644 index 0000000..9b5a147 --- /dev/null +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.testingUtils + +import org.apache.flink.runtime.clusterframework.types.ResourceID +import org.apache.flink.runtime.io.disk.iomanager.IOManager +import org.apache.flink.runtime.io.network.NetworkEnvironment +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService +import org.apache.flink.runtime.memory.MemoryManager +import org.apache.flink.runtime.taskmanager.{TaskManagerLocation, TaskManager, TaskManagerConfiguration} + +import scala.language.postfixOps + +/** Subclass of the [[TaskManager]] to support testing messages + */ +class TestingTaskManager( + config: TaskManagerConfiguration, + resourceID: ResourceID, + connectionInfo: TaskManagerLocation, + memoryManager: MemoryManager, + ioManager: IOManager, + network: NetworkEnvironment, + numberOfSlots: Int, + leaderRetrievalService: LeaderRetrievalService) + extends TaskManager( + config, + resourceID, + connectionInfo, + memoryManager, + ioManager, + network, + numberOfSlots, + leaderRetrievalService) + with TestingTaskManagerLike { + + def this( + config: TaskManagerConfiguration, + connectionInfo: TaskManagerLocation, + memoryManager: MemoryManager, + ioManager: IOManager, + network: NetworkEnvironment, + numberOfSlots: Int, + leaderRetrievalService: LeaderRetrievalService) { + this( + config, + ResourceID.generate(), + connectionInfo, + memoryManager, + ioManager, + network, + numberOfSlots, + leaderRetrievalService) + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala new file mode 100644 index 0000000..2498dbe --- /dev/null +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.testingUtils + +import akka.actor.{ActorRef, Terminated} +import org.apache.flink.api.common.JobID +import org.apache.flink.runtime.FlinkActor +import org.apache.flink.runtime.execution.ExecutionState +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID +import org.apache.flink.runtime.messages.JobManagerMessages.{RequestLeaderSessionID, ResponseLeaderSessionID} +import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect} +import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, AlreadyRegistered} +import org.apache.flink.runtime.messages.TaskMessages.{SubmitTask, TaskInFinalState, UpdateTaskExecutionState} +import org.apache.flink.runtime.taskmanager.TaskManager +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved +import org.apache.flink.runtime.testingUtils.TestingMessages._ +import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages._ + +import scala.concurrent.duration._ +import scala.language.postfixOps + +/** This mixin can be used to decorate a TaskManager with messages for testing purposes. 
*/ +trait TestingTaskManagerLike extends FlinkActor { + that: TaskManager => + + import scala.collection.JavaConverters._ + + val waitForRemoval = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]() + val waitForJobManagerToBeTerminated = scala.collection.mutable.HashMap[String, Set[ActorRef]]() + val waitForRegisteredAtResourceManager = + scala.collection.mutable.HashMap[ActorRef, Set[ActorRef]]() + val waitForRunning = scala.collection.mutable.HashMap[ExecutionAttemptID, Set[ActorRef]]() + val unregisteredTasks = scala.collection.mutable.HashSet[ExecutionAttemptID]() + + /** Map of registered task submit listeners */ + val registeredSubmitTaskListeners = scala.collection.mutable.HashMap[JobID, ActorRef]() + + val waitForShutdown = scala.collection.mutable.HashSet[ActorRef]() + + var disconnectDisabled = false + + /** + * Handler for testing related messages + */ + abstract override def handleMessage: Receive = { + handleTestingMessage orElse super.handleMessage + } + + def handleTestingMessage: Receive = { + case Alive => sender() ! Acknowledge + + case NotifyWhenTaskIsRunning(executionID) => + Option(runningTasks.get(executionID)) match { + case Some(task) if task.getExecutionState == ExecutionState.RUNNING => + sender ! decorateMessage(true) + + case _ => + val listeners = waitForRunning.getOrElse(executionID, Set()) + waitForRunning += (executionID -> (listeners + sender)) + } + + case RequestRunningTasks => + sender ! decorateMessage(ResponseRunningTasks(runningTasks.asScala.toMap)) + + case NotifyWhenTaskRemoved(executionID) => + Option(runningTasks.get(executionID)) match { + case Some(_) => + val set = waitForRemoval.getOrElse(executionID, Set()) + waitForRemoval += (executionID -> (set + sender)) + case None => + if(unregisteredTasks.contains(executionID)) { + sender ! 
decorateMessage(true) + } else { + val set = waitForRemoval.getOrElse(executionID, Set()) + waitForRemoval += (executionID -> (set + sender)) + } + } + + case TaskInFinalState(executionID) => + super.handleMessage(TaskInFinalState(executionID)) + waitForRemoval.remove(executionID) match { + case Some(actors) => for(actor <- actors) actor ! decorateMessage(true) + case None => + } + + unregisteredTasks += executionID + + case NotifyWhenJobRemoved(jobID) => + if(runningTasks.values.asScala.exists(_.getJobID == jobID)){ + context.system.scheduler.scheduleOnce( + 200 milliseconds, + self, + decorateMessage(CheckIfJobRemoved(jobID)))( + context.dispatcher, + sender() + ) + }else{ + sender ! decorateMessage(true) + } + + case CheckIfJobRemoved(jobID) => + if(runningTasks.values.asScala.forall(_.getJobID != jobID)){ + sender ! decorateMessage(true) + } else { + context.system.scheduler.scheduleOnce( + 200 milliseconds, + self, + decorateMessage(CheckIfJobRemoved(jobID)))( + context.dispatcher, + sender() + ) + } + + case NotifyWhenJobManagerTerminated(jobManager) => + val waiting = waitForJobManagerToBeTerminated.getOrElse(jobManager.path.name, Set()) + waitForJobManagerToBeTerminated += jobManager.path.name -> (waiting + sender) + + case RegisterSubmitTaskListener(jobId) => + registeredSubmitTaskListeners.put(jobId, sender()) + + case msg@SubmitTask(tdd) => + registeredSubmitTaskListeners.get(tdd.getJobID) match { + case Some(listenerRef) => + listenerRef ! ResponseSubmitTaskListener(tdd) + case None => + // Nothing to do + } + + super.handleMessage(msg) + + /** + * Message from task manager that accumulator values changed and need to be reported immediately + * instead of lazily through the + * [[org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat]] message. We forward this + * message to the job manager that it knows it should report to the listeners. 
+ */ + case msg: AccumulatorsChanged => + currentJobManager match { + case Some(jobManager) => + jobManager.forward(msg) + sendHeartbeatToJobManager() + sender ! true + case None => + } + + case msg@Terminated(jobManager) => + super.handleMessage(msg) + + waitForJobManagerToBeTerminated.remove(jobManager.path.name) foreach { + _ foreach { + _ ! decorateMessage(JobManagerTerminated(jobManager)) + } + } + + case msg:Disconnect => + if (!disconnectDisabled) { + super.handleMessage(msg) + + val jobManager = sender() + + waitForJobManagerToBeTerminated.remove(jobManager.path.name) foreach { + _ foreach { + _ ! decorateMessage(JobManagerTerminated(jobManager)) + } + } + } + + case DisableDisconnect => + disconnectDisabled = true + + case NotifyOfComponentShutdown => + waitForShutdown += sender() + + case msg @ UpdateTaskExecutionState(taskExecutionState) => + super.handleMessage(msg) + + if(taskExecutionState.getExecutionState == ExecutionState.RUNNING) { + waitForRunning.get(taskExecutionState.getID) foreach { + _ foreach (_ ! decorateMessage(true)) + } + } + + case RequestLeaderSessionID => + sender() ! ResponseLeaderSessionID(leaderSessionID.orNull) + + case NotifyWhenRegisteredAtJobManager(jobManager: ActorRef) => + if(isConnected && jobManager == currentJobManager.get) { + sender() ! true + } else { + val list = waitForRegisteredAtResourceManager.getOrElse( + jobManager, + Set[ActorRef]()) + + waitForRegisteredAtResourceManager += jobManager -> (list + sender()) + } + + case msg @ (_: AcknowledgeRegistration | _: AlreadyRegistered) => + super.handleMessage(msg) + + val jm = sender() + + waitForRegisteredAtResourceManager.remove(jm).foreach { + listeners => listeners.foreach{ + listener => + listener ! true + } + } + } + + /** + * No killing of the VM for testing. + */ + override protected def shutdown(): Unit = { + log.info("Shutting down TestingJobManager.") + waitForShutdown.foreach(_ ! 
ComponentShutdown(self)) + waitForShutdown.clear() + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala new file mode 100644 index 0000000..32c3c55 --- /dev/null +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerMessages.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.testingUtils + +import akka.actor.ActorRef +import org.apache.flink.api.common.JobID +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID +import org.apache.flink.runtime.taskmanager.Task + +/** + * Additional messages that the [[TestingTaskManager]] understands. 
+ */ +object TestingTaskManagerMessages { + + case class NotifyWhenTaskRemoved(executionID: ExecutionAttemptID) + + case class NotifyWhenTaskIsRunning(executionID: ExecutionAttemptID) + + case class ResponseRunningTasks(tasks: Map[ExecutionAttemptID, Task]){ + import collection.JavaConverters._ + def asJava: java.util.Map[ExecutionAttemptID, Task] = tasks.asJava + } + + case object RequestRunningTasks + + case class NotifyWhenJobManagerTerminated(jobManager: ActorRef) + + case class JobManagerTerminated(jobManager: ActorRef) + + case class NotifyWhenRegisteredAtJobManager(resourceManager: ActorRef) + + /** + * Message to give a hint to the task manager that accumulator values were updated in the task. + * This message is forwarded to the job manager which knows that it needs to notify listeners + * of accumulator updates. + */ + case class AccumulatorsChanged(jobID: JobID) + + /** + * Registers a listener for all [[org.apache.flink.runtime.messages.TaskMessages.SubmitTask]] + * messages of the given job. + * + * If a task is submitted with the given job ID the task deployment + * descriptor is forwarded to the listener. + * + * @param jobId The job ID to listen for. + */ + case class RegisterSubmitTaskListener(jobId: JobID) + + /** + * A response to a listened job ID containing the submitted task deployment descriptor. + * + * @param tdd The submitted task deployment descriptor. 
+ */ + case class ResponseSubmitTaskListener(tdd: TaskDeploymentDescriptor) + + // -------------------------------------------------------------------------- + // Utility methods to allow simpler case object access from Java + // -------------------------------------------------------------------------- + + def getRequestRunningTasksMessage: AnyRef = { + RequestRunningTasks + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala ---------------------------------------------------------------------- diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala index ee1b264..00410cc 100644 --- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala +++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala @@ -22,9 +22,10 @@ import java.io._ import java.util.concurrent.TimeUnit import org.apache.flink.configuration.GlobalConfiguration -import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils} +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster +import org.apache.flink.test.util.TestBaseUtils import org.apache.flink.util.TestLogger -import org.junit.{AfterClass, BeforeClass, Test, Assert} +import org.junit.{AfterClass, Assert, BeforeClass, Test} import scala.concurrent.duration.FiniteDuration import scala.tools.nsc.Settings @@ -334,7 +335,7 @@ class ScalaShellITCase extends TestLogger { } object ScalaShellITCase { - var cluster: Option[ForkableFlinkMiniCluster] = None + var cluster: Option[LocalFlinkMiniCluster] = None val parallelism = 4 @BeforeClass http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java 
---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java index 8bb440c..f94ff68 100644 --- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java +++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java @@ -40,6 +40,7 @@ import org.apache.flink.batch.connectors.cassandra.CassandraOutputFormat; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; @@ -49,7 +50,6 @@ import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestStreamEnvironment; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.apache.flink.test.util.TestEnvironment; import org.junit.After; import org.junit.AfterClass; @@ -134,7 +134,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri } } - private static ForkableFlinkMiniCluster flinkCluster; + private static LocalFlinkMiniCluster flinkCluster; // ------------------------------------------------------------------------ // Cluster Setup (Cassandra & Flink) @@ -205,7 +205,7 @@ public class 
CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri Configuration config = new Configuration(); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4); - flinkCluster = new ForkableFlinkMiniCluster(config); + flinkCluster = new LocalFlinkMiniCluster(config); flinkCluster.start(); } http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java index 9e3c33b..c4949ff 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -30,7 +31,6 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.apache.flink.util.InstantiationUtil; import org.junit.AfterClass; @@ -58,7 +58,7 @@ public class KafkaShortRetentionTestBase implements Serializable { private static KafkaTestEnvironment kafkaServer; private static Properties standardProps; - private static ForkableFlinkMiniCluster flink; + private static LocalFlinkMiniCluster flink; @BeforeClass public static void prepare() throws IOException, ClassNotFoundException { @@ -88,7 +88,7 @@ public class KafkaShortRetentionTestBase implements Serializable { flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16); flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); - flink = new ForkableFlinkMiniCluster(flinkConfig, false); + flink = new LocalFlinkMiniCluster(flinkConfig, false); flink.start(); } http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java index eddb57c..771db17 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java @@ -22,8 +22,8 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import 
org.apache.flink.metrics.jmx.JMXReporter; import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.apache.flink.test.util.SuccessException; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.TestLogger; @@ -65,7 +65,7 @@ public abstract class KafkaTestBase extends TestLogger { protected static Properties standardProps; - protected static ForkableFlinkMiniCluster flink; + protected static LocalFlinkMiniCluster flink; protected static int flinkPort; @@ -105,7 +105,7 @@ public abstract class KafkaTestBase extends TestLogger { flinkConfig.setString(ConfigConstants.METRICS_REPORTERS_LIST, "my_reporter"); flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName()); - flink = new ForkableFlinkMiniCluster(flinkConfig, false); + flink = new LocalFlinkMiniCluster(flinkConfig, false); flink.start(); flinkPort = flink.getLeaderRPCPort(); http://git-wip-us.apache.org/repos/asf/flink/blob/02b852e3/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java index 3705943..2e452c1 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java +++ 
b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java @@ -21,11 +21,11 @@ import com.amazonaws.services.kinesis.model.DescribeStreamResult; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants; import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValidatingConsumerThread; import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisEventsGeneratorProducerThread; import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,7 +80,7 @@ public class ManualExactlyOnceTest { flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16); flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s"); - ForkableFlinkMiniCluster flink = new ForkableFlinkMiniCluster(flinkConfig, false); + LocalFlinkMiniCluster flink = new LocalFlinkMiniCluster(flinkConfig, false); flink.start(); final int flinkPort = flink.getLeaderRPCPort();
