SAMZA-767 - yarn.queue option is not used anywhere
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e8a2ef5e Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e8a2ef5e Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e8a2ef5e Branch: refs/heads/samza-sql Commit: e8a2ef5efac39f56120c19abb50d84433df91b02 Parents: 092e381 Author: Aleksandar Pejakovic <[email protected]> Authored: Thu Nov 19 14:52:42 2015 -0800 Committer: Navina <[email protected]> Committed: Thu Nov 19 14:52:42 2015 -0800 ---------------------------------------------------------------------- .../src/main/java/org/apache/samza/config/YarnConfig.java | 8 ++++++++ .../scala/org/apache/samza/job/yarn/ClientHelper.scala | 10 +++++++++- .../main/scala/org/apache/samza/job/yarn/YarnJob.scala | 4 ++-- 3 files changed, 19 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/e8a2ef5e/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java b/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java index a572aa2..c556d83 100644 --- a/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java +++ b/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java @@ -35,6 +35,11 @@ public class YarnConfig extends MapConfig { private static final int DEFAULT_CONTAINER_MEM = 1024; /** + * Name of YARN queue to run jobs on + */ + public static final String QUEUE_NAME = "yarn.queue"; + + /** * Number of CPU cores to request from YARN per container */ public static final String CONTAINER_MAX_CPU_CORES = "yarn.container.cpu.cores"; @@ -144,6 +149,9 @@ public class YarnConfig extends MapConfig { return get(AM_JVM_OPTIONS, ""); } + public String getQueueName() { + return get(QUEUE_NAME, null); + } public String getAMJavaHome() { return get(AM_JAVA_HOME, null); http://git-wip-us.apache.org/repos/asf/samza/blob/e8a2ef5e/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala index a2b9279..74a0676 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala @@ -66,7 +66,7 @@ class ClientHelper(conf: Configuration) extends Logging { /** * Generate an application and submit it to the resource manager to start an application master */ - def submitApplication(packagePath: Path, memoryMb: Int, cpuCore: Int, cmds: List[String], env: Option[Map[String, String]], name: Option[String]): Option[ApplicationId] = { + def submitApplication(packagePath: Path, memoryMb: Int, cpuCore: Int, cmds: List[String], env: Option[Map[String, String]], name: Option[String], queueName: Option[String]): Option[ApplicationId] = { val app = yarnClient.createApplication val newAppResponse = app.getNewApplicationResponse var mem = memoryMb @@ -106,6 +106,14 @@ class ClientHelper(conf: Configuration) extends Logging { case None => None } + queueName match { + case Some(queueName) => { + appCtx.setQueue(queueName) + info("set yarn queue name to %s" format queueName) + } + case None => None + } + // set the local package so that the containers and app master are provisioned with it val packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath) val fileStatus = packagePath.getFileSystem(conf).getFileStatus(packagePath) http://git-wip-us.apache.org/repos/asf/samza/blob/e8a2ef5e/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala index 02f46a1..1aa26bb 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala @@ -73,8 +73,8 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob { } envMapWithJavaHome }), - Some("%s_%s" format (config.getName.get, config.getJobId.getOrElse(1))) - ) + Some("%s_%s" format (config.getName.get, config.getJobId.getOrElse(1))), + Option(yarnConfig.getQueueName)) this }
