Repository: samza Updated Branches: refs/heads/master 4aae9ad8c -> 9b0861f1e
SAMZA-1013: Add node labeling support for Yarn Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9b0861f1 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9b0861f1 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9b0861f1 Branch: refs/heads/master Commit: 9b0861f1e776248f25519c04e875e97ecba0a389 Parents: 4aae9ad Author: Maxim Logvinenko <[email protected]> Authored: Thu Oct 20 17:57:40 2016 -0700 Committer: Xinyu Liu <[email protected]> Committed: Thu Oct 20 17:57:40 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/samza/config/YarnConfig.java | 18 ++++++++++++++++++ .../job/yarn/YarnClusterResourceManager.java | 7 +++++-- .../org/apache/samza/job/yarn/ClientHelper.scala | 9 +++++++++ 3 files changed, 32 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/9b0861f1/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java b/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java index 8f2dc48..2970905 100644 --- a/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java +++ b/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java @@ -46,6 +46,11 @@ public class YarnConfig extends MapConfig { private static final int DEFAULT_CPU_CORES = 1; /** + * Label to request from YARN for containers + */ + public static final String CONTAINER_LABEL = "yarn.container.label"; + + /** * Maximum number of times the AM tries to restart a failed container */ public static final String CONTAINER_RETRY_COUNT = "yarn.container.retry.count"; @@ -76,6 +81,11 @@ public class YarnConfig extends MapConfig { private static final int DEFAULT_AM_CONTAINER_MAX_MEMORY_MB = 1024; /** + * 
Label to request from YARN for running the AM + */ + public static final String AM_CONTAINER_LABEL = "yarn.am.container.label"; + + /** * Determines the interval for the Heartbeat between the AM and the Yarn RM */ public static final String AM_POLL_INTERVAL_MS = "yarn.am.poll.interval.ms"; @@ -154,6 +164,10 @@ public class YarnConfig extends MapConfig { return getInt(CONTAINER_MAX_CPU_CORES, DEFAULT_CPU_CORES); } + public String getContainerLabel() { + return get(CONTAINER_LABEL, null); + } + public boolean getJmxServerEnabled() { return getBoolean(AM_JMX_ENABLED, true); } @@ -170,6 +184,10 @@ public class YarnConfig extends MapConfig { return getInt(AM_CONTAINER_MAX_MEMORY_MB, DEFAULT_AM_CONTAINER_MAX_MEMORY_MB); } + public String getAMContainerLabel() { + return get(AM_CONTAINER_LABEL, null); + } + public String getAmOpts() { return get(AM_JVM_OPTIONS, ""); } http://git-wip-us.apache.org/repos/asf/samza/blob/9b0861f1/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java index 96d3d7c..c1b71bb 100644 --- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java +++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java @@ -184,6 +184,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement int memoryMb = resourceRequest.getMemoryMB(); int cpuCores = resourceRequest.getNumCores(); + String containerLabel = yarnConfig.getContainerLabel(); String preferredHost = resourceRequest.getPreferredHost(); Resource capability = Resource.newInstance(memoryMb, cpuCores); Priority priority = Priority.newInstance(DEFAULT_PRIORITY); @@ -193,7 +194,7 @@ public class YarnClusterResourceManager extends 
ClusterResourceManager implement if (preferredHost.equals("ANY_HOST")) { log.info("Making a request for ANY_HOST " + preferredHost ); - issuedRequest = new AMRMClient.ContainerRequest(capability, null, null, priority); + issuedRequest = new AMRMClient.ContainerRequest(capability, null, null, priority, true, containerLabel); } else { @@ -202,7 +203,9 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement capability, new String[]{preferredHost}, null, - priority); + priority, + true, + containerLabel); } //ensure that updating the state and making the request are done atomically. synchronized (lock) { http://git-wip-us.apache.org/repos/asf/samza/blob/9b0861f1/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala index 0998c43..bed1892 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala @@ -94,6 +94,7 @@ class ClientHelper(conf: Configuration) extends Logging { val mem = yarnConfig.getAMContainerMaxMemoryMb val cpu = 1 val queueName = Option(yarnConfig.getQueueName) + val appMasterLabel = Option(yarnConfig.getAMContainerLabel) // If we are asking for memory more than the max allowed, shout out if (mem > newAppResponse.getMaximumResourceCapability().getMemory()) { @@ -123,6 +124,14 @@ class ClientHelper(conf: Configuration) extends Logging { case None => { appCtx.setApplicationName(appId.get.toString) } } + appMasterLabel match { + case Some(label) => { + appCtx.setNodeLabelExpression(label) + info("set yarn node label expression to %s" format label) + } + case None => + } + queueName match { case Some(queueName) => { appCtx.setQueue(queueName)
