GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5436#discussion_r168196542

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -325,26 +325,43 @@ public void onContainersCompleted(List<ContainerStatus> list) {
         @Override
         public void onContainersAllocated(List<Container> containers) {
             for (Container container : containers) {
    -            numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
    -            log.info("Received new container: {} - Remaining pending container requests: {}",
    -                container.getId(), numPendingContainerRequests);
    -            final String containerIdStr = container.getId().toString();
    -            workerNodeMap.put(new ResourceID(containerIdStr),
    -                new YarnWorkerNode(container));
    -            try {
    -                /** Context information used to start a TaskExecutor Java process */
    -                ContainerLaunchContext taskExecutorLaunchContext =
    -                    createTaskExecutorLaunchContext(
    -                        container.getResource(), containerIdStr, container.getNodeId().getHost());
    -                nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
    -            }
    -            catch (Throwable t) {
    -                // failed to launch the container, will release the failed one and ask for a new one
    -                log.error("Could not start TaskManager in container {},", container, t);
    +            log.info(
    +                "Received new container: {} - Remaining pending container requests: {}",
    +                container.getId(),
    +                numPendingContainerRequests);
    +
    +            if (numPendingContainerRequests > 0) {
    +                numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
    --- End diff --

    true
---