[https://issues.apache.org/jira/browse/FLINK-8613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16364177#comment-16364177]
ASF GitHub Bot commented on FLINK-8613:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5436#discussion_r168197381
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -325,26 +325,43 @@ public void onContainersCompleted(List<ContainerStatus> list) {
 	@Override
 	public void onContainersAllocated(List<Container> containers) {
 		for (Container container : containers) {
-			numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
-			log.info("Received new container: {} - Remaining pending container requests: {}",
-				container.getId(), numPendingContainerRequests);
-			final String containerIdStr = container.getId().toString();
-			workerNodeMap.put(new ResourceID(containerIdStr),
-				new YarnWorkerNode(container));
-			try {
-				/** Context information used to start a TaskExecutor Java process */
-				ContainerLaunchContext taskExecutorLaunchContext =
-					createTaskExecutorLaunchContext(
-						container.getResource(), containerIdStr, container.getNodeId().getHost());
-				nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
-			}
-			catch (Throwable t) {
-				// failed to launch the container, will release the failed one and ask for a new one
-				log.error("Could not start TaskManager in container {},", container, t);
+			log.info(
+				"Received new container: {} - Remaining pending container requests: {}",
+				container.getId(),
+				numPendingContainerRequests);
+
+			if (numPendingContainerRequests > 0) {
+				numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
+
+				final String containerIdStr = container.getId().toString();
+
+				workerNodeMap.put(new ResourceID(containerIdStr), new YarnWorkerNode(container));
+
+				try {
+					// Context information used to start a TaskExecutor Java process
+					ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
+						container.getResource(),
+						containerIdStr,
+						container.getNodeId().getHost());
+
+					nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
+				} catch (Throwable t) {
+					log.error("Could not start TaskManager in container {}.", container.getId(), t);
+
+					// release the failed container
+					resourceManagerClient.releaseAssignedContainer(container.getId());
+					// and ask for a new one
+					requestYarnContainer(container.getResource(), container.getPriority());
+				}
+			} else {
+				// return the excessive containers
+				log.info("Returning excess container {}.", container.getId());
 				resourceManagerClient.releaseAssignedContainer(container.getId());
-				requestYarnContainer(container.getResource(), container.getPriority());
 			}
 		}
+
+		// if we are waiting for no further containers, we can go to the
+		// regular heartbeat interval
 		if (numPendingContainerRequests <= 0) {
--- End diff --
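Yes, indeed. However, using `<=` as the comparison operator does not change the logic: the counter is only decremented inside the `numPendingContainerRequests > 0` branch, so it cannot become negative here and `<= 0` behaves exactly like `== 0`.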
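The accounting introduced by this change can be modelled in isolation. The following is a minimal sketch under stated assumptions: the class ExcessContainerSketch, its string container IDs, the requestContainer method and the startContainer predicate are all hypothetical stand-ins, not Flink or YARN API. Containers are accepted only while requests are pending, excess ones are released, and a failed launch releases the container and requests a replacement (assuming, as a simplification, that requesting a container increments the pending counter). Because the counter is only decremented behind the `> 0` guard, `<= 0` and `== 0` give the same answer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, simplified model of the bookkeeping in onContainersAllocated.
// Container IDs are plain strings; "launch" and "release" are recorded in lists
// instead of calling the YARN NodeManager/ResourceManager clients.
class ExcessContainerSketch {

	private int numPendingContainerRequests;
	final List<String> launched = new ArrayList<>();
	final List<String> released = new ArrayList<>();

	// Stand-in for requestYarnContainer(...): one more container is expected.
	// (Assumption: the real method increments the pending counter.)
	void requestContainer() {
		numPendingContainerRequests++;
	}

	// Accept containers while requests are pending; return any excess ones.
	// startContainer simulates nodeManagerClient.startContainer and reports success.
	void onContainersAllocated(List<String> containerIds, Predicate<String> startContainer) {
		for (String id : containerIds) {
			if (numPendingContainerRequests > 0) {
				// Guarded decrement: the counter cannot go below zero.
				numPendingContainerRequests--;
				if (startContainer.test(id)) {
					launched.add(id);
				} else {
					// Launch failed: release the container and ask for a replacement.
					released.add(id);
					requestContainer();
				}
			} else {
				// Nothing outstanding, so this container is excess: hand it back.
				released.add(id);
			}
		}
	}

	int pending() {
		return numPendingContainerRequests;
	}

	// With the guard above the counter is never negative, so "<= 0" and "== 0" agree.
	boolean waitingForNoFurtherContainers() {
		return numPendingContainerRequests <= 0;
	}

	public static void main(String[] args) {
		ExcessContainerSketch rm = new ExcessContainerSketch();
		rm.requestContainer();
		rm.requestContainer();
		// YARN hands out three containers although only two were requested.
		rm.onContainersAllocated(List.of("c1", "c2", "c3"), id -> true);
		System.out.println("launched=" + rm.launched + " released=" + rm.released
			+ " pending=" + rm.pending());
	}
}
```

Running main prints launched=[c1, c2] released=[c3] pending=0: the third, unrequested container is returned instead of being launched.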
> Return excess container in YarnResourceManager
> ----------------------------------------------
>
> Key: FLINK-8613
> URL: https://issues.apache.org/jira/browse/FLINK-8613
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination, YARN
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Major
> Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{YarnResourceManager}} should return excess containers which the Yarn
> ResourceManager wrongly assigned.