Repository: flink Updated Branches: refs/heads/master fa78be6df -> 222245428
http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala index 3fb5e30..e99f8d2 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala @@ -25,7 +25,7 @@ import java.util.Collections import akka.actor.ActorRef import org.apache.flink.api.common.JobID import org.apache.flink.configuration.ConfigConstants -import org.apache.flink.runtime.ActorLogMessages +import org.apache.flink.runtime.FlinkActor import org.apache.flink.runtime.jobgraph.JobStatus import org.apache.flink.runtime.jobmanager.JobManager import org.apache.flink.runtime.messages.JobManagerMessages.{CurrentJobStatus, JobNotFound, RequestJobStatus} @@ -52,7 +52,7 @@ import scala.language.postfixOps import scala.util.Try -trait ApplicationMasterActor extends ActorLogMessages { +trait ApplicationMasterActor extends FlinkActor { that: JobManager => import context._ @@ -92,18 +92,17 @@ trait ApplicationMasterActor extends ActorLogMessages { var allocatedContainersList: mutable.MutableList[Container] = new mutable.MutableList[Container] var runningContainersList: mutable.MutableList[Container] = new mutable.MutableList[Container] - - abstract override def receiveWithLogMessages: Receive = { - receiveYarnMessages orElse super.receiveWithLogMessages + abstract override def handleMessage: Receive = { + handleYarnMessage orElse super.handleMessage } - def receiveYarnMessages: Receive = { + def handleYarnMessage: Receive = { case StopYarnSession(status, diag) => log.info(s"Stopping YARN JobManager with status $status and diagnostic $diag.") instanceManager.getAllRegisteredInstances.asScala foreach { instance => - instance.getInstanceGateway.tell(StopYarnSession(status, diag)) + instance.getActorGateway.tell(StopYarnSession(status, diag)) } rmClientOption foreach { @@ -128,7 +127,7 @@ trait ApplicationMasterActor extends ActorLogMessages { nmClientOption = None messageListener foreach { - _ ! JobManagerStopped + _ ! decorateMessage(JobManagerStopped) } context.system.shutdown() @@ -136,7 +135,7 @@ trait ApplicationMasterActor extends ActorLogMessages { case RegisterClient(client) => log.info(s"Register ${client.path} as client.") messageListener = Some(client) - sender ! Acknowledge + sender ! decorateMessage(Acknowledge) case UnregisterClient => messageListener = None @@ -145,12 +144,15 @@ trait ApplicationMasterActor extends ActorLogMessages { val jobId = msg.jobId log.info(s"ApplicatonMaster will shut down YARN session when job $jobId has finished.") stopWhenJobFinished = jobId - sender() ! Acknowledge + sender() ! decorateMessage(Acknowledge) case PollYarnClusterStatus => - sender() ! new FlinkYarnClusterStatus(instanceManager.getNumberOfRegisteredTaskManagers, - instanceManager.getTotalNumberOfSlots) + sender() ! decorateMessage( + new FlinkYarnClusterStatus( + instanceManager.getNumberOfRegisteredTaskManagers, + instanceManager.getTotalNumberOfSlots) + ) case StartYarnSession(conf, actorSystemPort, webServerPort) => startYarnSession(conf, actorSystemPort, webServerPort) @@ -173,11 +175,17 @@ trait ApplicationMasterActor extends ActorLogMessages { log.info(s"Job with ID ${jobStatus.jobID} is in terminal state ${jobStatus.status}. " + s"Shutting down YARN session") if (jobStatus.status == JobStatus.FINISHED) { - self ! StopYarnSession(FinalApplicationStatus.SUCCEEDED, - s"The monitored job with ID ${jobStatus.jobID} has finished.") + self ! decorateMessage( + StopYarnSession( + FinalApplicationStatus.SUCCEEDED, + s"The monitored job with ID ${jobStatus.jobID} has finished.") + ) } else { - self ! StopYarnSession(FinalApplicationStatus.FAILED, - s"The monitored job with ID ${jobStatus.jobID} has failed to complete.") + self ! decorateMessage( + StopYarnSession( + FinalApplicationStatus.FAILED, + s"The monitored job with ID ${jobStatus.jobID} has failed to complete.") + ) } } else { log.debug(s"Monitored job with ID ${jobStatus.jobID} is in state ${jobStatus.status}") @@ -188,7 +196,7 @@ trait ApplicationMasterActor extends ActorLogMessages { case HeartbeatWithYarn => // piggyback on the YARN heartbeat to check if the job has finished if(stopWhenJobFinished != null) { - self ! RequestJobStatus(stopWhenJobFinished) + self ! decorateMessage(RequestJobStatus(stopWhenJobFinished)) } rmClientOption match { case Some(rmClient) => @@ -226,8 +234,10 @@ trait ApplicationMasterActor extends ActorLogMessages { case _ => "" } messageListener foreach { - _ ! YarnMessage(s"Diagnostics for containerID=${status.getContainerId} in " + + _ ! decorateMessage( + YarnMessage(s"Diagnostics for containerID=${status.getContainerId} in " + s"state=${status.getState}.\n${status.getDiagnostics} $detail") + ) } } // return @@ -281,7 +291,7 @@ trait ApplicationMasterActor extends ActorLogMessages { containersLaunched += 1 runningContainersList += container messageListener foreach { - _ ! YarnMessage(message) + _ ! decorateMessage(YarnMessage(message)) } } catch { case e: YarnException => @@ -290,13 +300,19 @@ trait ApplicationMasterActor extends ActorLogMessages { } case None => log.error("The ContainerLaunchContext was not set.") - self ! StopYarnSession(FinalApplicationStatus.FAILED, - "Fatal error in AM: The ContainerLaunchContext was not set.") + self ! decorateMessage( + StopYarnSession( + FinalApplicationStatus.FAILED, + "Fatal error in AM: The ContainerLaunchContext was not set.") + ) } case None => log.error("The NMClient was not set.") - self ! StopYarnSession(FinalApplicationStatus.FAILED, - "Fatal error in AM: The NMClient was not set.") + self ! decorateMessage( + StopYarnSession( + FinalApplicationStatus.FAILED, + "Fatal error in AM: The NMClient was not set.") + ) } // dropping condition true @@ -343,21 +359,30 @@ trait ApplicationMasterActor extends ActorLogMessages { s"the '${ConfigConstants.YARN_MAX_FAILED_CONTAINERS}' configuration " + s"setting. By default its the number of requested containers" log.error(msg) - self ! StopYarnSession(FinalApplicationStatus.FAILED, msg) + self ! decorateMessage(StopYarnSession(FinalApplicationStatus.FAILED, msg)) } // schedule next heartbeat: if (runningContainers < numTaskManager) { // we don't have the requested number of containers. Do fast polling - context.system.scheduler.scheduleOnce(FAST_YARN_HEARTBEAT_DELAY, self,HeartbeatWithYarn) + context.system.scheduler.scheduleOnce( + FAST_YARN_HEARTBEAT_DELAY, + self, + decorateMessage(HeartbeatWithYarn)) } else { // everything is good, slow down polling - context.system.scheduler.scheduleOnce(YARN_HEARTBEAT_DELAY, self, HeartbeatWithYarn) + context.system.scheduler.scheduleOnce( + YARN_HEARTBEAT_DELAY, + self, + decorateMessage(HeartbeatWithYarn)) } case None => log.error("The AMRMClient was not set.") - self ! StopYarnSession(FinalApplicationStatus.FAILED, "Fatal error in AM: AMRMClient " + - "was not set") + self ! decorateMessage( + StopYarnSession( + FinalApplicationStatus.FAILED, + "Fatal error in AM: AMRMClient was not set") + ) } log.debug(s"Processed Heartbeat with RMClient. Running containers $runningContainers, " + s"failed containers $failedContainers, " + @@ -371,9 +396,12 @@ trait ApplicationMasterActor extends ActorLogMessages { allocatedContainersList map { runningCont => runningCont.getId} } - private def startYarnSession(conf: Configuration, - actorSystemPort: Int, - webServerPort: Int): Unit = { + private def startYarnSession( + conf: Configuration, + actorSystemPort: Int, + webServerPort: Int) + : Unit = { + Try { log.info("Start yarn session.") memoryPerTaskManager = env.get(FlinkYarnClient.ENV_TM_MEMORY).toInt @@ -383,8 +411,10 @@ trait ApplicationMasterActor extends ActorLogMessages { require(applicationMasterHost != null, s"Application master (${Environment.NM_HOST} not set.") val yarnExpiryInterval: FiniteDuration = FiniteDuration( - conf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, - YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS), MILLISECONDS) + conf.getInt( + YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS), + MILLISECONDS) if(YARN_HEARTBEAT_DELAY.gteq(yarnExpiryInterval)) { log.warn(s"The heartbeat interval of the Flink Application master " + @@ -468,16 +498,30 @@ trait ApplicationMasterActor extends ActorLogMessages { failedContainers = 0 val hs = ApplicationMaster.hasStreamingMode(env) - containerLaunchContext = Some(createContainerLaunchContext(heapLimit, hasLogback, hasLog4j, - yarnClientUsername, conf, taskManagerLocalResources, hs)) - - - context.system.scheduler.scheduleOnce(FAST_YARN_HEARTBEAT_DELAY, self, HeartbeatWithYarn) + containerLaunchContext = Some( + createContainerLaunchContext( + heapLimit, + hasLogback, + hasLog4j, + yarnClientUsername, + conf, + taskManagerLocalResources, + hs) + ) + + + context.system.scheduler.scheduleOnce( + FAST_YARN_HEARTBEAT_DELAY, + self, + decorateMessage(HeartbeatWithYarn)) } recover { case t: Throwable => log.error("Could not start yarn session.", t) - self ! StopYarnSession(FinalApplicationStatus.FAILED, - s"ApplicationMaster failed while starting. Exception Message: ${t.getMessage}") + self ! decorateMessage( + StopYarnSession( + FinalApplicationStatus.FAILED, + s"ApplicationMaster failed while starting. Exception Message: ${t.getMessage}") + ) } } @@ -505,11 +549,15 @@ trait ApplicationMasterActor extends ActorLogMessages { new ContainerRequest(capability, null, null, priority) } - private def createContainerLaunchContext(heapLimit: Int, hasLogback: Boolean, hasLog4j: Boolean, - yarnClientUsername: String, yarnConf: Configuration, - taskManagerLocalResources: Map[String, LocalResource], - streamingMode: Boolean): - ContainerLaunchContext = { + private def createContainerLaunchContext( + heapLimit: Int, + hasLogback: Boolean, + hasLog4j: Boolean, + yarnClientUsername: String, + yarnConf: Configuration, + taskManagerLocalResources: Map[String, LocalResource], + streamingMode: Boolean) + : ContainerLaunchContext = { log.info("Create container launch context.") val ctx = Records.newRecord(classOf[ContainerLaunchContext]) http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala index de82716..5216030 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala @@ -25,26 +25,31 @@ import org.apache.flink.runtime.memorymanager.DefaultMemoryManager import org.apache.flink.runtime.taskmanager.{NetworkEnvironmentConfiguration, TaskManagerConfiguration, TaskManager} import org.apache.flink.yarn.Messages.StopYarnSession -/** - * An extension of the TaskManager that listens for additional YARN related - * messages. - */ -class YarnTaskManager(config: TaskManagerConfiguration, - connectionInfo: InstanceConnectionInfo, - jobManagerAkkaURL: String, - memoryManager: DefaultMemoryManager, - ioManager: IOManager, - network: NetworkEnvironment, - numberOfSlots: Int) - extends TaskManager(config, connectionInfo, jobManagerAkkaURL, - memoryManager, ioManager, network, numberOfSlots) { - +/** An extension of the TaskManager that listens for additional YARN related + * messages. + */ +class YarnTaskManager( + config: TaskManagerConfiguration, + connectionInfo: InstanceConnectionInfo, + jobManagerAkkaURL: String, + memoryManager: DefaultMemoryManager, + ioManager: IOManager, + network: NetworkEnvironment, + numberOfSlots: Int) + extends TaskManager( + config, + connectionInfo, + jobManagerAkkaURL, + memoryManager, + ioManager, + network, + numberOfSlots) { - override def receiveWithLogMessages: Receive = { - receiveYarnMessages orElse super.receiveWithLogMessages + override def handleMessage: Receive = { + handleYarnMessages orElse super.handleMessage } - def receiveYarnMessages: Receive = { + def handleYarnMessages: Receive = { case StopYarnSession(status, diagnostics) => log.info(s"Stopping YARN TaskManager with final application status $status " + s"and diagnostics: $diagnostics")