Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1450#discussion_r47632315
  
    --- Diff: 
flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala ---
    @@ -754,6 +538,243 @@ class YarnJobManager(
           memoryLimit
         }
       }
    +
    +  /**
    +    * Heartbeats with the resource manager and handles container updates.
    +    */
    +  object AMRMClientAsyncHandler extends AMRMClientAsync.CallbackHandler {
    +
    +    private var client : AMRMClientAsync[ContainerRequest] = null
    +
    +    override def onError(e: Throwable): Unit = {
    +      self ! decorateMessage(
    +        StopYarnSession(
    +          FinalApplicationStatus.FAILED,
    +          "Error in communication with Yarn resource manager: " + 
e.getMessage)
    +      )
    +    }
    +
    +    override def getProgress: Float = {
    +      runningContainers.toFloat / numTaskManagers
    +    }
    +
    +    override def onShutdownRequest(): Unit = {
    +    }
    +
    +    override def onNodesUpdated(updatedNodes: JavaList[NodeReport]): Unit 
= {
    +    }
    +
    +    override def onContainersCompleted(statuses: 
JavaList[ContainerStatus]): Unit = {
    +
    +      // TODO change this
    +      log.debug(s"Processed Heartbeat with RMClient. Running containers 
$runningContainers, " +
    +        s"failed containers $failedContainers, " +
    +        s"allocated containers ${allocatedContainersList.size}.")
    +
    +      val completedContainerStatuses = statuses.asScala
    +      val idStatusMap = completedContainerStatuses
    +        .map(status => (status.getContainerId, status)).toMap
    +
    +      completedContainerStatuses.foreach {
    +        status => log.info(s"Container ${status.getContainerId} is 
completed " +
    +          s"with diagnostics: ${status.getDiagnostics}")
    +      }
    +
    +      // get failed containers (returned containers are also completed, so 
we have to
    +      // distinguish if it was running before).
    +      val (completedContainers, remainingRunningContainers) = 
runningContainersList
    +        .partition(idStatusMap contains _.getId)
    +
    +      completedContainers.foreach {
    +        container =>
    +          val status = idStatusMap(container.getId)
    +          failedContainers += 1
    +          runningContainers -= 1
    +          log.info(s"Container ${status.getContainerId} was a running 
container. " +
    +            s"Total failed containers $failedContainers.")
    +          val detail = status.getExitStatus match {
    +            case -103 => "Vmem limit exceeded";
    +            case -104 => "Pmem limit exceeded";
    +            case _ => ""
    +          }
    +          messageListener foreach {
    +            _ ! decorateMessage(
    +              YarnMessage(s"Diagnostics for 
containerID=${status.getContainerId} in " +
    +                s"state=${status.getState}.\n${status.getDiagnostics} 
$detail")
    +            )
    +          }
    +      }
    +
    +      runningContainersList = remainingRunningContainers
    +
    +      // maxFailedContainers == -1 is infinite number of retries.
    +      if (maxFailedContainers != -1 && failedContainers >= 
maxFailedContainers) {
    +        val msg = s"Stopping YARN session because the number of failed " +
    +          s"containers ($failedContainers) exceeded the maximum failed 
container " +
    +          s"count ($maxFailedContainers). This number is controlled by " +
    +          s"the '${ConfigConstants.YARN_MAX_FAILED_CONTAINERS}' 
configuration " +
    +          s"setting. By default its the number of requested containers"
    +        log.error(msg)
    +        self ! 
decorateMessage(StopYarnSession(FinalApplicationStatus.FAILED, msg))
    +
    +      }
    +
    +      allocateContainers()
    +
    +    }
    +
    +    override def onContainersAllocated(containers: JavaList[Container]): 
Unit = {
    +      println("onContainersAllocated")
    --- End diff --
    
    Yep. Thanks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to