Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/5881#discussion_r183265947
--- Diff:
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -356,17 +395,25 @@ public void onContainersAllocated(List<Container>
containers) {
workerNodeMap.put(new
ResourceID(containerIdStr), new YarnWorkerNode(container));
+ ResourceID resourceID = new
ResourceID(containerIdStr);
+
try {
// Context information used to
start a TaskExecutor Java process
ContainerLaunchContext
taskExecutorLaunchContext = createTaskExecutorLaunchContext(
container.getResource(),
containerIdStr,
container.getNodeId().getHost());
+ // remember the pending container that needs to be registered with ResourceManager.
+
pendingContainersExpectedToRegister.put(resourceID, container);
+
nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
} catch (Throwable t) {
log.error("Could not start
TaskManager in container {}.", container.getId(), t);
+ // remove the failed container
+
pendingContainersExpectedToRegister.remove(resourceID);
--- End diff --
Nice catch!
---