Repository: flink
Updated Branches:
  refs/heads/master fa78be6df -> 222245428


http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
index 3fb5e30..e99f8d2 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
@@ -25,7 +25,7 @@ import java.util.Collections
 import akka.actor.ActorRef
 import org.apache.flink.api.common.JobID
 import org.apache.flink.configuration.ConfigConstants
-import org.apache.flink.runtime.ActorLogMessages
+import org.apache.flink.runtime.FlinkActor
 import org.apache.flink.runtime.jobgraph.JobStatus
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.messages.JobManagerMessages.{CurrentJobStatus, JobNotFound, RequestJobStatus}
@@ -52,7 +52,7 @@ import scala.language.postfixOps
 import scala.util.Try
 
 
-trait ApplicationMasterActor extends ActorLogMessages {
+trait ApplicationMasterActor extends FlinkActor {
   that: JobManager =>
 
   import context._
@@ -92,18 +92,17 @@ trait ApplicationMasterActor extends ActorLogMessages {
   var allocatedContainersList: mutable.MutableList[Container] = new mutable.MutableList[Container]
   var runningContainersList: mutable.MutableList[Container] = new mutable.MutableList[Container]
 
-
-  abstract override def receiveWithLogMessages: Receive = {
-    receiveYarnMessages orElse super.receiveWithLogMessages
+  abstract override def handleMessage: Receive = {
+    handleYarnMessage orElse super.handleMessage
   }
 
-  def receiveYarnMessages: Receive = {
+  def handleYarnMessage: Receive = {
     case StopYarnSession(status, diag) =>
       log.info(s"Stopping YARN JobManager with status $status and diagnostic 
$diag.")
 
       instanceManager.getAllRegisteredInstances.asScala foreach {
         instance =>
-          instance.getInstanceGateway.tell(StopYarnSession(status, diag))
+          instance.getActorGateway.tell(StopYarnSession(status, diag))
       }
 
       rmClientOption foreach {
@@ -128,7 +127,7 @@ trait ApplicationMasterActor extends ActorLogMessages {
 
       nmClientOption = None
       messageListener foreach {
-          _ ! JobManagerStopped
+          _ ! decorateMessage(JobManagerStopped)
       }
 
       context.system.shutdown()
@@ -136,7 +135,7 @@ trait ApplicationMasterActor extends ActorLogMessages {
     case RegisterClient(client) =>
       log.info(s"Register ${client.path} as client.")
       messageListener = Some(client)
-      sender ! Acknowledge
+      sender ! decorateMessage(Acknowledge)
 
     case UnregisterClient =>
       messageListener = None
@@ -145,12 +144,15 @@ trait ApplicationMasterActor extends ActorLogMessages {
       val jobId = msg.jobId
       log.info(s"ApplicatonMaster will shut down YARN session when job $jobId 
has finished.")
       stopWhenJobFinished = jobId
-      sender() ! Acknowledge
+      sender() ! decorateMessage(Acknowledge)
 
 
     case PollYarnClusterStatus =>
-      sender() ! new FlinkYarnClusterStatus(instanceManager.getNumberOfRegisteredTaskManagers,
-        instanceManager.getTotalNumberOfSlots)
+      sender() ! decorateMessage(
+        new FlinkYarnClusterStatus(
+          instanceManager.getNumberOfRegisteredTaskManagers,
+          instanceManager.getTotalNumberOfSlots)
+      )
 
     case StartYarnSession(conf, actorSystemPort, webServerPort) =>
       startYarnSession(conf, actorSystemPort, webServerPort)
@@ -173,11 +175,17 @@ trait ApplicationMasterActor extends ActorLogMessages {
             log.info(s"Job with ID ${jobStatus.jobID} is in terminal state 
${jobStatus.status}. " +
               s"Shutting down YARN session")
             if (jobStatus.status == JobStatus.FINISHED) {
-              self ! StopYarnSession(FinalApplicationStatus.SUCCEEDED,
-                s"The monitored job with ID ${jobStatus.jobID} has finished.")
+              self ! decorateMessage(
+                StopYarnSession(
+                  FinalApplicationStatus.SUCCEEDED,
+                  s"The monitored job with ID ${jobStatus.jobID} has finished.")
+              )
             } else {
-              self ! StopYarnSession(FinalApplicationStatus.FAILED,
-                s"The monitored job with ID ${jobStatus.jobID} has failed to complete.")
+              self ! decorateMessage(
+                StopYarnSession(
+                  FinalApplicationStatus.FAILED,
+                  s"The monitored job with ID ${jobStatus.jobID} has failed to complete.")
+              )
             }
           } else {
             log.debug(s"Monitored job with ID ${jobStatus.jobID} is in state 
${jobStatus.status}")
@@ -188,7 +196,7 @@ trait ApplicationMasterActor extends ActorLogMessages {
     case HeartbeatWithYarn =>
       // piggyback on the YARN heartbeat to check if the job has finished
       if(stopWhenJobFinished != null) {
-        self ! RequestJobStatus(stopWhenJobFinished)
+        self ! decorateMessage(RequestJobStatus(stopWhenJobFinished))
       }
       rmClientOption match {
         case Some(rmClient) =>
@@ -226,8 +234,10 @@ trait ApplicationMasterActor extends ActorLogMessages {
                   case _ => ""
                 }
                 messageListener foreach {
-                  _ ! YarnMessage(s"Diagnostics for containerID=${status.getContainerId} in " +
+                  _ ! decorateMessage(
+                    YarnMessage(s"Diagnostics for containerID=${status.getContainerId} in " +
                     s"state=${status.getState}.\n${status.getDiagnostics} 
$detail")
+                  )
                 }
               }
               // return
@@ -281,7 +291,7 @@ trait ApplicationMasterActor extends ActorLogMessages {
                             containersLaunched += 1
                             runningContainersList += container
                             messageListener foreach {
-                              _ ! YarnMessage(message)
+                              _ ! decorateMessage(YarnMessage(message))
                             }
                           } catch {
                             case e: YarnException =>
@@ -290,13 +300,19 @@ trait ApplicationMasterActor extends ActorLogMessages {
                         }
                         case None =>
                           log.error("The ContainerLaunchContext was not set.")
-                          self ! StopYarnSession(FinalApplicationStatus.FAILED,
-                            "Fatal error in AM: The ContainerLaunchContext was not set.")
+                          self ! decorateMessage(
+                            StopYarnSession(
+                              FinalApplicationStatus.FAILED,
+                              "Fatal error in AM: The ContainerLaunchContext was not set.")
+                          )
                       }
                     case None =>
                       log.error("The NMClient was not set.")
-                      self ! StopYarnSession(FinalApplicationStatus.FAILED,
-                        "Fatal error in AM: The NMClient was not set.")
+                      self ! decorateMessage(
+                        StopYarnSession(
+                          FinalApplicationStatus.FAILED,
+                          "Fatal error in AM: The NMClient was not set.")
+                      )
                   }
                   // dropping condition
                   true
@@ -343,21 +359,30 @@ trait ApplicationMasterActor extends ActorLogMessages {
               s"the '${ConfigConstants.YARN_MAX_FAILED_CONTAINERS}' 
configuration " +
               s"setting. By default its the number of requested containers"
             log.error(msg)
-            self ! StopYarnSession(FinalApplicationStatus.FAILED, msg)
+            self ! decorateMessage(StopYarnSession(FinalApplicationStatus.FAILED, msg))
           }
 
           // schedule next heartbeat:
           if (runningContainers < numTaskManager) {
             // we don't have the requested number of containers. Do fast polling
-            context.system.scheduler.scheduleOnce(FAST_YARN_HEARTBEAT_DELAY, self,HeartbeatWithYarn)
+            context.system.scheduler.scheduleOnce(
+              FAST_YARN_HEARTBEAT_DELAY,
+              self,
+              decorateMessage(HeartbeatWithYarn))
           } else {
             // everything is good, slow down polling
-            context.system.scheduler.scheduleOnce(YARN_HEARTBEAT_DELAY, self, HeartbeatWithYarn)
+            context.system.scheduler.scheduleOnce(
+              YARN_HEARTBEAT_DELAY,
+              self,
+              decorateMessage(HeartbeatWithYarn))
           }
         case None =>
           log.error("The AMRMClient was not set.")
-          self ! StopYarnSession(FinalApplicationStatus.FAILED, "Fatal error in AM: AMRMClient " +
-            "was not set")
+          self ! decorateMessage(
+            StopYarnSession(
+              FinalApplicationStatus.FAILED,
+              "Fatal error in AM: AMRMClient was not set")
+          )
       }
       log.debug(s"Processed Heartbeat with RMClient. Running containers 
$runningContainers, " +
         s"failed containers $failedContainers, " +
@@ -371,9 +396,12 @@ trait ApplicationMasterActor extends ActorLogMessages {
     allocatedContainersList map { runningCont => runningCont.getId}
   }
 
-  private def startYarnSession(conf: Configuration,
-                               actorSystemPort: Int,
-                               webServerPort: Int): Unit = {
+  private def startYarnSession(
+      conf: Configuration,
+      actorSystemPort: Int,
+      webServerPort: Int)
+    : Unit = {
+
     Try {
       log.info("Start yarn session.")
       memoryPerTaskManager = env.get(FlinkYarnClient.ENV_TM_MEMORY).toInt
@@ -383,8 +411,10 @@ trait ApplicationMasterActor extends ActorLogMessages {
       require(applicationMasterHost != null, s"Application master (${Environment.NM_HOST} not set.")
 
       val yarnExpiryInterval: FiniteDuration = FiniteDuration(
-        conf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
-          YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS), MILLISECONDS)
+        conf.getInt(
+          YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
+          YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS),
+        MILLISECONDS)
 
       if(YARN_HEARTBEAT_DELAY.gteq(yarnExpiryInterval)) {
         log.warn(s"The heartbeat interval of the Flink Application master " +
@@ -468,16 +498,30 @@ trait ApplicationMasterActor extends ActorLogMessages {
       failedContainers = 0
 
       val hs = ApplicationMaster.hasStreamingMode(env)
-      containerLaunchContext = Some(createContainerLaunchContext(heapLimit, hasLogback, hasLog4j,
-        yarnClientUsername, conf, taskManagerLocalResources, hs))
-
-
-      context.system.scheduler.scheduleOnce(FAST_YARN_HEARTBEAT_DELAY, self, HeartbeatWithYarn)
+      containerLaunchContext = Some(
+        createContainerLaunchContext(
+          heapLimit,
+          hasLogback,
+          hasLog4j,
+          yarnClientUsername,
+          conf,
+          taskManagerLocalResources,
+          hs)
+      )
+
+
+      context.system.scheduler.scheduleOnce(
+        FAST_YARN_HEARTBEAT_DELAY,
+        self,
+        decorateMessage(HeartbeatWithYarn))
     } recover {
       case t: Throwable =>
         log.error("Could not start yarn session.", t)
-        self ! StopYarnSession(FinalApplicationStatus.FAILED,
-          s"ApplicationMaster failed while starting. Exception Message: ${t.getMessage}")
+        self ! decorateMessage(
+          StopYarnSession(
+            FinalApplicationStatus.FAILED,
+            s"ApplicationMaster failed while starting. Exception Message: ${t.getMessage}")
+        )
     }
   }
 
@@ -505,11 +549,15 @@ trait ApplicationMasterActor extends ActorLogMessages {
     new ContainerRequest(capability, null, null, priority)
   }
 
-  private def createContainerLaunchContext(heapLimit: Int, hasLogback: Boolean, hasLog4j: Boolean,
-                                   yarnClientUsername: String, yarnConf: Configuration,
-                                   taskManagerLocalResources: Map[String, LocalResource],
-                                   streamingMode: Boolean):
-  ContainerLaunchContext = {
+  private def createContainerLaunchContext(
+      heapLimit: Int,
+      hasLogback: Boolean,
+      hasLog4j: Boolean,
+      yarnClientUsername: String,
+      yarnConf: Configuration,
+      taskManagerLocalResources: Map[String, LocalResource],
+      streamingMode: Boolean)
+    : ContainerLaunchContext = {
     log.info("Create container launch context.")
     val ctx = Records.newRecord(classOf[ContainerLaunchContext])
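
The hunks above replace the old ActorLogMessages / receiveWithLogMessages pair with
FlinkActor's handleMessage hook, stacked via "abstract override", and route outgoing
messages through decorateMessage. A minimal, self-contained sketch of that stackable
Receive pattern follows; the FlinkActor and JobManager internals are not part of this
diff, so the base trait and all names below are illustrative assumptions only.

    // Illustrative sketch: a hypothetical base trait exposing a handleMessage hook
    // and a decorateMessage wrapper, plus a mixin stacked via abstract override.
    import akka.actor.{Actor, ActorSystem, Props}

    trait BaseFlinkStyleActor extends Actor {
      // Mixins and subclasses contribute partial functions to this hook.
      def handleMessage: Receive

      // Hook for wrapping outgoing messages; identity here, Flink's real version may differ.
      def decorateMessage(message: Any): Any = message

      override def receive: Receive = {
        case msg =>
          if (handleMessage.isDefinedAt(msg)) handleMessage(msg) else unhandled(msg)
      }
    }

    class CoreActor extends BaseFlinkStyleActor {
      override def handleMessage: Receive = {
        case "core" => sender() ! decorateMessage("handled by core")
      }
    }

    // Stackable mixin: its cases are tried first, then the superclass cases.
    trait YarnStyleMessages extends BaseFlinkStyleActor {
      abstract override def handleMessage: Receive =
        handleYarnMessage orElse super.handleMessage

      def handleYarnMessage: Receive = {
        case "yarn" => sender() ! decorateMessage("handled by mixin")
      }
    }

    object StackedReceiveSketch extends App {
      val system = ActorSystem("sketch")
      // Trait linearization makes YarnStyleMessages.handleMessage wrap CoreActor.handleMessage.
      system.actorOf(Props(new CoreActor with YarnStyleMessages), "stacked")
      system.terminate()
    }

Each partial function is tried in mixin order, so YARN-specific messages are handled
before falling back to the JobManager's own handlers, which is what the abstract
override in ApplicationMasterActor achieves.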
 

http://git-wip-us.apache.org/repos/asf/flink/blob/22224542/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
index de82716..5216030 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
@@ -25,26 +25,31 @@ import org.apache.flink.runtime.memorymanager.DefaultMemoryManager
 import org.apache.flink.runtime.taskmanager.{NetworkEnvironmentConfiguration, TaskManagerConfiguration, TaskManager}
 import org.apache.flink.yarn.Messages.StopYarnSession
 
-/**
- * An extension of the TaskManager that listens for additional YARN related
- * messages.
- */
-class YarnTaskManager(config: TaskManagerConfiguration,
-                      connectionInfo: InstanceConnectionInfo,
-                      jobManagerAkkaURL: String,
-                      memoryManager: DefaultMemoryManager,
-                      ioManager: IOManager,
-                      network: NetworkEnvironment,
-                      numberOfSlots: Int)
-  extends TaskManager(config, connectionInfo, jobManagerAkkaURL,
-                      memoryManager, ioManager, network, numberOfSlots) {
-
+/** An extension of the TaskManager that listens for additional YARN related
+  * messages.
+  */
+class YarnTaskManager(
+    config: TaskManagerConfiguration,
+    connectionInfo: InstanceConnectionInfo,
+    jobManagerAkkaURL: String,
+    memoryManager: DefaultMemoryManager,
+    ioManager: IOManager,
+    network: NetworkEnvironment,
+    numberOfSlots: Int)
+  extends TaskManager(
+    config,
+    connectionInfo,
+    jobManagerAkkaURL,
+    memoryManager,
+    ioManager,
+    network,
+    numberOfSlots) {
 
-  override def receiveWithLogMessages: Receive = {
-    receiveYarnMessages orElse super.receiveWithLogMessages
+  override def handleMessage: Receive = {
+    handleYarnMessages orElse super.handleMessage
   }
 
-  def receiveYarnMessages: Receive = {
+  def handleYarnMessages: Receive = {
     case StopYarnSession(status, diagnostics) =>
       log.info(s"Stopping YARN TaskManager with final application status 
$status " +
         s"and diagnostics: $diagnostics")
