http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index d78a594..f974946 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -20,13 +20,14 @@ package org.apache.flink.runtime.taskmanager import java.io.{File, IOException} import java.net.{InetAddress, InetSocketAddress} +import java.util.UUID import java.util.concurrent.TimeUnit import java.lang.reflect.Method import java.lang.management.{OperatingSystemMXBean, ManagementFactory} -import akka.actor._ -import akka.pattern.ask -import akka.util.Timeout +import _root_.akka.actor._ +import _root_.akka.pattern.ask +import _root_.akka.util.Timeout import com.codahale.metrics.{Gauge, MetricFilter, MetricRegistry} import com.codahale.metrics.json.MetricsModule @@ -36,9 +37,10 @@ import com.fasterxml.jackson.databind.ObjectMapper import grizzled.slf4j.Logger import org.apache.flink.configuration.{Configuration, ConfigConstants, GlobalConfiguration, IllegalConfigurationException} + +import org.apache.flink.runtime.accumulators.AccumulatorSnapshot import org.apache.flink.runtime.messages.checkpoint.{NotifyCheckpointComplete, TriggerCheckpoint, AbstractCheckpointMessage} -import org.apache.flink.runtime.accumulators.{AccumulatorSnapshot, AccumulatorRegistry} -import org.apache.flink.runtime.{StreamingMode, ActorSynchronousLogging, ActorLogMessages} +import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessages, LogMessages, StreamingMode} import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.blob.{BlobService, BlobCache} import org.apache.flink.runtime.broadcast.BroadcastVariableManager @@ -46,7 +48,8 @@ import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, Ta import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, FallbackLibraryCacheManager, LibraryCacheManager} import org.apache.flink.runtime.executiongraph.ExecutionAttemptID import org.apache.flink.runtime.filecache.FileCache -import org.apache.flink.runtime.instance.{HardwareDescription, InstanceConnectionInfo, InstanceID} +import org.apache.flink.runtime.instance.{AkkaActorGateway, HardwareDescription, +InstanceConnectionInfo, InstanceID} import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode import org.apache.flink.runtime.io.disk.iomanager.{IOManager, IOManagerAsync} import org.apache.flink.runtime.io.network.NetworkEnvironment @@ -114,15 +117,20 @@ import scala.language.postfixOps * - Exceptions releasing intermediate result resources. Critical resource leak, * requires a clean JVM. */ -class TaskManager(protected val config: TaskManagerConfiguration, - protected val connectionInfo: InstanceConnectionInfo, - protected val jobManagerAkkaURL: String, - protected val memoryManager: MemoryManager, - protected val ioManager: IOManager, - protected val network: NetworkEnvironment, - protected val numberOfSlots: Int) - -extends Actor with ActorLogMessages with ActorSynchronousLogging { +class TaskManager( + protected val config: TaskManagerConfiguration, + protected val connectionInfo: InstanceConnectionInfo, + protected val jobManagerAkkaURL: String, + protected val memoryManager: MemoryManager, + protected val ioManager: IOManager, + protected val network: NetworkEnvironment, + protected val numberOfSlots: Int) + extends FlinkActor + with LeaderSessionMessages // Mixin order is important: second we want to filter leader messages + with LogMessages // Mixin order is important: first we want to support message logging +{ + + override val log = Logger(getClass) /** The timeout for all actor ask futures */ protected val askTimeout = new Timeout(config.timeout) @@ -161,6 +169,10 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { private var heartbeatScheduler: Option[Cancellable] = None + protected var leaderSessionID: Option[UUID] = None + + private var currentRegistrationSessionID: UUID = UUID.randomUUID() + // -------------------------------------------------------------------------- // Actor messages and life cycle // -------------------------------------------------------------------------- @@ -183,8 +195,15 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { // kick off the registration val deadline: Option[Deadline] = config.maxRegistrationDuration.map(_.fromNow) - self.tell(TriggerTaskManagerRegistration(jobManagerAkkaURL, - TaskManager.INITIAL_REGISTRATION_TIMEOUT, deadline,1), ActorRef.noSender) + self ! decorateMessage( + TriggerTaskManagerRegistration( + currentRegistrationSessionID, + jobManagerAkkaURL, + TaskManager.INITIAL_REGISTRATION_TIMEOUT, + deadline, + 1) + ) + } /** @@ -236,8 +255,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { * Central handling of actor messages. This method delegates to the more specialized * methods for handling certain classes of messages. */ - override def receiveWithLogMessages: Receive = { - + override def handleMessage: Receive = { // task messages are most common and critical, we handle them first case message: TaskMessage => handleTaskMessage(message) @@ -259,7 +277,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { // its registration at the JobManager case NotifyWhenRegisteredAtJobManager => if (isConnected) { - sender ! RegisteredAtJobManager + sender ! decorateMessage(RegisteredAtJobManager) } else { waitForRegistration += sender } @@ -339,12 +357,12 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { // was into a terminal state, or in case the JobManager cannot be informed of the // state transition - case updateMsg@UpdateTaskExecutionState(taskExecutionState: TaskExecutionState) => + case updateMsg@UpdateTaskExecutionState(taskExecutionState: TaskExecutionState) => // we receive these from our tasks and forward them to the JobManager currentJobManager foreach { jobManager => { - val futureResponse = (jobManager ? updateMsg)(askTimeout) + val futureResponse = (jobManager ? decorateMessage(updateMsg))(askTimeout) val executionID = taskExecutionState.getID @@ -353,13 +371,20 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { // but only send messages to the TaskManager to do those changes case Success(result) => if (!result) { - self ! FailTask(executionID, + self ! decorateMessage( + FailTask( + executionID, new Exception("Task has been cancelled on the JobManager.")) + ) } case Failure(t) => - self ! FailTask(executionID, new Exception( - "Failed to send ExecutionStateChange notification to JobManager")) + self ! decorateMessage( + FailTask( + executionID, + new Exception( + "Failed to send ExecutionStateChange notification to JobManager")) + ) }(context.dispatcher) } } @@ -387,11 +412,15 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { val task = runningTasks.get(executionID) if (task != null) { task.cancelExecution() - sender ! new TaskOperationResult(executionID, true) + sender ! decorateMessage(new TaskOperationResult(executionID, true)) } else { log.debug(s"Cannot find task to cancel for execution ${executionID})") - sender ! new TaskOperationResult(executionID, false, + sender ! decorateMessage( + new TaskOperationResult( + executionID, + false, "No task with that execution ID was found.") + ) } case PartitionState(taskExecutionId, taskResultId, partitionId, state) => @@ -413,7 +442,6 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { private def handleCheckpointingMessage(actorMessage: AbstractCheckpointMessage): Unit = { actorMessage match { - case message: TriggerCheckpoint => val taskExecutionId = message.getTaskExecutionId val checkpointId = message.getCheckpointId @@ -457,118 +485,148 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { * @param message The registration message. */ private def handleRegistrationMessage(message: RegistrationMessage): Unit = { + if(message.registrationSessionID.equals(currentRegistrationSessionID)) { + message match { + case TriggerTaskManagerRegistration( + registrationSessionID, + jobManagerURL, + timeout, + deadline, + attempt) => + + if (isConnected) { + // this may be the case, if we queue another attempt and + // in the meantime, the registration is acknowledged + log.debug( + "TaskManager was triggered to register at JobManager, but is already registered") + } + else if (deadline.exists(_.isOverdue())) { + // we failed to register in time. that means we should quit + log.error("Failed to register at the JobManager withing the defined maximum " + + "connect time. Shutting down ...") - message match { - - case TriggerTaskManagerRegistration(jobManagerURL, timeout, deadline, attempt) => - if (isConnected) { - // this may be the case, if we queue another attempt and - // in the meantime, the registration is acknowledged - log.debug( - "TaskManager was triggered to register at JobManager, but is already registered") - } - else if (deadline.exists(_.isOverdue())) { - // we failed to register in time. that means we should quit - log.error("Failed to register at the JobManager withing the defined maximum " + - "connect time. Shutting down ...") + // terminate ourselves (hasta la vista) + self ! decorateMessage(PoisonPill) + } + else { + log.info(s"Trying to register at JobManager ${jobManagerURL} " + + s"(attempt ${attempt}, timeout: ${timeout})") + + val jobManager = context.actorSelection(jobManagerAkkaURL) + jobManager ! decorateMessage( + RegisterTaskManager( + registrationSessionID, + self, + connectionInfo, + resources, + numberOfSlots) + ) + + // the next timeout computes via exponential backoff with cap + val nextTimeout = (timeout * 2).min(TaskManager.MAX_REGISTRATION_TIMEOUT) + + // schedule (with our timeout s delay) a check triggers a new registration + // attempt, if we are not registered by then + context.system.scheduler.scheduleOnce(timeout) { + if (!isConnected) { + self ! decorateMessage( + TriggerTaskManagerRegistration( + registrationSessionID, + jobManagerURL, + nextTimeout, + deadline, + attempt + 1) + ) + } + }(context.dispatcher) + } - // terminate ourselves (hasta la vista) - self ! PoisonPill - } - else { - log.info(s"Trying to register at JobManager ${jobManagerURL} " + - s"(attempt ${attempt}, timeout: ${timeout})") - - val jobManager = context.actorSelection(jobManagerAkkaURL) - jobManager ! RegisterTaskManager(self, connectionInfo, resources, numberOfSlots) - - // the next timeout computes via exponential backoff with cap - val nextTimeout = (timeout * 2).min(TaskManager.MAX_REGISTRATION_TIMEOUT) - - // schedule (with our timeout s delay) a check triggers a new registration - // attempt, if we are not registered by then - context.system.scheduler.scheduleOnce(timeout) { - if (!isConnected) { - self.tell(TriggerTaskManagerRegistration(jobManagerURL, - nextTimeout, deadline, attempt + 1), ActorRef.noSender) + // successful registration. associate with the JobManager + // we disambiguate duplicate or erroneous messages, to simplify debugging + case AcknowledgeRegistration(_, leaderSessionID, jobManager, id, blobPort) => + if (isConnected) { + if (jobManager == currentJobManager.orNull) { + log.debug("Ignoring duplicate registration acknowledgement.") + } else { + log.warn(s"Ignoring 'AcknowledgeRegistration' message from ${jobManager.path} , " + + s"because the TaskManager is already registered at ${currentJobManager.orNull}") } - }(context.dispatcher) - } - - // successful registration. associate with the JobManager - // we disambiguate duplicate or erroneous messages, to simplify debugging - case AcknowledgeRegistration(jobManager, id, blobPort) => - if (isConnected) { - if (jobManager == currentJobManager.orNull) { - log.debug("Ignoring duplicate registration acknowledgement.") - } else { - log.warn(s"Ignoring 'AcknowledgeRegistration' message from ${jobManager.path} , " + - s"because the TaskManager is already registered at ${currentJobManager.orNull}") } - } - else { - // not yet connected, so let's associate with that JobManager - try { - associateWithJobManager(jobManager, id, blobPort) - } catch { - case t: Throwable => - killTaskManagerFatal( - "Unable to start TaskManager components after registering at JobManager", t) + else { + // not yet connected, so let's associate with that JobManager + try { + associateWithJobManager(jobManager, id, blobPort, leaderSessionID) + } catch { + case t: Throwable => + killTaskManagerFatal( + "Unable to start TaskManager components after registering at JobManager", t) + } } - } - // we are already registered at that specific JobManager - duplicate answer, rare cases - case AlreadyRegistered(jobManager, id, blobPort) => - if (isConnected) { - if (jobManager == currentJobManager.orNull) { - log.debug("Ignoring duplicate registration acknowledgement.") - } else { - log.warn(s"Received 'AlreadyRegistered' message from JobManager ${jobManager.path}, " + - s"even through TaskManager is currently registered at ${currentJobManager.orNull}") + // we are already registered at that specific JobManager - duplicate answer, rare cases + case AlreadyRegistered(_, leaderSesssionID, jobManager, id, blobPort) => + if (isConnected) { + if (jobManager == currentJobManager.orNull) { + log.debug("Ignoring duplicate registration acknowledgement.") + } else { + log.warn(s"Received 'AlreadyRegistered' message from " + + s"JobManager ${jobManager.path}, even through TaskManager is currently " + + s"registered at ${currentJobManager.orNull}") + } } - } - else { - // not connected, yet, to let's associate - log.info("Received 'AlreadyRegistered' message before 'AcknowledgeRegistration'") - - try { - associateWithJobManager(jobManager, id, blobPort) - } catch { - case t: Throwable => - killTaskManagerFatal( - "Unable to start TaskManager components after registering at JobManager", t) + else { + // not connected, yet, to let's associate + log.info("Received 'AlreadyRegistered' message before 'AcknowledgeRegistration'") + + try { + associateWithJobManager(jobManager, id, blobPort, leaderSesssionID) + } catch { + case t: Throwable => + killTaskManagerFatal( + "Unable to start TaskManager components after registering at JobManager", t) + } } - } - case RefuseRegistration(reason) => - if (currentJobManager.isEmpty) { - log.error(s"The registration at JobManager ${jobManagerAkkaURL} was refused, " + - s"because: ${reason}. Retrying later...") + case RefuseRegistration(registrationSessionID, reason) => + if (currentJobManager.isEmpty) { + log.error(s"The registration at JobManager ${jobManagerAkkaURL} was refused, " + + s"because: ${reason}. Retrying later...") - // try the registration again after some time + // try the registration again after some time - val delay: FiniteDuration = TaskManager.DELAY_AFTER_REFUSED_REGISTRATION - val deadline: Option[Deadline] = config.maxRegistrationDuration.map { - timeout => timeout + delay fromNow - } + val delay: FiniteDuration = TaskManager.DELAY_AFTER_REFUSED_REGISTRATION + val deadline: Option[Deadline] = config.maxRegistrationDuration.map { + timeout => timeout + delay fromNow + } - context.system.scheduler.scheduleOnce(delay) { - self.tell(TriggerTaskManagerRegistration(jobManagerAkkaURL, - TaskManager.INITIAL_REGISTRATION_TIMEOUT, deadline, 1), ActorRef.noSender) - }(context.dispatcher) - } - else { - // ignore RefuseRegistration messages which arrived after AcknowledgeRegistration - if (sender() == currentJobManager.orNull) { - log.warn(s"Received 'RefuseRegistration' from the JobManager (${sender().path})" + - s" even though this TaskManager is already registered there.") + context.system.scheduler.scheduleOnce(delay) { + self ! decorateMessage( + TriggerTaskManagerRegistration( + registrationSessionID, + jobManagerAkkaURL, + TaskManager.INITIAL_REGISTRATION_TIMEOUT, + deadline, + 1) + ) + }(context.dispatcher) } else { - log.warn(s"Ignoring 'RefuseRegistration' from unrelated JobManager (${sender().path})") + // ignore RefuseRegistration messages which arrived after AcknowledgeRegistration + if (sender() == currentJobManager.orNull) { + log.warn(s"Received 'RefuseRegistration' from the JobManager (${sender().path})" + + s" even though this TaskManager is already registered there.") + } + else { + log.warn(s"Ignoring 'RefuseRegistration' from unrelated " + + s"JobManager (${sender().path})") + } } - } - case _ => unhandled(message) + case _ => unhandled(message) + } + } else { + log.debug(s"Discarded registration message ${message}, because the registration session " + + "ID was not correct.") } } @@ -593,20 +651,28 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { * @param id The instanceID under which the TaskManager is registered * at the JobManager. * @param blobPort The JobManager's port for the BLOB server. + * @param newLeaderSessionID Leader session ID of the JobManager */ - private def associateWithJobManager(jobManager: ActorRef, - id: InstanceID, - blobPort: Int): Unit = { + private def associateWithJobManager( + jobManager: ActorRef, + id: InstanceID, + blobPort: Int, + newLeaderSessionID: UUID) + : Unit = { if (jobManager == null) { - throw new NullPointerException("jobManager may not be null") + throw new NullPointerException("jobManager must not be null.") } if (id == null) { - throw new NullPointerException("instance ID may not be null") + throw new NullPointerException("instance ID must not be null.") } if (blobPort <= 0 || blobPort > 65535) { throw new IllegalArgumentException("blob port is out of range: " + blobPort) } + if(newLeaderSessionID == null) { + throw new NullPointerException("Leader session ID must not be null.") + } + // sanity check that we are not currently registered with a different JobManager if (isConnected) { @@ -631,9 +697,18 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { throw new IllegalStateException("JobManager-specific components are already initialized.") } + currentJobManager = Some(jobManager) + instanceID = id + leaderSessionID = Some(newLeaderSessionID) + // start the network stack, now that we have the JobManager actor reference try { - network.associateWithTaskManagerAndJobManager(jobManager, self) + network.associateWithTaskManagerAndJobManager( + new AkkaActorGateway(jobManager, leaderSessionID), + new AkkaActorGateway(self, leaderSessionID) + ) + + } catch { case e: Exception => @@ -665,16 +740,18 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { libraryCacheManager = Some(new FallbackLibraryCacheManager) } - currentJobManager = Some(jobManager) - instanceID = id - // watch job manager to detect when it dies context.watch(jobManager) // schedule regular heartbeat message for oneself - heartbeatScheduler = Some(context.system.scheduler.schedule( - TaskManager.HEARTBEAT_INTERVAL, TaskManager.HEARTBEAT_INTERVAL, self, SendHeartbeat) - (context.dispatcher)) + heartbeatScheduler = Some( + context.system.scheduler.schedule( + TaskManager.HEARTBEAT_INTERVAL, + TaskManager.HEARTBEAT_INTERVAL, + self, + decorateMessage(SendHeartbeat) + )(context.dispatcher) + ) // notify all the actors that listen for a successful registration for (listener <- waitForRegistration) { @@ -710,7 +787,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { // de-register from the JobManager (faster detection of disconnect) currentJobManager foreach { - _ ! Disconnect(s"TaskManager ${self.path} is shutting down.") + _ ! decorateMessage(Disconnect(s"TaskManager ${self.path} is shutting down.")) } currentJobManager = None @@ -748,8 +825,14 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { // begin attempts to reconnect val deadline: Option[Deadline] = config.maxRegistrationDuration.map(_.fromNow) - self ! TriggerTaskManagerRegistration(jobManagerAkkaURL, - TaskManager.INITIAL_REGISTRATION_TIMEOUT, deadline, 1) + self ! decorateMessage( + TriggerTaskManagerRegistration( + currentRegistrationSessionID, + jobManagerAkkaURL, + TaskManager.INITIAL_REGISTRATION_TIMEOUT, + deadline, + 1) + ) } catch { // this is pretty bad, it leaves the TaskManager in a state where it cannot @@ -795,8 +878,21 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { // create the task. this does not grab any TaskManager resources or download // and libraries - the operation does not block - val task = new Task(tdd, memoryManager, ioManager, network, bcVarManager, - self, jobManagerActor, config.timeout, libCache, fileCache) + + val jobManagerGateway = new AkkaActorGateway(jobManagerActor, leaderSessionID) + val selfGateway = new AkkaActorGateway(self, leaderSessionID) + + val task = new Task( + tdd, + memoryManager, + ioManager, + network, + bcVarManager, + selfGateway, + jobManagerGateway, + config.timeout, + libCache, + fileCache) log.info(s"Received task ${task.getTaskNameWithSubtasks}") @@ -812,12 +908,12 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { // all good, we kick off the task, which performs its own initialization task.startTaskThread() - sender ! Acknowledge + sender ! decorateMessage(Acknowledge) } catch { case t: Throwable => log.error("SubmitTask failed", t) - sender ! Failure(t) + sender ! decorateMessage(Failure(t)) } } @@ -828,8 +924,9 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { * @param partitionInfos The descriptor of the intermediate result partitions. */ private def updateTaskInputPartitions( - executionId: ExecutionAttemptID, - partitionInfos: Seq[(IntermediateDataSetID, InputChannelDeploymentDescriptor)]) : Unit = { + executionId: ExecutionAttemptID, + partitionInfos: Seq[(IntermediateDataSetID, InputChannelDeploymentDescriptor)]) + : Unit = { Option(runningTasks.get(executionId)) match { case Some(task) => @@ -867,15 +964,15 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { } if (errors.isEmpty) { - sender ! Acknowledge + sender ! decorateMessage(Acknowledge) } else { - sender ! Failure(new Exception(errors.mkString("\n"))) + sender ! decorateMessage(Failure(new Exception(errors.mkString("\n")))) } case None => log.debug(s"Discard update for input partitions of task $executionId : " + s"task is no longer running.") - sender ! Acknowledge + sender ! decorateMessage(Acknowledge) } } @@ -919,9 +1016,16 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { registry.getSnapshot } - self ! UpdateTaskExecutionState(new TaskExecutionState( - task.getJobID, task.getExecutionId, task.getExecutionState, task.getFailureCause, - accumulators)) + self ! decorateMessage( + UpdateTaskExecutionState( + new TaskExecutionState( + task.getJobID, + task.getExecutionId, + task.getExecutionState, + task.getFailureCause, + accumulators) + ) + ) } else { log.error(s"Cannot find task with ID $executionID to unregister.") @@ -952,7 +1056,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { } currentJobManager foreach { - jm => jm ! Heartbeat(instanceID, metricsReport, accumulatorEvents) + jm => jm ! decorateMessage(Heartbeat(instanceID, metricsReport, accumulatorEvents)) } } catch { @@ -972,14 +1076,12 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { try { val traces = Thread.getAllStackTraces.asScala - val stackTraceStr = traces.map( - (trace: (Thread, Array[StackTraceElement])) => { - val (thread, elements) = trace + val stackTraceStr = traces.map { + case (thread: Thread, elements: Array[StackTraceElement]) => "Thread: " + thread.getName + '\n' + elements.mkString("\n") - }) - .mkString("\n\n") + }.mkString("\n\n") - recipient ! StackTrace(instanceID, stackTraceStr) + recipient ! decorateMessage(StackTrace(instanceID, stackTraceStr)) } catch { case e: Exception => log.error("Failed to send stack trace to " + recipient.path, e) @@ -999,7 +1101,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { "\n" + "A fatal error occurred, forcing the TaskManager to shut down: " + message, cause) - self ! Kill + self ! decorateMessage(Kill) } } @@ -1165,24 +1267,34 @@ object TaskManager { * Allows to use TaskManager subclasses for example for YARN. */ @throws(classOf[Exception]) - def selectNetworkInterfaceAndRunTaskManager(configuration: Configuration, - streamingMode: StreamingMode, - taskManagerClass: Class[_ <: TaskManager]) : Unit = { + def selectNetworkInterfaceAndRunTaskManager( + configuration: Configuration, + streamingMode: StreamingMode, + taskManagerClass: Class[_ <: TaskManager]) + : Unit = { val (jobManagerHostname, jobManagerPort) = getAndCheckJobManagerAddress(configuration) - val (taskManagerHostname, actorSystemPort) = - selectNetworkInterfaceAndPort(configuration, jobManagerHostname, jobManagerPort) + val (taskManagerHostname, actorSystemPort) = selectNetworkInterfaceAndPort( + configuration, + jobManagerHostname, + jobManagerPort) - runTaskManager(taskManagerHostname, actorSystemPort, configuration, - streamingMode, taskManagerClass) + runTaskManager( + taskManagerHostname, + actorSystemPort, + configuration, + streamingMode, + taskManagerClass) } @throws(classOf[IOException]) @throws(classOf[IllegalConfigurationException]) - def selectNetworkInterfaceAndPort(configuration: Configuration, - jobManagerHostname: String, - jobManagerPort: Int) : (String, Int) = { + def selectNetworkInterfaceAndPort( + configuration: Configuration, + jobManagerHostname: String, + jobManagerPort: Int) + : (String, Int) = { var taskManagerHostname = configuration.getString( ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null) @@ -1243,13 +1355,19 @@ object TaskManager { * @param configuration The configuration for the TaskManager. */ @throws(classOf[Exception]) - def runTaskManager(taskManagerHostname: String, - actorSystemPort: Int, - configuration: Configuration, - streamingMode: StreamingMode) : Unit = { + def runTaskManager( + taskManagerHostname: String, + actorSystemPort: Int, + configuration: Configuration, + streamingMode: StreamingMode) + : Unit = { - runTaskManager(taskManagerHostname, actorSystemPort, configuration, - streamingMode, classOf[TaskManager]) + runTaskManager( + taskManagerHostname, + actorSystemPort, + configuration, + streamingMode, + classOf[TaskManager]) } /** @@ -1269,11 +1387,13 @@ object TaskManager { * subclasses for example for YARN. */ @throws(classOf[Exception]) - def runTaskManager(taskManagerHostname: String, - actorSystemPort: Int, - configuration: Configuration, - streamingMode: StreamingMode, - taskManagerClass: Class[_ <: TaskManager]) : Unit = { + def runTaskManager( + taskManagerHostname: String, + actorSystemPort: Int, + configuration: Configuration, + streamingMode: StreamingMode, + taskManagerClass: Class[_ <: TaskManager]) + : Unit = { LOG.info(s"Starting TaskManager in streaming mode $streamingMode") @@ -1282,8 +1402,10 @@ object TaskManager { LOG.info(s"Starting TaskManager actor system at $taskManagerHostname:$actorSystemPort") val taskManagerSystem = try { - val akkaConfig = AkkaUtils.getAkkaConfig(configuration, - Some((taskManagerHostname, actorSystemPort))) + val akkaConfig = AkkaUtils.getAkkaConfig( + configuration, + Some((taskManagerHostname, actorSystemPort)) + ) if (LOG.isDebugEnabled) { LOG.debug("Using akka configuration\n " + akkaConfig) } @@ -1307,13 +1429,14 @@ object TaskManager { // and the TaskManager actor try { LOG.info("Starting TaskManager actor") - val taskManager = startTaskManagerComponentsAndActor(configuration, - taskManagerSystem, - taskManagerHostname, - Some(TASK_MANAGER_NAME), - None, false, - streamingMode, - taskManagerClass) + val taskManager = startTaskManagerComponentsAndActor( + configuration, + taskManagerSystem, + taskManagerHostname, + Some(TASK_MANAGER_NAME), + None, false, + streamingMode, + taskManagerClass) // start a process reaper that watches the JobManager. If the TaskManager actor dies, // the process reaper will kill the JVM process (to ensure easy failure detection) @@ -1433,14 +1556,15 @@ object TaskManager { configuredMemory << 20 // megabytes to bytes } else { - val fraction = configuration.getFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, - ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION) + val fraction = configuration.getFloat( + ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, + ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION) checkConfigParameter(fraction > 0.0f && fraction < 1.0f, fraction, ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, "MemoryManager fraction of the free memory must be between 0.0 and 1.0") val relativeMemSize = (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * - fraction).toLong + fraction).toLong LOG.info(s"Using $fraction of the currently free heap space for Flink managed " + s"memory (${relativeMemSize >> 20} MB).") @@ -1452,10 +1576,11 @@ object TaskManager { // now start the memory manager val memoryManager = try { - new DefaultMemoryManager(memorySize, - taskManagerConfig.numberOfSlots, - netConfig.networkBufferSize, - preAllocateMemory) + new DefaultMemoryManager( + memorySize, + taskManagerConfig.numberOfSlots, + netConfig.networkBufferSize, + preAllocateMemory) } catch { case e: OutOfMemoryError => throw new Exception( @@ -1472,7 +1597,8 @@ object TaskManager { } // create the actor properties (which define the actor constructor parameters) - val tmProps = Props(taskManagerClass, + val tmProps = Props( + taskManagerClass, taskManagerConfig, connectionInfo, jobManagerAkkaUrl, @@ -1502,9 +1628,11 @@ object TaskManager { * @return The ActorRef to the TaskManager */ @throws(classOf[IOException]) - def getTaskManagerRemoteReference(taskManagerUrl: String, - system: ActorSystem, - timeout: FiniteDuration): ActorRef = { + def getTaskManagerRemoteReference( + taskManagerUrl: String, + system: ActorSystem, + timeout: FiniteDuration) + : ActorRef = { try { val future = AkkaUtils.getReference(taskManagerUrl, system, timeout) Await.result(future, timeout) @@ -1536,10 +1664,11 @@ object TaskManager { * InstanceConnectionInfo, JobManager actor Akka URL). */ @throws(classOf[IllegalArgumentException]) - def parseTaskManagerConfiguration(configuration: Configuration, - taskManagerHostname: String, - localTaskManagerCommunication: Boolean): - (TaskManagerConfiguration, + def parseTaskManagerConfiguration( + configuration: Configuration, + taskManagerHostname: String, + localTaskManagerCommunication: Boolean) + : (TaskManagerConfiguration, NetworkEnvironmentConfiguration, InstanceConnectionInfo) = { @@ -1579,10 +1708,10 @@ object TaskManager { ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY) val pageSizeNew: Int = configuration.getInteger( - ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, -1) + ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY, -1) val pageSizeOld: Int = configuration.getInteger( - ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, -1) + ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, -1) val pageSize: Int = if (pageSizeNew != -1) { @@ -1617,7 +1746,7 @@ object TaskManager { val tmpDirs = configuration.getString( ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH) - .split(",|" + File.pathSeparator) + .split(",|" + File.pathSeparator) val nettyConfig = if (localTaskManagerCommunication) { None @@ -1633,7 +1762,10 @@ object TaskManager { val ioMode : IOMode = if (syncOrAsync == "async") IOMode.ASYNC else IOMode.SYNC val networkConfig = NetworkEnvironmentConfiguration( - numNetworkBuffers, pageSize, ioMode, nettyConfig) + numNetworkBuffers, + pageSize, + ioMode, + nettyConfig) // ----> timeouts, library caching, profiling @@ -1667,8 +1799,12 @@ object TaskManager { e) } - val taskManagerConfig = TaskManagerConfiguration(tmpDirs, cleanupInterval, timeout, - finiteRegistratioDuration, slots, + val taskManagerConfig = TaskManagerConfiguration( + tmpDirs, + cleanupInterval, + timeout, + finiteRegistratioDuration, + slots, configuration) (taskManagerConfig, networkConfig, connectionInfo) @@ -1720,10 +1856,12 @@ object TaskManager { * @throws IllegalConfigurationException Thrown if the condition is violated. */ @throws(classOf[IllegalConfigurationException]) - private def checkConfigParameter(condition: Boolean, - parameter: Any, - name: String, - errorMessage: String = ""): Unit = { + private def checkConfigParameter( + condition: Boolean, + parameter: Any, + name: String, + errorMessage: String = "") + : Unit = { if (!condition) { throw new IllegalConfigurationException( s"Invalid configuration value for '${name}' : ${parameter} - ${errorMessage}")
http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java new file mode 100644 index 0000000..324b014 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.akka; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Kill; +import akka.actor.Props; +import akka.testkit.JavaTestKit; +import akka.testkit.TestActorRef; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.messages.RequiresLeaderSessionID; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import scala.Option; + +import java.util.UUID; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class FlinkUntypedActorTest { + + private static ActorSystem actorSystem; + + @BeforeClass + public static void setup() { + actorSystem = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig()); + } + + @AfterClass + public static void teardown() { + JavaTestKit.shutdownActorSystem(actorSystem); + } + + @Test + public void testLeaderSessionMessageFilteringOfFlinkUntypedActor() { + final Option<UUID> leaderSessionID = Option.apply(UUID.randomUUID()); + final Option<UUID> oldSessionID = Option.apply(UUID.randomUUID()); + + TestActorRef<PlainFlinkUntypedActor> actor = null; + + try { + actor = TestActorRef.create( + actorSystem, Props.create(PlainFlinkUntypedActor.class, leaderSessionID)); + + final PlainFlinkUntypedActor underlyingActor = actor.underlyingActor(); + + actor.tell(new JobManagerMessages.LeaderSessionMessage(leaderSessionID, 1), ActorRef.noSender()); + actor.tell(new JobManagerMessages.LeaderSessionMessage(oldSessionID, 2), ActorRef.noSender()); + actor.tell(new JobManagerMessages.LeaderSessionMessage(leaderSessionID, 2), ActorRef.noSender()); + actor.tell(1, ActorRef.noSender()); + + assertEquals(3, underlyingActor.getMessageCounter()); + + } finally { + stopActor(actor); + } + } + + @Test + public void testThrowingExceptionWhenReceivingNonWrappedRequiresLeaderSessionIDMessage() { + final Option<UUID> leaderSessionID = Option.apply(UUID.randomUUID()); + + TestActorRef<PlainFlinkUntypedActor> actor = null; + + try{ + final Props props = Props.create(PlainFlinkUntypedActor.class, leaderSessionID); + actor = TestActorRef.create(actorSystem, props); + + actor.receive(new JobManagerMessages.LeaderSessionMessage(leaderSessionID, 1)); + + try { + actor.receive(new PlainRequiresLeaderSessionID()); + + fail("Expected an exception to be thrown, because a RequiresLeaderSessionID" + + "message was sent without being wrapped in LeaderSessionMessage."); + } catch (Exception e) { + assertEquals("Received a message PlainRequiresLeaderSessionID " + + "without a leader session ID, even though it requires to have one.", + e.getMessage()); + } + + } finally { + stopActor(actor); + } + } + + private static void stopActor(ActorRef actor) { + if(actor != null) { + actor.tell(Kill.getInstance(), ActorRef.noSender()); + } + } + + + static class PlainFlinkUntypedActor extends FlinkUntypedActor { + + private Option<UUID> leaderSessionID; + + private int messageCounter; + + public PlainFlinkUntypedActor(Option<UUID> leaderSessionID) { + this.leaderSessionID = leaderSessionID; + this.messageCounter = 0; + } + + @Override + protected void handleMessage(Object message) throws Exception { + messageCounter++; + } + + @Override + protected Option<UUID> getLeaderSessionID() { + return leaderSessionID; + } + + public int getMessageCounter() { + return messageCounter; + } + } + + static class PlainRequiresLeaderSessionID implements RequiresLeaderSessionID { + @Override + public String toString() { + return "PlainRequiresLeaderSessionID"; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java index 932e366..b124304 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java @@ -18,12 +18,10 @@ package org.apache.flink.runtime.checkpoint; -import akka.actor.ActorRef; -import akka.pattern.Patterns; - import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -62,18 +60,19 @@ public class CoordinatorShutdownTest { JobGraph testGraph = new JobGraph("test job", vertex); testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, 5000)); - ActorRef jobManager = cluster.getJobManager(); + ActorGateway jobManager = cluster.getJobManagerGateway(); FiniteDuration timeout = new FiniteDuration(60, TimeUnit.SECONDS); JobManagerMessages.SubmitJob submitMessage = new JobManagerMessages.SubmitJob(testGraph, false); // submit is successful, but then the job dies because no TaskManager / slot is available - Future<Object> submitFuture = Patterns.ask(jobManager, submitMessage, timeout.toMillis()); + Future<Object> submitFuture = jobManager.ask(submitMessage, timeout); Await.result(submitFuture, timeout); // get the execution graph and make sure the coordinator is properly shut down - Future<Object> jobRequestFuture = Patterns.ask(jobManager, - new JobManagerMessages.RequestJob(testGraph.getJobID()), timeout.toMillis()); + Future<Object> jobRequestFuture = jobManager.ask( + new JobManagerMessages.RequestJob(testGraph.getJobID()), + timeout); ExecutionGraph graph = ((JobManagerMessages.JobFound) Await.result(jobRequestFuture, timeout)).executionGraph(); @@ -109,18 +108,19 @@ public class CoordinatorShutdownTest { JobGraph testGraph = new JobGraph("test job", vertex); testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, 5000)); - ActorRef jobManager = cluster.getJobManager(); + ActorGateway jobManager = cluster.getJobManagerGateway(); FiniteDuration timeout = new FiniteDuration(60, TimeUnit.SECONDS); JobManagerMessages.SubmitJob submitMessage = new JobManagerMessages.SubmitJob(testGraph, false); // submit is successful, but then the job dies because no TaskManager / slot is available - Future<Object> submitFuture = Patterns.ask(jobManager, submitMessage, timeout.toMillis()); + Future<Object> submitFuture = jobManager.ask(submitMessage, timeout); Await.result(submitFuture, timeout); // get the execution graph and make sure the coordinator is properly shut down - Future<Object> jobRequestFuture = Patterns.ask(jobManager, - new JobManagerMessages.RequestJob(testGraph.getJobID()), timeout.toMillis()); + Future<Object> jobRequestFuture = jobManager.ask( + new JobManagerMessages.RequestJob(testGraph.getJobID()), + timeout); ExecutionGraph graph = ((JobManagerMessages.JobFound) Await.result(jobRequestFuture, timeout)).executionGraph(); http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index cff7146..e3fc852 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -92,7 +92,7 @@ public class ExecutionGraphDeploymentTest { ExecutionJobVertex ejv = eg.getAllVertices().get(jid2); ExecutionVertex vertex = ejv.getTaskVertices()[3]; - ExecutionGraphTestUtils.SimpleInstanceGateway instanceGateway = new ExecutionGraphTestUtils.SimpleInstanceGateway(TestingUtils.directExecutionContext()); + ExecutionGraphTestUtils.SimpleActorGateway instanceGateway = new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext()); final Instance instance = getInstance(instanceGateway); @@ -295,7 +295,7 @@ public class ExecutionGraphDeploymentTest { for (int i = 0; i < dop1 + dop2; i++) { scheduler.newInstanceAvailable( ExecutionGraphTestUtils.getInstance( - new ExecutionGraphTestUtils.SimpleInstanceGateway( + new ExecutionGraphTestUtils.SimpleActorGateway( TestingUtils.directExecutionContext()))); } assertEquals(dop1 + dop2, scheduler.getNumberOfAvailableSlots()); http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java index 8a63060..64d4c44 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java @@ -24,18 +24,17 @@ import static org.mockito.Mockito.spy; import java.lang.reflect.Field; import java.net.InetAddress; -import java.util.LinkedList; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.instance.BaseTestingInstanceGateway; +import org.apache.flink.runtime.instance.BaseTestingActorGateway; import org.apache.flink.runtime.instance.HardwareDescription; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.InstanceConnectionInfo; -import org.apache.flink.runtime.instance.InstanceGateway; +import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.jobgraph.JobVertex; @@ -101,11 +100,11 @@ public class ExecutionGraphTestUtils { // utility mocking methods // -------------------------------------------------------------------------------------------- - public static Instance getInstance(final InstanceGateway gateway) throws Exception { + public static Instance getInstance(final ActorGateway gateway) throws Exception { return getInstance(gateway, 1); } - public static Instance getInstance(final InstanceGateway gateway, final int numberOfSlots) throws Exception { + public static Instance getInstance(final ActorGateway gateway, final int numberOfSlots) throws Exception { HardwareDescription hardwareDescription = new HardwareDescription(4, 2L*1024*1024*1024, 1024*1024*1024, 512*1024*1024); InetAddress address = InetAddress.getByName("127.0.0.1"); InstanceConnectionInfo connection = new InstanceConnectionInfo(address, 10001); @@ -113,10 +112,10 @@ public class ExecutionGraphTestUtils { return new Instance(gateway, connection, new InstanceID(), hardwareDescription, numberOfSlots); } - public static class SimpleInstanceGateway extends BaseTestingInstanceGateway { + public static class SimpleActorGateway extends BaseTestingActorGateway { public TaskDeploymentDescriptor lastTDD; - public SimpleInstanceGateway(ExecutionContext executionContext){ + public SimpleActorGateway(ExecutionContext executionContext){ super(executionContext); } @@ -140,8 +139,8 @@ public class ExecutionGraphTestUtils { } } - public static class SimpleFailingInstanceGateway extends BaseTestingInstanceGateway { - public SimpleFailingInstanceGateway(ExecutionContext executionContext) { + public static class SimpleFailingActorGateway extends BaseTestingActorGateway { + public SimpleFailingActorGateway(ExecutionContext executionContext) { super(executionContext); } http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java index f47e92c..89b82f3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionStateProgressTest.java @@ -63,7 +63,7 @@ public class ExecutionStateProgressTest { // mock resources and mock taskmanager for (ExecutionVertex ee : ejv.getTaskVertices()) { SimpleSlot slot = getInstance( - new SimpleInstanceGateway( + new SimpleActorGateway( TestingUtils.defaultExecutionContext()) ).allocateSimpleSlot(jid); ee.deployToSlot(slot); http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java index 9db330b..e9b67af 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java @@ -26,9 +26,9 @@ import java.io.IOException; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.instance.BaseTestingInstanceGateway; -import org.apache.flink.runtime.instance.DummyInstanceGateway; -import org.apache.flink.runtime.instance.InstanceGateway; +import org.apache.flink.runtime.instance.BaseTestingActorGateway; +import org.apache.flink.runtime.instance.DummyActorGateway; +import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.api.common.JobID; @@ -125,12 +125,12 @@ public class ExecutionVertexCancelTest { setVertexState(vertex, ExecutionState.SCHEDULED); assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState()); - InstanceGateway instanceGateway = new CancelSequenceInstanceGateway( + ActorGateway actorGateway = new CancelSequenceActorGateway( executionContext, new TaskOperationResult(execId, true), new TaskOperationResult(execId, false)); - Instance instance = getInstance(instanceGateway); + Instance instance = getInstance(actorGateway); SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); vertex.deployToSlot(slot); @@ -195,13 +195,13 @@ public class ExecutionVertexCancelTest { // task manager cancel sequence mock actor // first return NOT SUCCESS (task not found, cancel call overtook deploy call), then success (cancel call after deploy call) - InstanceGateway instanceGateway = new CancelSequenceInstanceGateway( + ActorGateway actorGateway = new CancelSequenceActorGateway( executionContext, new TaskOperationResult(execId, false), new TaskOperationResult(execId, true) ); - Instance instance = getInstance(instanceGateway); + Instance instance = getInstance(actorGateway); SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); vertex.deployToSlot(slot); @@ -258,11 +258,11 @@ public class ExecutionVertexCancelTest { AkkaUtils.getDefaultTimeout()); final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId(); - InstanceGateway instanceGateway = new CancelSequenceInstanceGateway( + ActorGateway actorGateway = new CancelSequenceActorGateway( TestingUtils.directExecutionContext(), new TaskOperationResult(execId, true)); - Instance instance = getInstance(instanceGateway); + Instance instance = getInstance(actorGateway); SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); setVertexState(vertex, ExecutionState.RUNNING); @@ -299,12 +299,12 @@ public class ExecutionVertexCancelTest { AkkaUtils.getDefaultTimeout()); final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId(); - final InstanceGateway instanceGateway = new CancelSequenceInstanceGateway( + final ActorGateway actorGateway = new CancelSequenceActorGateway( TestingUtils.directExecutionContext(), new TaskOperationResult(execId, true) ); - Instance instance = getInstance(instanceGateway); + Instance instance = getInstance(actorGateway); SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); setVertexState(vertex, ExecutionState.RUNNING); @@ -350,12 +350,12 @@ public class ExecutionVertexCancelTest { final ExecutionAttemptID execId = vertex.getCurrentExecutionAttempt().getAttemptId(); - final InstanceGateway instanceGateway = new CancelSequenceInstanceGateway( + final ActorGateway actorGateway = new CancelSequenceActorGateway( TestingUtils.directExecutionContext(), new TaskOperationResult(execId, false) ); - Instance instance = getInstance(instanceGateway); + Instance instance = getInstance(actorGateway); SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); setVertexState(vertex, ExecutionState.RUNNING); @@ -386,7 +386,7 @@ public class ExecutionVertexCancelTest { final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout()); - final InstanceGateway gateway = new CancelSequenceInstanceGateway(TestingUtils.directExecutionContext()); + final ActorGateway gateway = new CancelSequenceActorGateway(TestingUtils.directExecutionContext()); Instance instance = getInstance(gateway); SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); @@ -423,7 +423,7 @@ public class ExecutionVertexCancelTest { AkkaUtils.getDefaultTimeout()); final ExecutionAttemptID execID = vertex.getCurrentExecutionAttempt().getAttemptId(); - final InstanceGateway gateway = new CancelSequenceInstanceGateway( + final ActorGateway gateway = new CancelSequenceActorGateway( TestingUtils.defaultExecutionContext(), new TaskOperationResult(execID, true)); @@ -482,7 +482,7 @@ public class ExecutionVertexCancelTest { // deploying after canceling from CREATED needs to raise an exception, because // the scheduler (or any caller) needs to know that the slot should be released try { - Instance instance = getInstance(DummyInstanceGateway.INSTANCE); + Instance instance = getInstance(DummyActorGateway.INSTANCE); SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); vertex.deployToSlot(slot); @@ -525,7 +525,7 @@ public class ExecutionVertexCancelTest { AkkaUtils.getDefaultTimeout()); setVertexState(vertex, ExecutionState.CANCELING); - Instance instance = getInstance(DummyInstanceGateway.INSTANCE); + Instance instance = getInstance(DummyActorGateway.INSTANCE); SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); vertex.deployToSlot(slot); @@ -541,7 +541,7 @@ public class ExecutionVertexCancelTest { ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout()); - Instance instance = getInstance(DummyInstanceGateway.INSTANCE); + Instance instance = getInstance(DummyActorGateway.INSTANCE); SimpleSlot slot = instance.allocateSimpleSlot(new JobID()); setVertexResource(vertex, slot); @@ -562,11 +562,11 @@ public class ExecutionVertexCancelTest { } } - public static class CancelSequenceInstanceGateway extends BaseTestingInstanceGateway { + public static class CancelSequenceActorGateway extends BaseTestingActorGateway { private final TaskOperationResult[] results; private int index = -1; - public CancelSequenceInstanceGateway(ExecutionContext executionContext, TaskOperationResult ... result) { + public CancelSequenceActorGateway(ExecutionContext executionContext, TaskOperationResult... result) { super(executionContext); this.results = result; } http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java index 431c3a9..81ec6c9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java @@ -43,7 +43,7 @@ public class ExecutionVertexDeploymentTest { // mock taskmanager to simply accept the call Instance instance = getInstance( - new SimpleInstanceGateway(TestingUtils.directExecutionContext())); + new SimpleActorGateway(TestingUtils.directExecutionContext())); final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId()); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], @@ -81,7 +81,7 @@ public class ExecutionVertexDeploymentTest { final ExecutionJobVertex ejv = getExecutionVertex(jid, TestingUtils.directExecutionContext()); final Instance instance = getInstance( - new SimpleInstanceGateway(TestingUtils.directExecutionContext())); + new SimpleActorGateway(TestingUtils.directExecutionContext())); final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId()); final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], @@ -124,7 +124,7 @@ public class ExecutionVertexDeploymentTest { AkkaUtils.getDefaultTimeout()); final Instance instance = getInstance( - new SimpleInstanceGateway(TestingUtils.defaultExecutionContext())); + new SimpleActorGateway(TestingUtils.defaultExecutionContext())); final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId()); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); @@ -171,7 +171,7 @@ public class ExecutionVertexDeploymentTest { AkkaUtils.getDefaultTimeout()); final Instance instance = getInstance( - new SimpleFailingInstanceGateway(TestingUtils.directExecutionContext())); + new SimpleFailingActorGateway(TestingUtils.directExecutionContext())); final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId()); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); @@ -201,7 +201,7 @@ public class ExecutionVertexDeploymentTest { AkkaUtils.getDefaultTimeout()); final Instance instance = getInstance( - new SimpleFailingInstanceGateway(TestingUtils.directExecutionContext())); + new SimpleFailingActorGateway(TestingUtils.directExecutionContext())); final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId()); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); @@ -244,7 +244,7 @@ public class ExecutionVertexDeploymentTest { final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout()); - final Instance instance = getInstance(new SimpleInstanceGateway(TestingUtils.directExecutionContext())); + final Instance instance = getInstance(new SimpleActorGateway(TestingUtils.directExecutionContext())); final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId()); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); @@ -286,7 +286,7 @@ public class ExecutionVertexDeploymentTest { final ExecutionAttemptID eid = vertex.getCurrentExecutionAttempt().getAttemptId(); final Instance instance = getInstance( - new ExecutionVertexCancelTest.CancelSequenceInstanceGateway( + new ExecutionVertexCancelTest.CancelSequenceActorGateway( context, new TaskOperationResult(eid, false), new TaskOperationResult(eid, true))); http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java index 8ea7017..5e9ee33 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java @@ -25,7 +25,7 @@ import static org.mockito.Mockito.*; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.instance.DummyInstanceGateway; +import org.apache.flink.runtime.instance.DummyActorGateway; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -47,7 +47,7 @@ public class ExecutionVertexSchedulingTest { AkkaUtils.getDefaultTimeout()); // a slot than cannot be deployed to - final Instance instance = getInstance(DummyInstanceGateway.INSTANCE); + final Instance instance = getInstance(DummyActorGateway.INSTANCE); final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId()); slot.releaseSlot(); @@ -77,7 +77,7 @@ public class ExecutionVertexSchedulingTest { AkkaUtils.getDefaultTimeout()); // a slot than cannot be deployed to - final Instance instance = getInstance(DummyInstanceGateway.INSTANCE); + final Instance instance = getInstance(DummyActorGateway.INSTANCE); final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId()); slot.releaseSlot(); @@ -113,7 +113,7 @@ public class ExecutionVertexSchedulingTest { final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout()); - final Instance instance = getInstance(new ExecutionGraphTestUtils.SimpleInstanceGateway(TestingUtils.defaultExecutionContext())); + final Instance instance = getInstance(new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.defaultExecutionContext())); final SimpleSlot slot = instance.allocateSimpleSlot(ejv.getJobId()); Scheduler scheduler = mock(Scheduler.class); http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java index b4a7e63..2530a53 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LocalInputSplitsTest.java @@ -377,7 +377,7 @@ public class LocalInputSplitsTest { when(connection.getFQDNHostname()).thenReturn(hostname); return new Instance( - new ExecutionGraphTestUtils.SimpleInstanceGateway( + new ExecutionGraphTestUtils.SimpleActorGateway( TestingUtils.defaultExecutionContext()), connection, new InstanceID(), http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java index b779d79..f42543f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.instance.DummyInstanceGateway; +import org.apache.flink.runtime.instance.DummyActorGateway; import org.apache.flink.runtime.instance.HardwareDescription; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.InstanceConnectionInfo; @@ -78,7 +78,7 @@ public class TerminalStateDeadlockTest { InstanceConnectionInfo ci = new InstanceConnectionInfo(address, 12345); HardwareDescription resources = new HardwareDescription(4, 4000000, 3000000, 2000000); - Instance instance = new Instance(DummyInstanceGateway.INSTANCE, ci, new InstanceID(), resources, 4); + Instance instance = new Instance(DummyActorGateway.INSTANCE, ci, new InstanceID(), resources, 4); this.resource = instance.allocateSimpleSlot(new JobID()); } http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java index 3305254..8604b63 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java @@ -26,7 +26,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.concurrent.TimeUnit; -import org.apache.flink.runtime.instance.DummyInstanceGateway; +import org.apache.flink.runtime.instance.DummyActorGateway; import org.apache.flink.runtime.instance.HardwareDescription; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.InstanceConnectionInfo; @@ -403,7 +403,7 @@ public class VertexLocationConstraintTest { ExecutionVertex ev = eg.getAllVertices().get(vertex.getID()).getTaskVertices()[0]; - Instance instance = ExecutionGraphTestUtils.getInstance(DummyInstanceGateway.INSTANCE); + Instance instance = ExecutionGraphTestUtils.getInstance(DummyActorGateway.INSTANCE); ev.setLocationConstraintHosts(Collections.singletonList(instance)); assertNotNull(ev.getPreferredLocations()); @@ -435,7 +435,7 @@ public class VertexLocationConstraintTest { when(connection.getFQDNHostname()).thenReturn(hostname); return new Instance( - new ExecutionGraphTestUtils.SimpleInstanceGateway( + new ExecutionGraphTestUtils.SimpleActorGateway( TestingUtils.defaultExecutionContext()), connection, new InstanceID(), http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingActorGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingActorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingActorGateway.java new file mode 100644 index 0000000..2e62781 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingActorGateway.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.instance; + +import akka.actor.ActorRef; +import akka.dispatch.Futures; +import scala.Option; +import scala.concurrent.ExecutionContext; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.util.UUID; +import java.util.concurrent.Callable; + +/** + * Abstract base class for testing {@link ActorGateway} instances. The implementing subclass + * only has to provide an implementation for handleMessage which contains the logic to treat + * different messages. + */ +abstract public class BaseTestingActorGateway implements ActorGateway { + /** + * {@link ExecutionContext} which is used to execute the futures. + */ + private final ExecutionContext executionContext; + + public BaseTestingActorGateway(ExecutionContext executionContext) { + this.executionContext = executionContext; + } + + @Override + public Future<Object> ask(Object message, FiniteDuration timeout) { + try { + final Object result = handleMessage(message); + + return Futures.future(new Callable<Object>() { + @Override + public Object call() throws Exception { + return result; + } + }, executionContext); + + } catch (final Exception e) { + // if an exception occurred in the handleMessage method then return it as part of the future + return Futures.future(new Callable<Object>() { + @Override + public Object call() throws Exception { + throw e; + } + }, executionContext); + } + } + + /** + * Handles the supported messages by this InstanceGateway + * + * @param message Message to handle + * @return Result + * @throws Exception + */ + abstract public Object handleMessage(Object message) throws Exception; + + @Override + public void tell(Object message) { + try { + handleMessage(message); + } catch (Exception e) { + // discard exception because it happens on the "remote" instance + } + } + + @Override + public void tell(Object message, ActorGateway sender) { + try{ + handleMessage(message); + } catch (Exception e) { + // discard exception because it happens on the "remote" instance + } + } + + @Override + public void forward(Object message, ActorGateway sender) { + throw new UnsupportedOperationException(); + } + + @Override + public Future<Object> retry(Object message, int numberRetries, FiniteDuration timeout, ExecutionContext executionContext) { + return ask(message, timeout); + } + + @Override + public String path() { + return "BaseTestingInstanceGateway"; + } + + @Override + public ActorRef actor() { + return ActorRef.noSender(); + } + + @Override + public Option<UUID> leaderSessionID() { + return Option.empty(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingInstanceGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingInstanceGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingInstanceGateway.java deleted file mode 100644 index e9f8259..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/BaseTestingInstanceGateway.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.instance; - -import akka.actor.ActorPath; -import akka.actor.ActorRef; -import akka.dispatch.Futures; -import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - -import java.util.concurrent.Callable; - -/** - * Abstract base class for testing {@link InstanceGateway} instances. The implementing subclass - * only has to provide an implementation for handleMessage which contains the logic to treat - * different messages. - */ -abstract public class BaseTestingInstanceGateway implements InstanceGateway { - /** - * {@link ExecutionContext} which is used to execute the futures. - */ - private final ExecutionContext executionContext; - - public BaseTestingInstanceGateway(ExecutionContext executionContext) { - this.executionContext = executionContext; - } - - @Override - public Future<Object> ask(Object message, FiniteDuration timeout) { - try { - final Object result = handleMessage(message); - - return Futures.future(new Callable<Object>() { - @Override - public Object call() throws Exception { - return result; - } - }, executionContext); - - } catch (final Exception e) { - // if an exception occurred in the handleMessage method then return it as part of the future - return Futures.future(new Callable<Object>() { - @Override - public Object call() throws Exception { - throw e; - } - }, executionContext); - } - } - - /** - * Handles the supported messages by this InstanceGateway - * - * @param message Message to handle - * @return Result - * @throws Exception - */ - abstract public Object handleMessage(Object message) throws Exception; - - @Override - public void tell(Object message) {} - - @Override - public void forward(Object message, ActorRef sender) { - throw new UnsupportedOperationException(); - } - - @Override - public Future<Object> retry(Object message, int numberRetries, FiniteDuration timeout, ExecutionContext executionContext) { - return ask(message, timeout); - } - - @Override - public String path() { - throw new UnsupportedOperationException(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyActorGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyActorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyActorGateway.java new file mode 100644 index 0000000..10762f2 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyActorGateway.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.instance; + +import akka.actor.ActorRef; +import scala.Option; +import scala.concurrent.ExecutionContext; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.util.UUID; + +/** + * Dummy {@link ActorGateway} implementation used for testing. + */ +public class DummyActorGateway implements ActorGateway { + public static final DummyActorGateway INSTANCE = new DummyActorGateway(); + + @Override + public Future<Object> ask(Object message, FiniteDuration timeout) { + return null; + } + + @Override + public void tell(Object message) {} + + @Override + public void tell(Object message, ActorGateway sender) {} + + @Override + public void forward(Object message, ActorGateway sender) {} + + @Override + public Future<Object> retry(Object message, int numberRetries, FiniteDuration timeout, ExecutionContext executionContext) { + return null; + } + + @Override + public String path() { + return "DummyInstanceGateway"; + } + + @Override + public ActorRef actor() { + return ActorRef.noSender(); + } + + @Override + public Option<UUID> leaderSessionID() { + return Option.<UUID>empty(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyInstanceGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyInstanceGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyInstanceGateway.java deleted file mode 100644 index 5941201..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/DummyInstanceGateway.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.instance; - -import akka.actor.ActorPath; -import akka.actor.ActorRef; -import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - -/** - * Dummy {@link InstanceGateway} implementation used for testing. - */ -public class DummyInstanceGateway implements InstanceGateway { - public static final DummyInstanceGateway INSTANCE = new DummyInstanceGateway(); - - @Override - public Future<Object> ask(Object message, FiniteDuration timeout) { - throw new UnsupportedOperationException(); - } - - @Override - public void tell(Object message) { - throw new UnsupportedOperationException(); - } - - @Override - public void forward(Object message, ActorRef sender) { - throw new UnsupportedOperationException(); - } - - @Override - public Future<Object> retry(Object message, int numberRetries, FiniteDuration timeout, ExecutionContext executionContext) { - throw new UnsupportedOperationException(); - } - - @Override - public String path() { - return "DummyInstanceGateway"; - } -}