Repository: flink
Updated Branches:
  refs/heads/flip-6 55e94c3c6 -> 8c448e8f5


[FLINK-4927] [yarn] Refine YARN resource manager according to Till's comments


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8c448e8f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8c448e8f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8c448e8f

Branch: refs/heads/flip-6
Commit: 8c448e8f5d091258a2eec5348f879007ac72e288
Parents: 55e94c3
Author: shuai.xus <[email protected]>
Authored: Mon Dec 5 15:36:16 2016 +0800
Committer: Stephan Ewen <[email protected]>
Committed: Mon Dec 5 19:10:35 2016 +0100

----------------------------------------------------------------------
 .../apache/flink/yarn/YarnResourceManager.java  | 68 +++++++++++++++-----
 1 file changed, 51 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8c448e8f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 6280bdf..9b9ea39 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.util.Records;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;
+import org.apache.flink.util.ExceptionUtils;
 
 import java.io.File;
 import java.io.IOException;
@@ -79,21 +80,24 @@ public class YarnResourceManager extends 
ResourceManager<ResourceID> implements
        /** The process environment variables */
        private final Map<String, String> ENV;
 
+       /** The default registration timeout for task executor in seconds. */
+       private final static int DEFAULT_TASK_MANAGER_REGISTRATION_DURATION = 
300;
+
        /** The heartbeat interval while the resource master is waiting for 
containers */
        private static final int FAST_YARN_HEARTBEAT_INTERVAL_MS = 500;
 
        /** The default heartbeat interval during regular operation */
        private static final int DEFAULT_YARN_HEARTBEAT_INTERVAL_MS = 5000;
 
-       /** The maximum time that TaskExecutors may be waiting to register at 
the ResourceManager before they quit */
-       private static final FiniteDuration TASKEXECUTOR_REGISTRATION_TIMEOUT = 
new FiniteDuration(5, TimeUnit.MINUTES);
+       /** The default memory of task executor to allocate (in MB) */
+       private static final int DEFAULT_TSK_EXECUTOR_MEMORY_SIZE = 1024;
 
        /** Environment variable name of the final container id used by the 
YarnResourceManager.
         * Container ID generation may vary across Hadoop versions. */
        final static String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID";
        
-       /** Environment variable name of the hostname used by the Yarn.
-        * TaskExecutor use this host name to start port. */
+       /** Environment variable name of the hostname given by YARN.
+        * In task executor we use the hostnames given by YARN consistently 
throughout akka */
        final static String ENV_FLINK_NODE_ID = "_FLINK_NODE_ID";
 
        /** Default heartbeat interval between this resource manager and the 
YARN ResourceManager */
@@ -112,6 +116,8 @@ public class YarnResourceManager extends 
ResourceManager<ResourceID> implements
        /** The number of containers requested, but not yet granted */
        private int numPendingContainerRequests;
 
+       final private Map<ResourceProfile, Integer> resourcePriorities = new 
HashMap<>();
+
        public YarnResourceManager(
                        Configuration flinkConfig,
                        Map<String, String> env,
@@ -173,20 +179,28 @@ public class YarnResourceManager extends 
ResourceManager<ResourceID> implements
        @Override
        public void shutDown() throws Exception {
                // shut down all components
+               Throwable firstException = null;
                if (resourceManagerClient != null) {
                        try {
                                resourceManagerClient.stop();
                        } catch (Throwable t) {
-                               LOG.error("Could not cleanly shut down the 
Asynchronous Resource Manager Client", t);
+                               firstException = t;
                        }
                }
                if (nodeManagerClient != null) {
                        try {
                                nodeManagerClient.stop();
                        } catch (Throwable t) {
-                               LOG.error("Could not cleanly shut down the Node 
Manager Client", t);
+                               if (firstException == null) {
+                                       firstException = t;
+                               } else {
+                                       firstException.addSuppressed(t);
+                               }
                        }
                }
+               if (firstException != null) {
+                       ExceptionUtils.rethrowException(firstException, "Error 
while shutting down YARN resource manager");
+               }
                super.shutDown();
        }
 
@@ -207,13 +221,10 @@ public class YarnResourceManager extends 
ResourceManager<ResourceID> implements
        public void startNewWorker(ResourceProfile resourceProfile) {
                // Priority for worker containers - priorities are 
intra-application
                //TODO: set priority according to the resource allocated
-               Priority priority = Priority.newInstance(0);
-               int mem = resourceProfile.getMemoryInMB() <= Integer.MAX_VALUE 
? (int)resourceProfile.getMemoryInMB() : Integer.MAX_VALUE;
-               if (mem < 0) {
-                       mem = 1024;
-               }
-               int vcore = resourceProfile.getCpuCores() < 1 ? 1 : 
(int)resourceProfile.getCpuCores() + 1;
-               Resource capability = Resource.newInstance(mem , vcore);
+               Priority priority = 
Priority.newInstance(generatePriority(resourceProfile));
+               int mem = resourceProfile.getMemoryInMB() < 0 ? 
DEFAULT_TSK_EXECUTOR_MEMORY_SIZE : (int)resourceProfile.getMemoryInMB();
+               int vcore = resourceProfile.getCpuCores() < 1 ? 1 : 
(int)resourceProfile.getCpuCores();
+               Resource capability = Resource.newInstance(mem, vcore);
                requestYarnContainer(capability, priority);
        }
 
@@ -234,7 +245,6 @@ public class YarnResourceManager extends 
ResourceManager<ResourceID> implements
                for (ContainerStatus container : list) {
                        if (container.getExitStatus() < 0) {
                                notifyWorkerFailed(new 
ResourceID(container.getContainerId().toString()), container.getDiagnostics());
-                               // TODO: notice job master slot fail
                        }
                }
        }
@@ -253,7 +263,7 @@ public class YarnResourceManager extends 
ResourceManager<ResourceID> implements
                        }
                        catch (Throwable t) {
                                // failed to launch the container, will release 
the failed one and ask for a new one
-                               LOG.error("Could not start TaskManager in 
container " + container, t);
+                               LOG.error("Could not start TaskManager in 
container {},", container, t);
                                
resourceManagerClient.releaseAssignedContainer(container.getId());
                                requestYarnContainer(container.getResource(), 
container.getPriority());
                        }
@@ -265,7 +275,11 @@ public class YarnResourceManager extends 
ResourceManager<ResourceID> implements
 
        @Override
        public void onShutdownRequest() {
-               // Nothing to do
+               try {
+                       shutDown();
+               } catch (Exception e) {
+                       LOG.warn("Fail to shutdown the YARN resource manager.", 
e);
+               }
        }
 
        @Override
@@ -336,8 +350,11 @@ public class YarnResourceManager extends 
ResourceManager<ResourceID> implements
                                
taskManagerParameters.taskManagerTotalMemoryMB(),
                                taskManagerParameters.taskManagerHeapSizeMB(),
                                
taskManagerParameters.taskManagerDirectMemoryLimitMB());
+               int timeout = 
flinkConfig.getInteger(ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, 
+                               DEFAULT_TASK_MANAGER_REGISTRATION_DURATION);
+               FiniteDuration teRegistrationTimeout = new 
FiniteDuration(timeout, TimeUnit.SECONDS);
                final Configuration taskManagerConfig = 
BootstrapTools.generateTaskManagerConfiguration(
-                               flinkConfig, "", 0, 1, 
TASKEXECUTOR_REGISTRATION_TIMEOUT);
+                               flinkConfig, "", 0, 1, teRegistrationTimeout);
                LOG.debug("TaskManager configuration: {}", taskManagerConfig);
 
                ContainerLaunchContext taskExecutorLaunchContext = 
createTaskExecutorContext(
@@ -549,4 +566,21 @@ public class YarnResourceManager extends 
ResourceManager<ResourceID> implements
 
                return ctx;
        }
+       
+       /**
+        * Generate priority by given resource profile. 
+        * Priority is only used for distinguishing requests of different 
resources.
+        * @param resourceProfile The resource profile of a request
+        * @return The priority of this resource profile.
+        */
+       private int generatePriority(ResourceProfile resourceProfile) {
+               if (resourcePriorities.containsKey(resourceProfile)) {
+                       return resourcePriorities.get(resourceProfile);
+               } else {
+                       int priority = resourcePriorities.size();
+                       resourcePriorities.put(resourceProfile, priority);
+                       return priority;
+               }
+       }
+
 }

Reply via email to