[
https://issues.apache.org/jira/browse/FLINK-7804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16400614#comment-16400614
]
ASF GitHub Bot commented on FLINK-7804:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5675#discussion_r174834515
--- Diff:
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -325,67 +328,74 @@ public float getProgress() {
@Override
public void onContainersCompleted(List<ContainerStatus> list) {
- for (ContainerStatus container : list) {
- if (container.getExitStatus() < 0) {
- closeTaskManagerConnection(new ResourceID(
- container.getContainerId().toString()),
new Exception(container.getDiagnostics()));
+ runAsync(() -> {
+ for (ContainerStatus container : list) {
+ if (container.getExitStatus() < 0) {
+ closeTaskManagerConnection(new
ResourceID(
+
container.getContainerId().toString()), new
Exception(container.getDiagnostics()));
+ }
+ workerNodeMap.remove(new
ResourceID(container.getContainerId().toString()));
+ }
}
- workerNodeMap.remove(new
ResourceID(container.getContainerId().toString()));
- }
+ );
}
@Override
public void onContainersAllocated(List<Container> containers) {
- for (Container container : containers) {
- log.info(
- "Received new container: {} - Remaining pending
container requests: {}",
- container.getId(),
- numPendingContainerRequests);
-
- if (numPendingContainerRequests > 0) {
- numPendingContainerRequests--;
-
- final String containerIdStr =
container.getId().toString();
-
- workerNodeMap.put(new
ResourceID(containerIdStr), new YarnWorkerNode(container));
-
- try {
- // Context information used to start a
TaskExecutor Java process
- ContainerLaunchContext
taskExecutorLaunchContext = createTaskExecutorLaunchContext(
- container.getResource(),
- containerIdStr,
-
container.getNodeId().getHost());
-
-
nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
- } catch (Throwable t) {
- log.error("Could not start TaskManager
in container {}.", container.getId(), t);
-
- // release the failed container
+ runAsync(() -> {
+ for (Container container : containers) {
+ log.info(
+ "Received new container: {} - Remaining
pending container requests: {}",
+ container.getId(),
+ numPendingContainerRequests);
+
+ if (numPendingContainerRequests > 0) {
+ numPendingContainerRequests--;
+
+ final String containerIdStr =
container.getId().toString();
+
+ workerNodeMap.put(new
ResourceID(containerIdStr), new YarnWorkerNode(container));
+
+ try {
+ // Context information used to
start a TaskExecutor Java process
+ ContainerLaunchContext
taskExecutorLaunchContext = createTaskExecutorLaunchContext(
+ container.getResource(),
+ containerIdStr,
+
container.getNodeId().getHost());
+
+
nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
+ } catch (Throwable t) {
+ log.error("Could not start
TaskManager in container {}.", container.getId(), t);
+
+ // release the failed container
+
resourceManagerClient.releaseAssignedContainer(container.getId());
+ // and ask for a new one
+
requestYarnContainer(container.getResource(), container.getPriority());
+ }
+ } else {
+ // return the excessive containers
+ log.info("Returning excess container
{}.", container.getId());
resourceManagerClient.releaseAssignedContainer(container.getId());
- // and ask for a new one
-
requestYarnContainer(container.getResource(), container.getPriority());
}
- } else {
- // return the excessive containers
- log.info("Returning excess container {}.",
container.getId());
-
resourceManagerClient.releaseAssignedContainer(container.getId());
}
- }
- // if we are waiting for no further containers, we can go to the
- // regular heartbeat interval
- if (numPendingContainerRequests <= 0) {
-
resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
- }
+ // if we are waiting for no further containers, we can
go to the
+ // regular heartbeat interval
+ if (numPendingContainerRequests <= 0) {
+
resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
+ }
+ });
}
@Override
public void onShutdownRequest() {
- try {
- shutDown();
- } catch (Exception e) {
- log.warn("Fail to shutdown the YARN resource manager.",
e);
- }
+ runAsync(() -> {
+ try {
+ shutDown();
--- End diff --
`shutDown` can be called directly here, without wrapping it in `runAsync`.
> YarnResourceManager does not execute AMRMClientAsync callbacks in main thread
> -----------------------------------------------------------------------------
>
> Key: FLINK-7804
> URL: https://issues.apache.org/jira/browse/FLINK-7804
> Project: Flink
> Issue Type: Bug
> Components: Distributed Coordination, YARN
> Affects Versions: 1.4.0, 1.5.0
> Reporter: Till Rohrmann
> Assignee: Gary Yao
> Priority: Blocker
> Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{YarnResourceManager}} registers callbacks with an {{AMRMClientAsync}},
> which it uses to react to YARN container allocations. These callbacks (e.g.
> {{onContainersAllocated}}) modify the internal state of the
> {{YarnResourceManager}}. This can lead to race conditions with the
> {{requestYarnContainer}} method.
> To solve this problem, the state-changing operations must be executed
> in the main thread of the {{YarnResourceManager}}.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)