[ https://issues.apache.org/jira/browse/FLINK-3134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15057656#comment-15057656 ]

ASF GitHub Bot commented on FLINK-3134:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1450#discussion_r47611363
  
    --- Diff: flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala ---
    @@ -754,6 +538,243 @@ class YarnJobManager(
           memoryLimit
         }
       }
    +
    +  /**
    +    * Heartbeats with the resource manager and handles container updates.
    +    */
    +  object AMRMClientAsyncHandler extends AMRMClientAsync.CallbackHandler {
    +
    +    private var client : AMRMClientAsync[ContainerRequest] = null
    +
    +    override def onError(e: Throwable): Unit = {
    +      self ! decorateMessage(
    +        StopYarnSession(
    +          FinalApplicationStatus.FAILED,
    +          "Error in communication with Yarn resource manager: " + e.getMessage)
    +      )
    +    }
    +
    +    override def getProgress: Float = {
    +      runningContainers.toFloat / numTaskManagers
    +    }
    +
    +    override def onShutdownRequest(): Unit = {
    +    }
    +
    +    override def onNodesUpdated(updatedNodes: JavaList[NodeReport]): Unit = {
    +    }
    +
    +    override def onContainersCompleted(statuses: JavaList[ContainerStatus]): Unit = {
    +
    +      // TODO change this
    +      log.debug(s"Processed Heartbeat with RMClient. Running containers $runningContainers, " +
    +        s"failed containers $failedContainers, " +
    +        s"allocated containers ${allocatedContainersList.size}.")
    +
    +      val completedContainerStatuses = statuses.asScala
    +      val idStatusMap = completedContainerStatuses
    +        .map(status => (status.getContainerId, status)).toMap
    +
    +      completedContainerStatuses.foreach {
    +        status => log.info(s"Container ${status.getContainerId} is completed " +
    +          s"with diagnostics: ${status.getDiagnostics}")
    +      }
    +
    +      // get failed containers (returned containers are also completed, so we have to
    +      // distinguish if it was running before).
    +      val (completedContainers, remainingRunningContainers) = runningContainersList
    +        .partition(idStatusMap contains _.getId)
    +
    +      completedContainers.foreach {
    +        container =>
    +          val status = idStatusMap(container.getId)
    +          failedContainers += 1
    +          runningContainers -= 1
    +          log.info(s"Container ${status.getContainerId} was a running container. " +
    +            s"Total failed containers $failedContainers.")
    +          val detail = status.getExitStatus match {
    +            case -103 => "Vmem limit exceeded";
    +            case -104 => "Pmem limit exceeded";
    +            case _ => ""
    +          }
    +          messageListener foreach {
    +            _ ! decorateMessage(
    +              YarnMessage(s"Diagnostics for containerID=${status.getContainerId} in " +
    +                s"state=${status.getState}.\n${status.getDiagnostics} $detail")
    +            )
    +          }
    +      }
    +
    +      runningContainersList = remainingRunningContainers
    +
    +      // maxFailedContainers == -1 means an unlimited number of failed containers is tolerated.
    +      if (maxFailedContainers != -1 && failedContainers >= maxFailedContainers) {
    +        val msg = s"Stopping YARN session because the number of failed " +
    +          s"containers ($failedContainers) exceeded the maximum failed container " +
    +          s"count ($maxFailedContainers). This number is controlled by " +
    +          s"the '${ConfigConstants.YARN_MAX_FAILED_CONTAINERS}' configuration " +
    +          s"setting. By default it's the number of requested containers."
    +        log.error(msg)
    +        self ! decorateMessage(StopYarnSession(FinalApplicationStatus.FAILED, msg))
    +
    +      }
    +
    +      allocateContainers()
    +
    +    }
    +
    +    override def onContainersAllocated(containers: JavaList[Container]): Unit = {
    +      println("onContainersAllocated")
    --- End diff --
    
    Looks like a leftover debug message
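
A possible cleanup, offered here only as an illustration and not as something taken from the pull request: route the event through the handler's existing logger instead of stdout. The names log and containers below are the identifiers already in scope in the surrounding onContainersAllocated callback.

    // Hypothetical replacement for the stray println in onContainersAllocated.
    log.debug(s"RM callback: ${containers.size()} container(s) allocated.")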


> Make YarnJobManager's allocate call asynchronous
> ------------------------------------------------
>
>                 Key: FLINK-3134
>                 URL: https://issues.apache.org/jira/browse/FLINK-3134
>             Project: Flink
>          Issue Type: Bug
>          Components: YARN Client
>    Affects Versions: 0.10.0, 1.0.0, 0.10.1
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>             Fix For: 1.0.0
>
>
> The {{allocate()}} call is used in the {{YarnJobManager}} to send a heartbeat 
> to the YARN resource manager. This call may block the JobManager actor system 
> for an arbitrarily long time, e.g. if retry handlers are set up within the call to 
> allocate.
> I propose to use the {{AMRMClientAsync}}'s callback methods to send 
> heartbeats and update the container information. The API is available for our 
> supported Hadoop versions (2.3.0 and above).
> https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.html
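
For illustration only (this is not code from the pull request or from Flink), a minimal self-contained sketch of the proposed pattern: an application master registers a callback handler with an AMRMClientAsync, which heartbeats the resource manager on its own thread and reports container updates through the callbacks, so no actor has to block inside allocate(). The heartbeat interval, object name, and registration arguments are illustrative assumptions.

    import java.util.{List => JavaList}

    import org.apache.hadoop.yarn.api.records._
    import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
    import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync
    import org.apache.hadoop.yarn.conf.YarnConfiguration

    object AsyncHeartbeatSketch {
      def main(args: Array[String]): Unit = {
        // Callbacks run on the client's heartbeat thread, never inside an actor.
        val handler = new AMRMClientAsync.CallbackHandler {
          override def onContainersAllocated(containers: JavaList[Container]): Unit =
            println(s"Allocated ${containers.size()} container(s)")
          override def onContainersCompleted(statuses: JavaList[ContainerStatus]): Unit =
            println(s"${statuses.size()} container(s) completed")
          override def onShutdownRequest(): Unit = ()
          override def onNodesUpdated(updatedNodes: JavaList[NodeReport]): Unit = ()
          override def getProgress: Float = 0.0f
          override def onError(e: Throwable): Unit =
            println(s"Error in communication with the RM: ${e.getMessage}")
        }

        // The async client heartbeats every 5 seconds (assumed interval) and
        // dispatches the responses to the handler above.
        val client: AMRMClientAsync[ContainerRequest] =
          AMRMClientAsync.createAMRMClientAsync[ContainerRequest](5000, handler)
        client.init(new YarnConfiguration())
        client.start()

        client.registerApplicationMaster("localhost", 0, "")
        // ... request TaskManager containers via client.addContainerRequest(...) ...
        client.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "")
        client.stop()
      }
    }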



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
