Repository: samza
Updated Branches:
  refs/heads/master 4aae9ad8c -> 9b0861f1e


SAMZA-1013: Add node labeling support for Yarn


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9b0861f1
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9b0861f1
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9b0861f1

Branch: refs/heads/master
Commit: 9b0861f1e776248f25519c04e875e97ecba0a389
Parents: 4aae9ad
Author: Maxim Logvinenko <[email protected]>
Authored: Thu Oct 20 17:57:40 2016 -0700
Committer: Xinyu Liu <[email protected]>
Committed: Thu Oct 20 17:57:40 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/samza/config/YarnConfig.java  | 18 ++++++++++++++++++
 .../job/yarn/YarnClusterResourceManager.java      |  7 +++++--
 .../org/apache/samza/job/yarn/ClientHelper.scala  |  9 +++++++++
 3 files changed, 32 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/9b0861f1/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java b/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
index 8f2dc48..2970905 100644
--- a/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
+++ b/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
@@ -46,6 +46,11 @@ public class YarnConfig extends MapConfig {
   private static final int DEFAULT_CPU_CORES = 1;
 
   /**
+   * Label to request from YARN for containers
+   */
+  public static final String CONTAINER_LABEL = "yarn.container.label";
+
+  /**
    * Maximum number of times the AM tries to restart a failed container
    */
   public static final String CONTAINER_RETRY_COUNT = "yarn.container.retry.count";
@@ -76,6 +81,11 @@ public class YarnConfig extends MapConfig {
   private static final int DEFAULT_AM_CONTAINER_MAX_MEMORY_MB = 1024;
 
   /**
+   * Label to request from YARN for running the AM
+   */
+  public static final String AM_CONTAINER_LABEL = "yarn.am.container.label";
+
+  /**
    * Determines the interval for the Heartbeat between the AM and the Yarn RM
    */
   public static final String AM_POLL_INTERVAL_MS = "yarn.am.poll.interval.ms";
@@ -154,6 +164,10 @@ public class YarnConfig extends MapConfig {
     return getInt(CONTAINER_MAX_CPU_CORES, DEFAULT_CPU_CORES);
   }
 
+  public String getContainerLabel() {
+    return get(CONTAINER_LABEL, null);
+  }
+
   public boolean getJmxServerEnabled() {
     return getBoolean(AM_JMX_ENABLED, true);
   }
@@ -170,6 +184,10 @@ public class YarnConfig extends MapConfig {
     return getInt(AM_CONTAINER_MAX_MEMORY_MB, DEFAULT_AM_CONTAINER_MAX_MEMORY_MB);
   }
 
+  public String getAMContainerLabel() {
+    return get(AM_CONTAINER_LABEL, null);
+  }
+
   public String getAmOpts() {
     return get(AM_JVM_OPTIONS, "");
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/9b0861f1/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index 96d3d7c..c1b71bb 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -184,6 +184,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
 
     int memoryMb = resourceRequest.getMemoryMB();
     int cpuCores = resourceRequest.getNumCores();
+    String containerLabel = yarnConfig.getContainerLabel();
     String preferredHost = resourceRequest.getPreferredHost();
     Resource capability = Resource.newInstance(memoryMb, cpuCores);
     Priority priority =  Priority.newInstance(DEFAULT_PRIORITY);
@@ -193,7 +194,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
     if (preferredHost.equals("ANY_HOST"))
     {
       log.info("Making a request for ANY_HOST " + preferredHost );
-      issuedRequest = new AMRMClient.ContainerRequest(capability, null, null, priority);
+      issuedRequest = new AMRMClient.ContainerRequest(capability, null, null, priority, true, containerLabel);
     }
     else
     {
@@ -202,7 +203,9 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
               capability,
               new String[]{preferredHost},
               null,
-              priority);
+              priority,
+              true,
+              containerLabel);
     }
     //ensure that updating the state and making the request are done atomically.
     synchronized (lock) {

http://git-wip-us.apache.org/repos/asf/samza/blob/9b0861f1/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
index 0998c43..bed1892 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
@@ -94,6 +94,7 @@ class ClientHelper(conf: Configuration) extends Logging {
     val mem = yarnConfig.getAMContainerMaxMemoryMb
     val cpu = 1
     val queueName = Option(yarnConfig.getQueueName)
+    val appMasterLabel = Option(yarnConfig.getAMContainerLabel)
 
     // If we are asking for memory more than the max allowed, shout out
     if (mem > newAppResponse.getMaximumResourceCapability().getMemory()) {
@@ -123,6 +124,14 @@ class ClientHelper(conf: Configuration) extends Logging {
       case None => { appCtx.setApplicationName(appId.get.toString) }
     }
 
+    appMasterLabel match { // apply the AM node label only when one is configured
+      case Some(label) => {
+        appCtx.setNodeLabelExpression(label)
+        info("set yarn node label expression to %s" format label) // log the label itself, not the queue Option
+      }
+      case None => // no AM label configured: nothing is set on appCtx here
+    }
+
     queueName match {
       case Some(queueName) => {
         appCtx.setQueue(queueName)

Reply via email to