Repository: flink Updated Branches: refs/heads/flip-6 55e94c3c6 -> 8c448e8f5
[FLINK-4927] [yarn] refine YARN Resource manager according to till's comments Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8c448e8f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8c448e8f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8c448e8f Branch: refs/heads/flip-6 Commit: 8c448e8f5d091258a2eec5348f879007ac72e288 Parents: 55e94c3 Author: shuai.xus <[email protected]> Authored: Mon Dec 5 15:36:16 2016 +0800 Committer: Stephan Ewen <[email protected]> Committed: Mon Dec 5 19:10:35 2016 +0100 ---------------------------------------------------------------------- .../apache/flink/yarn/YarnResourceManager.java | 68 +++++++++++++++----- 1 file changed, 51 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8c448e8f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index 6280bdf..9b9ea39 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.util.Records; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.FiniteDuration; +import org.apache.flink.util.ExceptionUtils; import java.io.File; import java.io.IOException; @@ -79,21 +80,24 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements /** The process environment variables */ private final Map<String, String> ENV; + /** The default registration timeout for task executor in seconds. 
*/ + private final static int DEFAULT_TASK_MANAGER_REGISTRATION_DURATION = 300; + /** The heartbeat interval while the resource master is waiting for containers */ private static final int FAST_YARN_HEARTBEAT_INTERVAL_MS = 500; /** The default heartbeat interval during regular operation */ private static final int DEFAULT_YARN_HEARTBEAT_INTERVAL_MS = 5000; - /** The maximum time that TaskExecutors may be waiting to register at the ResourceManager before they quit */ - private static final FiniteDuration TASKEXECUTOR_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + /** The default memory of task executor to allocate (in MB) */ + private static final int DEFAULT_TSK_EXECUTOR_MEMORY_SIZE = 1024; /** Environment variable name of the final container id used by the YarnResourceManager. * Container ID generation may vary across Hadoop versions. */ final static String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID"; - /** Environment variable name of the hostname used by the Yarn. - * TaskExecutor use this host name to start port. */ + /** Environment variable name of the hostname given by the YARN. 
+ * In task executor we use the hostnames given by YARN consistently throughout akka */ final static String ENV_FLINK_NODE_ID = "_FLINK_NODE_ID"; /** Default heartbeat interval between this resource manager and the YARN ResourceManager */ @@ -112,6 +116,8 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements /** The number of containers requested, but not yet granted */ private int numPendingContainerRequests; + final private Map<ResourceProfile, Integer> resourcePriorities = new HashMap<>(); + public YarnResourceManager( Configuration flinkConfig, Map<String, String> env, @@ -173,20 +179,28 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements @Override public void shutDown() throws Exception { // shut down all components + Throwable firstException = null; if (resourceManagerClient != null) { try { resourceManagerClient.stop(); } catch (Throwable t) { - LOG.error("Could not cleanly shut down the Asynchronous Resource Manager Client", t); + firstException = t; } } if (nodeManagerClient != null) { try { nodeManagerClient.stop(); } catch (Throwable t) { - LOG.error("Could not cleanly shut down the Node Manager Client", t); + if (firstException == null) { + firstException = t; + } else { + firstException.addSuppressed(t); + } } } + if (firstException != null) { + ExceptionUtils.rethrowException(firstException, "Error while shutting down YARN resource manager"); + } super.shutDown(); } @@ -207,13 +221,10 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements public void startNewWorker(ResourceProfile resourceProfile) { // Priority for worker containers - priorities are intra-application //TODO: set priority according to the resource allocated - Priority priority = Priority.newInstance(0); - int mem = resourceProfile.getMemoryInMB() <= Integer.MAX_VALUE ? 
(int)resourceProfile.getMemoryInMB() : Integer.MAX_VALUE; - if (mem < 0) { - mem = 1024; - } - int vcore = resourceProfile.getCpuCores() < 1 ? 1 : (int)resourceProfile.getCpuCores() + 1; - Resource capability = Resource.newInstance(mem , vcore); + Priority priority = Priority.newInstance(generatePriority(resourceProfile)); + int mem = resourceProfile.getMemoryInMB() < 0 ? DEFAULT_TSK_EXECUTOR_MEMORY_SIZE : (int)resourceProfile.getMemoryInMB(); + int vcore = resourceProfile.getCpuCores() < 1 ? 1 : (int)resourceProfile.getCpuCores(); + Resource capability = Resource.newInstance(mem, vcore); requestYarnContainer(capability, priority); } @@ -234,7 +245,6 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements for (ContainerStatus container : list) { if (container.getExitStatus() < 0) { notifyWorkerFailed(new ResourceID(container.getContainerId().toString()), container.getDiagnostics()); - // TODO: notice job master slot fail } } } @@ -253,7 +263,7 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements } catch (Throwable t) { // failed to launch the container, will release the failed one and ask for a new one - LOG.error("Could not start TaskManager in container " + container, t); + LOG.error("Could not start TaskManager in container {},", container, t); resourceManagerClient.releaseAssignedContainer(container.getId()); requestYarnContainer(container.getResource(), container.getPriority()); } @@ -265,7 +275,11 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements @Override public void onShutdownRequest() { - // Nothing to do + try { + shutDown(); + } catch (Exception e) { + LOG.warn("Fail to shutdown the YARN resource manager.", e); + } } @Override @@ -336,8 +350,11 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements taskManagerParameters.taskManagerTotalMemoryMB(), taskManagerParameters.taskManagerHeapSizeMB(), 
taskManagerParameters.taskManagerDirectMemoryLimitMB()); + int timeout = flinkConfig.getInteger(ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, + DEFAULT_TASK_MANAGER_REGISTRATION_DURATION); + FiniteDuration teRegistrationTimeout = new FiniteDuration(timeout, TimeUnit.SECONDS); final Configuration taskManagerConfig = BootstrapTools.generateTaskManagerConfiguration( - flinkConfig, "", 0, 1, TASKEXECUTOR_REGISTRATION_TIMEOUT); + flinkConfig, "", 0, 1, teRegistrationTimeout); LOG.debug("TaskManager configuration: {}", taskManagerConfig); ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorContext( @@ -549,4 +566,21 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements return ctx; } + + /** + * Generate priority by given resource profile. + * Priority is only used for distinguishing request of different resource. + * @param resourceProfile The resource profile of a request + * @return The priority of this resource profile. + */ + private int generatePriority(ResourceProfile resourceProfile) { + if (resourcePriorities.containsKey(resourceProfile)) { + return resourcePriorities.get(resourceProfile); + } else { + int priority = resourcePriorities.size(); + resourcePriorities.put(resourceProfile, priority); + return priority; + } + } + }
