[
https://issues.apache.org/jira/browse/FLINK-3134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15058021#comment-15058021
]
ASF GitHub Bot commented on FLINK-3134:
---------------------------------------
Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/1450#discussion_r47632315
--- Diff:
flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala ---
@@ -754,6 +538,243 @@ class YarnJobManager(
memoryLimit
}
}
+
+ /**
+ * Heartbeats with the resource manager and handles container updates.
+ */
+ object AMRMClientAsyncHandler extends AMRMClientAsync.CallbackHandler {
+
+ private var client : AMRMClientAsync[ContainerRequest] = null
+
+ override def onError(e: Throwable): Unit = {
+ self ! decorateMessage(
+ StopYarnSession(
+ FinalApplicationStatus.FAILED,
+ "Error in communication with Yarn resource manager: " +
e.getMessage)
+ )
+ }
+
+ override def getProgress: Float = {
+ runningContainers.toFloat / numTaskManagers
+ }
+
+ override def onShutdownRequest(): Unit = {
+ }
+
+ override def onNodesUpdated(updatedNodes: JavaList[NodeReport]): Unit
= {
+ }
+
+ override def onContainersCompleted(statuses:
JavaList[ContainerStatus]): Unit = {
+
+ // TODO change this
+ log.debug(s"Processed Heartbeat with RMClient. Running containers
$runningContainers, " +
+ s"failed containers $failedContainers, " +
+ s"allocated containers ${allocatedContainersList.size}.")
+
+ val completedContainerStatuses = statuses.asScala
+ val idStatusMap = completedContainerStatuses
+ .map(status => (status.getContainerId, status)).toMap
+
+ completedContainerStatuses.foreach {
+ status => log.info(s"Container ${status.getContainerId} is
completed " +
+ s"with diagnostics: ${status.getDiagnostics}")
+ }
+
+ // get failed containers (returned containers are also completed, so
we have to
+ // distinguish if it was running before).
+ val (completedContainers, remainingRunningContainers) =
runningContainersList
+ .partition(idStatusMap contains _.getId)
+
+ completedContainers.foreach {
+ container =>
+ val status = idStatusMap(container.getId)
+ failedContainers += 1
+ runningContainers -= 1
+ log.info(s"Container ${status.getContainerId} was a running
container. " +
+ s"Total failed containers $failedContainers.")
+ val detail = status.getExitStatus match {
+ case -103 => "Vmem limit exceeded";
+ case -104 => "Pmem limit exceeded";
+ case _ => ""
+ }
+ messageListener foreach {
+ _ ! decorateMessage(
+ YarnMessage(s"Diagnostics for
containerID=${status.getContainerId} in " +
+ s"state=${status.getState}.\n${status.getDiagnostics}
$detail")
+ )
+ }
+ }
+
+ runningContainersList = remainingRunningContainers
+
+ // maxFailedContainers == -1 is infinite number of retries.
+ if (maxFailedContainers != -1 && failedContainers >=
maxFailedContainers) {
+ val msg = s"Stopping YARN session because the number of failed " +
+ s"containers ($failedContainers) exceeded the maximum failed
container " +
+ s"count ($maxFailedContainers). This number is controlled by " +
+ s"the '${ConfigConstants.YARN_MAX_FAILED_CONTAINERS}'
configuration " +
+ s"setting. By default its the number of requested containers"
+ log.error(msg)
+ self !
decorateMessage(StopYarnSession(FinalApplicationStatus.FAILED, msg))
+
+ }
+
+ allocateContainers()
+
+ }
+
+ override def onContainersAllocated(containers: JavaList[Container]):
Unit = {
+ println("onContainersAllocated")
--- End diff --
Yep. Thanks.
> Make YarnJobManager's allocate call asynchronous
> ------------------------------------------------
>
> Key: FLINK-3134
> URL: https://issues.apache.org/jira/browse/FLINK-3134
> Project: Flink
> Issue Type: Bug
> Components: YARN Client
> Affects Versions: 0.10.0, 1.0.0, 0.10.1
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Fix For: 1.0.0
>
>
> The {{allocate()}} call is used in the {{YarnJobManager}} to send a heartbeat
> to the YARN resource manager. This call may block the JobManager actor system
> for an arbitrary amount of time, e.g. if retry handlers are set up within the
> call to {{allocate()}}.
> I propose to use the {{AMRMClientAsync}}'s callback methods to send
> heartbeats and update the container information. The API is available for our
> supported Hadoop versions (2.3.0 and above).
> https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.html
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)