[FLINK-7804][flip6] Run AMRMClientAsync callbacks in main thread

This closes #5675.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9538675c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9538675c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9538675c

Branch: refs/heads/master
Commit: 9538675caca24c83201e47bd26e4e725b2695217
Parents: 7caeefd
Author: gyao <[email protected]>
Authored: Fri Mar 9 14:36:33 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Sun Mar 18 15:14:40 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/yarn/YarnResourceManager.java  | 107 ++++++++++---------
 .../flink/yarn/YarnResourceManagerTest.java     |  15 +--
 2 files changed, 65 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9538675c/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index af789ba..97db2ad 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -316,7 +316,10 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
                return workerNodeMap.get(resourceID);
        }
 
-       // AMRMClientAsync CallbackHandler methods
+       // ------------------------------------------------------------------------
+       //  AMRMClientAsync CallbackHandler methods
+       // ------------------------------------------------------------------------
+
        @Override
        public float getProgress() {
                // Temporarily need not record the total size of asked and allocated containers
@@ -325,67 +328,68 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 
        @Override
        public void onContainersCompleted(List<ContainerStatus> list) {
-               for (ContainerStatus container : list) {
-                       if (container.getExitStatus() < 0) {
-                               closeTaskManagerConnection(new ResourceID(
-                                       container.getContainerId().toString()), new Exception(container.getDiagnostics()));
+               runAsync(() -> {
+                               for (ContainerStatus container : list) {
+                                       if (container.getExitStatus() < 0) {
+                                               closeTaskManagerConnection(new ResourceID(
+                                                       container.getContainerId().toString()), new Exception(container.getDiagnostics()));
+                                       }
+                                       workerNodeMap.remove(new ResourceID(container.getContainerId().toString()));
+                               }
                        }
-                       workerNodeMap.remove(new ResourceID(container.getContainerId().toString()));
-               }
+               );
        }
 
        @Override
        public void onContainersAllocated(List<Container> containers) {
-               for (Container container : containers) {
-                       log.info(
-                               "Received new container: {} - Remaining pending container requests: {}",
-                               container.getId(),
-                               numPendingContainerRequests);
-
-                       if (numPendingContainerRequests > 0) {
-                               numPendingContainerRequests--;
-
-                               final String containerIdStr = container.getId().toString();
-
-                               workerNodeMap.put(new ResourceID(containerIdStr), new YarnWorkerNode(container));
-
-                               try {
-                                       // Context information used to start a TaskExecutor Java process
-                                       ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
-                                               container.getResource(),
-                                               containerIdStr,
-                                               container.getNodeId().getHost());
-
-                                       nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
-                               } catch (Throwable t) {
-                                       log.error("Could not start TaskManager in container {}.", container.getId(), t);
-
-                                       // release the failed container
+               runAsync(() -> {
+                       for (Container container : containers) {
+                               log.info(
+                                       "Received new container: {} - Remaining pending container requests: {}",
+                                       container.getId(),
+                                       numPendingContainerRequests);
+
+                               if (numPendingContainerRequests > 0) {
+                                       numPendingContainerRequests--;
+
+                                       final String containerIdStr = container.getId().toString();
+
+                                       workerNodeMap.put(new ResourceID(containerIdStr), new YarnWorkerNode(container));
+
+                                       try {
+                                               // Context information used to start a TaskExecutor Java process
+                                               ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
+                                                       container.getResource(),
+                                                       containerIdStr,
+                                                       container.getNodeId().getHost());
+
+                                               nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
+                                       } catch (Throwable t) {
+                                               log.error("Could not start TaskManager in container {}.", container.getId(), t);
+
+                                               // release the failed container
+                                               resourceManagerClient.releaseAssignedContainer(container.getId());
+                                               // and ask for a new one
+                                               requestYarnContainer(container.getResource(), container.getPriority());
+                                       }
+                               } else {
+                                       // return the excessive containers
+                                       log.info("Returning excess container {}.", container.getId());
                                        resourceManagerClient.releaseAssignedContainer(container.getId());
-                                       // and ask for a new one
-                                       requestYarnContainer(container.getResource(), container.getPriority());
                                }
-                       } else {
-                               // return the excessive containers
-                               log.info("Returning excess container {}.", container.getId());
-                               resourceManagerClient.releaseAssignedContainer(container.getId());
                        }
-               }
 
-               // if we are waiting for no further containers, we can go to the
-               // regular heartbeat interval
-               if (numPendingContainerRequests <= 0) {
-                       resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
-               }
+                       // if we are waiting for no further containers, we can go to the
+                       // regular heartbeat interval
+                       if (numPendingContainerRequests <= 0) {
+                               resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
+                       }
+               });
        }
 
        @Override
        public void onShutdownRequest() {
-               try {
-                       shutDown();
-               } catch (Exception e) {
-                       log.warn("Fail to shutdown the YARN resource manager.", e);
-               }
+               shutDown();
        }
 
        @Override
@@ -398,7 +402,10 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
                onFatalError(error);
        }
 
-       //Utility methods
+       // ------------------------------------------------------------------------
+       //  Utility methods
+       // ------------------------------------------------------------------------
+
        /**
         * Converts a Flink application status enum to a YARN application status enum.
         * @param status The Flink application status.

http://git-wip-us.apache.org/repos/asf/flink/blob/9538675c/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index 455abc9..0d37b8e 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -72,8 +72,6 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
@@ -105,14 +103,12 @@ import static org.mockito.Mockito.when;
  */
 public class YarnResourceManagerTest extends TestLogger {
 
-       private static final Logger LOG = LoggerFactory.getLogger(YarnResourceManagerTest.class);
+       private static final Time TIMEOUT = Time.seconds(10L);
 
        private static Configuration flinkConfig = new Configuration();
 
        private static Map<String, String> env = new HashMap<>();
 
-       private static final Time timeout = Time.seconds(10L);
-
        @Rule
        public TemporaryFolder folder = new TemporaryFolder();
 
@@ -178,7 +174,7 @@ public class YarnResourceManagerTest extends TestLogger {
                }
 
                public <T> CompletableFuture<T> runInMainThread(Callable<T> callable) {
-                       return callAsync(callable, timeout);
+                       return callAsync(callable, TIMEOUT);
                }
 
                public MainThreadExecutor getMainThreadExecutorForTesting() {
@@ -197,6 +193,11 @@ public class YarnResourceManagerTest extends TestLogger {
                protected NMClient createAndStartNodeManagerClient(YarnConfiguration yarnConfiguration) {
                        return mockNMClient;
                }
+
+               @Override
+               protected void runAsync(final Runnable runnable) {
+                       runnable.run();
+               }
        }
 
        static class Context {
@@ -292,7 +293,7 @@ public class YarnResourceManagerTest extends TestLogger {
 
                        public void grantLeadership() throws Exception {
                                rmLeaderSessionId = UUID.randomUUID();
-                               rmLeaderElectionService.isLeader(rmLeaderSessionId).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+                               rmLeaderElectionService.isLeader(rmLeaderSessionId).get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
                        }
                }
 

Reply via email to