[FLINK-7804][flip6] Run AMRMClientAsync callbacks in the main thread. This closes #5675.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9538675c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9538675c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9538675c Branch: refs/heads/master Commit: 9538675caca24c83201e47bd26e4e725b2695217 Parents: 7caeefd Author: gyao <[email protected]> Authored: Fri Mar 9 14:36:33 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Sun Mar 18 15:14:40 2018 +0100 ---------------------------------------------------------------------- .../apache/flink/yarn/YarnResourceManager.java | 107 ++++++++++--------- .../flink/yarn/YarnResourceManagerTest.java | 15 +-- 2 files changed, 65 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9538675c/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index af789ba..97db2ad 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -316,7 +316,10 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme return workerNodeMap.get(resourceID); } - // AMRMClientAsync CallbackHandler methods + // ------------------------------------------------------------------------ + // AMRMClientAsync CallbackHandler methods + // ------------------------------------------------------------------------ + @Override public float getProgress() { // Temporarily need not record the total size of asked and allocated containers @@ -325,67 +328,68 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme @Override 
public void onContainersCompleted(List<ContainerStatus> list) { - for (ContainerStatus container : list) { - if (container.getExitStatus() < 0) { - closeTaskManagerConnection(new ResourceID( - container.getContainerId().toString()), new Exception(container.getDiagnostics())); + runAsync(() -> { + for (ContainerStatus container : list) { + if (container.getExitStatus() < 0) { + closeTaskManagerConnection(new ResourceID( + container.getContainerId().toString()), new Exception(container.getDiagnostics())); + } + workerNodeMap.remove(new ResourceID(container.getContainerId().toString())); + } } - workerNodeMap.remove(new ResourceID(container.getContainerId().toString())); - } + ); } @Override public void onContainersAllocated(List<Container> containers) { - for (Container container : containers) { - log.info( - "Received new container: {} - Remaining pending container requests: {}", - container.getId(), - numPendingContainerRequests); - - if (numPendingContainerRequests > 0) { - numPendingContainerRequests--; - - final String containerIdStr = container.getId().toString(); - - workerNodeMap.put(new ResourceID(containerIdStr), new YarnWorkerNode(container)); - - try { - // Context information used to start a TaskExecutor Java process - ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext( - container.getResource(), - containerIdStr, - container.getNodeId().getHost()); - - nodeManagerClient.startContainer(container, taskExecutorLaunchContext); - } catch (Throwable t) { - log.error("Could not start TaskManager in container {}.", container.getId(), t); - - // release the failed container + runAsync(() -> { + for (Container container : containers) { + log.info( + "Received new container: {} - Remaining pending container requests: {}", + container.getId(), + numPendingContainerRequests); + + if (numPendingContainerRequests > 0) { + numPendingContainerRequests--; + + final String containerIdStr = container.getId().toString(); + + 
workerNodeMap.put(new ResourceID(containerIdStr), new YarnWorkerNode(container)); + + try { + // Context information used to start a TaskExecutor Java process + ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext( + container.getResource(), + containerIdStr, + container.getNodeId().getHost()); + + nodeManagerClient.startContainer(container, taskExecutorLaunchContext); + } catch (Throwable t) { + log.error("Could not start TaskManager in container {}.", container.getId(), t); + + // release the failed container + resourceManagerClient.releaseAssignedContainer(container.getId()); + // and ask for a new one + requestYarnContainer(container.getResource(), container.getPriority()); + } + } else { + // return the excessive containers + log.info("Returning excess container {}.", container.getId()); resourceManagerClient.releaseAssignedContainer(container.getId()); - // and ask for a new one - requestYarnContainer(container.getResource(), container.getPriority()); } - } else { - // return the excessive containers - log.info("Returning excess container {}.", container.getId()); - resourceManagerClient.releaseAssignedContainer(container.getId()); } - } - // if we are waiting for no further containers, we can go to the - // regular heartbeat interval - if (numPendingContainerRequests <= 0) { - resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis); - } + // if we are waiting for no further containers, we can go to the + // regular heartbeat interval + if (numPendingContainerRequests <= 0) { + resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis); + } + }); } @Override public void onShutdownRequest() { - try { - shutDown(); - } catch (Exception e) { - log.warn("Fail to shutdown the YARN resource manager.", e); - } + shutDown(); } @Override @@ -398,7 +402,10 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme onFatalError(error); } - //Utility methods + // 
------------------------------------------------------------------------ + // Utility methods + // ------------------------------------------------------------------------ + /** * Converts a Flink application status enum to a YARN application status enum. * @param status The Flink application status. http://git-wip-us.apache.org/repos/asf/flink/blob/9538675c/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java index 455abc9..0d37b8e 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java @@ -72,8 +72,6 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import javax.annotation.Nullable; @@ -105,14 +103,12 @@ import static org.mockito.Mockito.when; */ public class YarnResourceManagerTest extends TestLogger { - private static final Logger LOG = LoggerFactory.getLogger(YarnResourceManagerTest.class); + private static final Time TIMEOUT = Time.seconds(10L); private static Configuration flinkConfig = new Configuration(); private static Map<String, String> env = new HashMap<>(); - private static final Time timeout = Time.seconds(10L); - @Rule public TemporaryFolder folder = new TemporaryFolder(); @@ -178,7 +174,7 @@ public class YarnResourceManagerTest extends TestLogger { } public <T> CompletableFuture<T> runInMainThread(Callable<T> callable) { - return callAsync(callable, timeout); + return callAsync(callable, TIMEOUT); } public MainThreadExecutor getMainThreadExecutorForTesting() { @@ -197,6 +193,11 @@ public class YarnResourceManagerTest extends TestLogger { protected NMClient 
createAndStartNodeManagerClient(YarnConfiguration yarnConfiguration) { return mockNMClient; } + + @Override + protected void runAsync(final Runnable runnable) { + runnable.run(); + } } static class Context { @@ -292,7 +293,7 @@ public class YarnResourceManagerTest extends TestLogger { public void grantLeadership() throws Exception { rmLeaderSessionId = UUID.randomUUID(); - rmLeaderElectionService.isLeader(rmLeaderSessionId).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + rmLeaderElectionService.isLeader(rmLeaderSessionId).get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); } }
