Repository: samza
Updated Branches:
  refs/heads/master 76196b63c -> 725a52603


SAMZA-727: support Kerberos


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/725a5260
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/725a5260
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/725a5260

Branch: refs/heads/master
Commit: 725a526031848ceae94c8e079feb40af353daff8
Parents: 76196b6
Author: Chen Song <[email protected]>
Authored: Thu Jun 9 15:50:44 2016 -0700
Committer: Yi Pan (Data Infrastructure) <[email protected]>
Committed: Thu Jun 9 15:50:44 2016 -0700

----------------------------------------------------------------------
 build.gradle                                    |   1 +
 .../apache/samza/container/SecurityManager.java |  29 +++
 .../samza/container/SecurityManagerFactory.java |  29 +++
 .../org/apache/samza/config/JobConfig.scala     |   3 +
 .../apache/samza/container/SamzaContainer.scala |  30 +++
 .../org/apache/samza/config/YarnConfig.java     |  45 +++++
 .../apache/samza/job/yarn/ClientHelper.scala    | 184 ++++++++++++++++---
 .../apache/samza/job/yarn/SamzaAppMaster.scala  |  24 ++-
 .../yarn/SamzaAppMasterSecurityManager.scala    | 122 ++++++++++++
 .../samza/job/yarn/SamzaAppMasterService.scala  |  30 ++-
 .../yarn/SamzaContainerSecurityManager.scala    |  96 ++++++++++
 .../yarn/SamzaYarnSecurityManagerFactory.scala  |  30 +++
 .../org/apache/samza/job/yarn/YarnJob.scala     | 103 +++++------
 .../org/apache/samza/job/yarn/YarnJobUtil.scala | 109 +++++++++++
 .../samza/job/yarn/TestClientHelper.scala       |  91 +++++++++
 .../job/yarn/TestSamzaAppMasterService.scala    |   4 +-
 16 files changed, 839 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/725a5260/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 16facbb..ba4a9d1 100644
--- a/build.gradle
+++ b/build.gradle
@@ -316,6 +316,7 @@ project(":samza-yarn_$scalaVersion") {
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-all:$mockitoVersion"
     testCompile project(":samza-core_$scalaVersion").sourceSets.test.output
+    testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
   }
 
   repositories {

http://git-wip-us.apache.org/repos/asf/samza/blob/725a5260/samza-core/src/main/java/org/apache/samza/container/SecurityManager.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/SecurityManager.java 
b/samza-core/src/main/java/org/apache/samza/container/SecurityManager.java
new file mode 100644
index 0000000..eeb7ef6
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/SecurityManager.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+/**
+ * Manage the security of the environment.
+ */
+public interface SecurityManager {
+  void start();
+
+  void stop();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/725a5260/samza-core/src/main/java/org/apache/samza/container/SecurityManagerFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/SecurityManagerFactory.java
 
b/samza-core/src/main/java/org/apache/samza/container/SecurityManagerFactory.java
new file mode 100644
index 0000000..f2d80d1
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/container/SecurityManagerFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import org.apache.samza.config.Config;
+
+/**
+ * Create an instance of {@link SecurityManager}.
+ */
+public interface SecurityManagerFactory {
+  SecurityManager getSecurityManager(Config config);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/725a5260/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 
b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 4d5ca4d..23a51b2 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -55,6 +55,7 @@ object JobConfig {
   val MONITOR_PARTITION_CHANGE = "job.coordinator.monitor-partition-change"
   val MONITOR_PARTITION_CHANGE_FREQUENCY_MS = 
"job.coordinator.monitor-partition-change.frequency.ms"
   val DEFAULT_MONITOR_PARTITION_CHANGE_FREQUENCY_MS = 300000
+  val JOB_SECURITY_MANAGER_FACTORY = "job.security.manager.factory"
 
   implicit def Config2Job(config: Config) = new JobConfig(config)
 
@@ -115,6 +116,8 @@ class JobConfig(config: Config) extends 
ScalaMapConfig(config) with Logging {
 
   def getSystemStreamPartitionGrouperFactory = 
getOption(JobConfig.SSP_GROUPER_FACTORY).getOrElse(classOf[GroupByPartitionFactory].getCanonicalName)
 
+  def getSecurityManagerFactory = 
getOption(JobConfig.JOB_SECURITY_MANAGER_FACTORY)
+
   val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint.segment.bytes"
   val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint.replication.factor"
 

http://git-wip-us.apache.org/repos/asf/samza/blob/725a5260/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 951d479..086531e 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -313,6 +313,15 @@ object SamzaContainer extends Logging {
 
     info("Got metrics reporters: %s" format reporters.keys)
 
+    val securityManager = config.getSecurityManagerFactory match {
+      case Some(securityManagerFactoryClassName) =>
+        Util
+          .getObj[SecurityManagerFactory](securityManagerFactoryClassName)
+          .getSecurityManager(config)
+      case _ => null
+    }
+    info("Got security manager: %s" format securityManager)
+
     val coordinatorSystemProducer = new 
CoordinatorStreamSystemFactory().getCoordinatorStreamSystemProducer(config, 
samzaContainerMetrics.registry)
     val localityManager = new LocalityManager(coordinatorSystemProducer)
     val checkpointManager = config.getCheckpointManagerFactory() match {
@@ -545,6 +554,7 @@ object SamzaContainer extends Logging {
       producerMultiplexer = producerMultiplexer,
       offsetManager = offsetManager,
       localityManager = localityManager,
+      securityManager = securityManager,
       metrics = samzaContainerMetrics,
       reporters = reporters,
       jvm = jvm,
@@ -564,6 +574,7 @@ class SamzaContainer(
   diskSpaceMonitor: DiskSpaceMonitor = null,
   offsetManager: OffsetManager = new OffsetManager,
   localityManager: LocalityManager = null,
+  securityManager: SecurityManager = null,
   reporters: Map[String, MetricsReporter] = Map(),
   jvm: JvmMetrics = null) extends Runnable with Logging {
 
@@ -579,6 +590,7 @@ class SamzaContainer(
       startProducers
       startTask
       startConsumers
+      startSecurityManger
 
       info("Entering run loop.")
       runLoop.run
@@ -597,6 +609,7 @@ class SamzaContainer(
       shutdownLocalityManager
       shutdownOffsetManager
       shutdownMetrics
+      shutdownSecurityManger
 
       info("Shutdown complete.")
     }
@@ -699,6 +712,14 @@ class SamzaContainer(
     consumerMultiplexer.start
   }
 
+  def startSecurityManger: Unit = {
+    if (securityManager != null) {
+      info("Starting security manager.")
+
+      securityManager.start
+    }
+  }
+
   def shutdownConsumers {
     info("Shutting down consumer multiplexer.")
 
@@ -736,6 +757,7 @@ class SamzaContainer(
     offsetManager.stop
   }
 
+
   def shutdownMetrics {
     info("Shutting down metrics reporters.")
 
@@ -748,6 +770,14 @@ class SamzaContainer(
     }
   }
 
+  def shutdownSecurityManger: Unit = {
+    if (securityManager != null) {
+      info("Shutting down security manager.")
+
+      securityManager.stop
+    }
+  }
+
   def shutdownDiskSpaceMonitor: Unit = {
     if (diskSpaceMonitor != null) {
       info("Shutting down disk space monitor.")

http://git-wip-us.apache.org/repos/asf/samza/blob/725a5260/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 
b/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
index c556d83..8f2dc48 100644
--- a/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
+++ b/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
@@ -104,6 +104,31 @@ public class YarnConfig extends MapConfig {
   public static final String HOST_AFFINITY_ENABLED = 
"yarn.samza.host-affinity.enabled";
   private static final boolean DEFAULT_HOST_AFFINITY_ENABLED = false;
 
+  /**
+   * Principal used to log in on a Kerberized secure cluster
+   */
+  public static final String YARN_KERBEROS_PRINCIPAL = 
"yarn.kerberos.principal";
+
+  /**
+   * Key tab used to log in on a Kerberized secure cluster
+   */
+  public static final String YARN_KERBEROS_KEYTAB = "yarn.kerberos.keytab";
+
+  /**
+   * Interval in seconds to renew a delegation token in Kerberized secure 
cluster
+   */
+  public static final String YARN_TOKEN_RENEWAL_INTERVAL_SECONDS = 
"yarn.token.renewal.interval.seconds";
+  private static final long DEFAULT_YARN_TOKEN_RENEWAL_INTERVAL_SECONDS = 24 * 
3600;
+
+  /**
+   * The location on HDFS to store the credentials file
+   */
+  public static final String YARN_CREDENTIALS_FILE = "yarn.credentials.file";
+
+  /**
+   * The staging directory on HDFS for the job
+   */
+  public static final String YARN_JOB_STAGING_DIRECTORY = 
"yarn.job.staging.directory";
 
   public YarnConfig(Config config) {
     super(config);
@@ -168,4 +193,24 @@ public class YarnConfig extends MapConfig {
   public boolean getHostAffinityEnabled() {
     return getBoolean(HOST_AFFINITY_ENABLED, DEFAULT_HOST_AFFINITY_ENABLED);
   }
+
+  public String getYarnKerberosPrincipal() {
+    return get(YARN_KERBEROS_PRINCIPAL, null);
+  }
+
+  public String getYarnKerberosKeytab() {
+    return get(YARN_KERBEROS_KEYTAB, null);
+  }
+
+  public long getYarnTokenRenewalIntervalSeconds() {
+    return getLong(YARN_TOKEN_RENEWAL_INTERVAL_SECONDS, 
DEFAULT_YARN_TOKEN_RENEWAL_INTERVAL_SECONDS);
+  }
+
+  public String getYarnCredentialsFile() {
+    return get(YARN_CREDENTIALS_FILE, null);
+  }
+
+  public String getYarnJobStagingDirectory() {
+    return get(YARN_JOB_STAGING_DIRECTORY, null);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/725a5260/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
index 74a0676..0998c43 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
@@ -19,14 +19,19 @@
 
 package org.apache.samza.job.yarn
 
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.samza.config.{JobConfig, Config, YarnConfig}
+import org.apache.samza.coordinator.stream.{CoordinatorStreamWriter}
+import org.apache.samza.coordinator.stream.messages.SetConfig
+
 import scala.collection.JavaConversions._
-import scala.collection.Map
+import scala.collection.{Map}
+import scala.collection.mutable.HashMap
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.records.ApplicationId
 import org.apache.hadoop.yarn.api.records.ApplicationReport
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
 import org.apache.hadoop.yarn.api.records.LocalResource
@@ -38,6 +43,8 @@ import org.apache.hadoop.yarn.client.api.YarnClient
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.ConverterUtils
 import org.apache.hadoop.yarn.util.Records
+import org.apache.hadoop.io.DataOutputBuffer
+import org.apache.hadoop.fs.FileSystem
 import org.apache.samza.SamzaException
 import org.apache.samza.job.ApplicationStatus
 import org.apache.samza.job.ApplicationStatus.New
@@ -45,10 +52,15 @@ import org.apache.samza.job.ApplicationStatus.Running
 import org.apache.samza.job.ApplicationStatus.SuccessfulFinish
 import org.apache.samza.job.ApplicationStatus.UnsuccessfulFinish
 import org.apache.samza.util.Logging
-import java.util.Collections
+import java.io.IOException
+import java.nio.ByteBuffer
 
 object ClientHelper {
   val applicationType = "Samza"
+
+  val CREDENTIALS_FILE = "credentials"
+
+  val SOURCE = "yarn"
 }
 
 /**
@@ -57,20 +69,31 @@ object ClientHelper {
  * container and its processes.
  */
 class ClientHelper(conf: Configuration) extends Logging {
-  val yarnClient = YarnClient.createYarnClient
-  info("trying to connect to RM %s" format 
conf.get(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS))
-  yarnClient.init(conf);
-  yarnClient.start
-  var appId: Option[ApplicationId] = None
+  val yarnClient = createYarnClient
+
+  private[yarn] def createYarnClient() = {
+    val yarnClient = YarnClient.createYarnClient
+    info("trying to connect to RM %s" format 
conf.get(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS))
+    yarnClient.init(conf)
+    yarnClient.start
+    yarnClient
+  }
+
+  var jobContext: JobContext = null
 
   /**
    * Generate an application and submit it to the resource manager to start an 
application master
    */
-  def submitApplication(packagePath: Path, memoryMb: Int, cpuCore: Int, cmds: 
List[String], env: Option[Map[String, String]], name: Option[String], 
queueName: Option[String]): Option[ApplicationId] = {
+  def submitApplication(config: Config, cmds: List[String], env: 
Option[Map[String, String]], name: Option[String]): Option[ApplicationId] = {
     val app = yarnClient.createApplication
     val newAppResponse = app.getNewApplicationResponse
-    var mem = memoryMb
-    var cpu = cpuCore
+
+    val yarnConfig = new YarnConfig(config)
+
+    val packagePath = new Path(yarnConfig.getPackagePath)
+    val mem = yarnConfig.getAMContainerMaxMemoryMb
+    val cpu = 1
+    val queueName = Option(yarnConfig.getQueueName)
 
     // If we are asking for memory more than the max allowed, shout out
     if (mem > newAppResponse.getMaximumResourceCapability().getMemory()) {
@@ -84,7 +107,9 @@ class ClientHelper(conf: Configuration) extends Logging {
         (cpu, newAppResponse.getMaximumResourceCapability().getVirtualCores()))
     }
 
-    appId = Some(newAppResponse.getApplicationId)
+    jobContext = new JobContext
+    jobContext.setAppId(newAppResponse.getApplicationId)
+    val appId = jobContext.getAppId
 
     info("preparing to request resources for app id %s" format appId.get)
 
@@ -95,15 +120,7 @@ class ClientHelper(conf: Configuration) extends Logging {
 
     name match {
       case Some(name) => { appCtx.setApplicationName(name) }
-      case None => { appCtx.setApplicationName(appId.toString) }
-    }
-
-    env match {
-      case Some(env) => {
-        containerCtx.setEnvironment(env)
-        info("set environment variables to %s for %s" format (env, appId.get))
-      }
-      case None => None
+      case None => { appCtx.setApplicationName(appId.get.toString) }
     }
 
     queueName match {
@@ -111,12 +128,13 @@ class ClientHelper(conf: Configuration) extends Logging {
         appCtx.setQueue(queueName)
         info("set yarn queue name to %s" format queueName)
       }
-      case None => None
+      case None =>
     }
 
     // set the local package so that the containers and app master are 
provisioned with it
     val packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath)
-    val fileStatus = packagePath.getFileSystem(conf).getFileStatus(packagePath)
+    val fs = packagePath.getFileSystem(conf)
+    val fileStatus = fs.getFileStatus(packagePath)
 
     packageResource.setResource(packageUrl)
     info("set package url to %s for %s" format (packageUrl, appId.get))
@@ -133,9 +151,44 @@ class ClientHelper(conf: Configuration) extends Logging {
     appCtx.setResource(resource)
     containerCtx.setCommands(cmds.toList)
     info("set command to %s for %s" format (cmds, appId.get))
-    containerCtx.setLocalResources(Collections.singletonMap("__package", 
packageResource))
+
     appCtx.setApplicationId(appId.get)
     info("set app ID to %s" format appId.get)
+
+    val localResources: HashMap[String, LocalResource] = HashMap[String, 
LocalResource]()
+    localResources += "__package" -> packageResource
+
+    if (UserGroupInformation.isSecurityEnabled()) {
+      validateJobConfig(config)
+
+      setupSecurityToken(fs, containerCtx)
+      info("set security token for %s" format appId.get)
+
+      val amLocalResources = setupAMLocalResources(fs, 
Option(yarnConfig.getYarnKerberosPrincipal), 
Option(yarnConfig.getYarnKerberosKeytab))
+      localResources ++= amLocalResources
+
+      val securityYarnConfig  = getSecurityYarnConfig
+      val coordinatorStreamWriter: CoordinatorStreamWriter = new 
CoordinatorStreamWriter(config)
+      coordinatorStreamWriter.start()
+
+      securityYarnConfig.foreach {
+        case (key: String, value: String) =>
+          coordinatorStreamWriter.sendMessage(SetConfig.TYPE, key, value)
+      }
+      coordinatorStreamWriter.stop()
+    }
+
+    containerCtx.setLocalResources(localResources)
+    info("set local resources on application master for %s" format appId.get)
+
+    env match {
+      case Some(env) => {
+        containerCtx.setEnvironment(env)
+        info("set environment variables to %s for %s" format (env, appId.get))
+      }
+      case None =>
+    }
+
     appCtx.setAMContainerSpec(containerCtx)
     appCtx.setApplicationType(ClientHelper.applicationType)
     info("submitting application request for %s" format appId.get)
@@ -178,4 +231,87 @@ class ClientHelper(conf: Configuration) extends Logging {
       case _ => Some(Running)
     }
   }
+
+  private[yarn] def validateJobConfig(config: Config) = {
+    import org.apache.samza.config.JobConfig.Config2Job
+
+    if (config.getSecurityManagerFactory.isEmpty) {
+      throw new SamzaException(s"Job config 
${JobConfig.JOB_SECURITY_MANAGER_FACTORY} not found. This config must be set 
for a secure cluster")
+    }
+  }
+
+  private def setupSecurityToken(fs: FileSystem, amContainer: 
ContainerLaunchContext): Unit = {
+    info("security is enabled")
+    val credentials = UserGroupInformation.getCurrentUser.getCredentials
+    val tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
+    if (tokenRenewer == null || tokenRenewer.length() == 0) {
+      throw new IOException(
+        "Can't get Master Kerberos principal for the RM to use as renewer");
+    }
+
+    val tokens =
+      fs.addDelegationTokens(tokenRenewer, credentials)
+    tokens.foreach { token => info("Got dt for " + fs.getUri() + "; " + token) 
}
+    val dob = new DataOutputBuffer
+    credentials.writeTokenStorageToStream(dob)
+    amContainer.setTokens(ByteBuffer.wrap(dob.getData))
+  }
+
+  private[yarn] def setupAMLocalResources(fs: FileSystem, principal: 
Option[String], keytab: Option[String]) = {
+    if (principal.isEmpty || keytab.isEmpty) {
+      throw new SamzaException("You need to set both %s and %s on a secure 
cluster" format(YarnConfig.YARN_KERBEROS_PRINCIPAL, YarnConfig
+        .YARN_KERBEROS_KEYTAB))
+    }
+
+    val localResources = HashMap[String, LocalResource]()
+
+    // create application staging dir
+    val created = YarnJobUtil.createStagingDir(jobContext, fs)
+    created match {
+      case Some(appStagingDir) =>
+        jobContext.setAppStagingDir(appStagingDir)
+        val destFilePath = addLocalFile(fs, keytab.get, appStagingDir)
+        localResources.put(destFilePath.getName(), getLocalResource(fs, 
destFilePath, LocalResourceType.FILE))
+        localResources.toMap
+      case None => throw new SamzaException(s"Failed to create staging 
directory for %s" format jobContext.getAppId)
+    }
+  }
+
+  private def addLocalFile(fs: FileSystem, localFile: String, destDirPath: 
Path) = {
+    val srcFilePath = new Path(localFile)
+    val destFilePath = new Path(destDirPath, srcFilePath.getName)
+    fs.copyFromLocalFile(srcFilePath, destFilePath)
+    val localFilePermission = 
FsPermission.createImmutable(Integer.parseInt("400", 8).toShort)
+    fs.setPermission(destFilePath, localFilePermission)
+    destFilePath
+  }
+
+  private def getLocalResource(fs: FileSystem, destFilePath: Path, 
resourceType: LocalResourceType) = {
+    val localResource = Records.newRecord(classOf[LocalResource])
+    val fileStatus = fs.getFileStatus(destFilePath)
+    localResource.setResource(ConverterUtils.getYarnUrlFromPath(destFilePath))
+    localResource.setSize(fileStatus.getLen())
+    localResource.setTimestamp(fileStatus.getModificationTime())
+    localResource.setType(resourceType)
+    localResource.setVisibility(LocalResourceVisibility.APPLICATION)
+    localResource
+  }
+
+  private[yarn] def getSecurityYarnConfig = {
+    val stagingDir = jobContext.getAppStagingDir.get
+    val credentialsFile = new Path(stagingDir, ClientHelper.CREDENTIALS_FILE)
+    val jobConfigs = HashMap[String, String]()
+
+    jobConfigs += YarnConfig.YARN_CREDENTIALS_FILE -> credentialsFile.toString
+    jobConfigs += YarnConfig.YARN_JOB_STAGING_DIRECTORY -> stagingDir.toString
+
+    jobConfigs.toMap
+  }
+
+  /**
+    * Cleanup application staging directory.
+    */
+  def cleanupStagingDir(): Unit = {
+    YarnJobUtil.cleanupStagingDir(jobContext, FileSystem.get(conf))
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/725a5260/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
index 7bd8131..1fb18be 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
@@ -19,9 +19,14 @@
 
 package org.apache.samza.job.yarn
 
+import java.io.IOException
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{Path, FileSystem}
+
 import scala.collection.JavaConversions.asScalaBuffer
 import org.apache.hadoop.yarn.api.ApplicationConstants
-import org.apache.hadoop.yarn.api.records.{ Container, ContainerStatus, 
NodeReport }
+import org.apache.hadoop.yarn.api.records.{FinalApplicationStatus, Container, 
ContainerStatus, NodeReport}
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync
 import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -85,17 +90,20 @@ object SamzaAppMaster extends Logging with 
AMRMClientAsync.CallbackHandler {
     val containerMem = yarnConfig.getContainerMaxMemoryMb
     val containerCpu = yarnConfig.getContainerMaxCpuCores
     val jmxServer = if (yarnConfig.getJmxServerEnabled) Some(new JmxServer()) 
else None
+    val jobContext = new JobContext
+    Option(yarnConfig.getYarnJobStagingDirectory).map {
+      jobStagingDirectory => jobContext.setAppStagingDir(new 
Path(jobStagingDirectory))
+    }
 
+    // wire up all of the yarn event listeners
+    val state = new SamzaAppState(jobCoordinator, -1, containerId, 
nodeHostString, nodePortString.toInt, nodeHttpPortString.toInt)
     try {
-      // wire up all of the yarn event listeners
-      val state = new SamzaAppState(jobCoordinator, -1, containerId, 
nodeHostString, nodePortString.toInt, nodeHttpPortString.toInt)
-
       if (jmxServer.isDefined) {
         state.jmxUrl = jmxServer.get.getJmxUrl
         state.jmxTunnelingUrl = jmxServer.get.getTunnelingJmxUrl
       }
 
-      val service = new SamzaAppMasterService(config, state, registry, 
clientHelper)
+      val service = new SamzaAppMasterService(config, state, registry, 
clientHelper, hConfig)
       val lifecycle = new SamzaAppMasterLifecycle(containerMem, containerCpu, 
state, amClient)
       val metrics = new SamzaAppMasterMetrics(config, state, registry)
       val taskManager = new SamzaTaskManager(config, state, amClient, hConfig)
@@ -103,6 +111,10 @@ object SamzaAppMaster extends Logging with 
AMRMClientAsync.CallbackHandler {
       listeners =  List(service, lifecycle, metrics, taskManager)
       run(amClient, listeners, hConfig, interval)
     } finally {
+      if (state.status != FinalApplicationStatus.UNDEFINED) {
+        YarnJobUtil.cleanupStagingDir(jobContext, FileSystem.get(hConfig))
+      }
+
       // jmxServer has to be stopped or will prevent process from exiting.
       if (jmxServer.isDefined) {
         jmxServer.get.stop
@@ -134,6 +146,7 @@ object SamzaAppMaster extends Logging with 
AMRMClientAsync.CallbackHandler {
       } catch {
         case e: Exception => warn("Listener %s failed to shutdown." format 
listener, e)
       })
+
       // amClient has to be stopped
       amClient.stop
     }
@@ -156,5 +169,4 @@ object SamzaAppMaster extends Logging with 
AMRMClientAsync.CallbackHandler {
     error("Error occured in amClient's callback", e)
     storedException = e
   }
-
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/725a5260/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterSecurityManager.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterSecurityManager.scala
 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterSecurityManager.scala
new file mode 100644
index 0000000..2eaa19f
--- /dev/null
+++ 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterSecurityManager.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{TimeUnit, Executors}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{Path, FileSystem}
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.samza.SamzaException
+import org.apache.samza.config.{YarnConfig, Config}
+import org.apache.samza.util.{DaemonThreadFactory, Logging}
+import org.apache.samza.container.SecurityManager
+
+object SamzaAppMasterSecurityManager {
+  val TOKEN_RENEW_THREAD_NAME_PREFIX = "TOKEN-RENEW-PREFIX"
+}
+
+/**
+  * The SamzaAppMasterSecurityManager is responsible for renewing and 
distributing HDFS delegation tokens on a secure YARN
+  * cluster.
+  *
+  * <p />
+  *
+  * It runs in a daemon thread and periodically requests new delegation tokens 
and writes the fresh tokens in a configured
+  * staging directory at the configured frequency.
+  *
+  * @param config     Samza config for the application
+  * @param hadoopConf the hadoop configuration
+  */
+class SamzaAppMasterSecurityManager(config: Config, hadoopConf: Configuration) 
extends SecurityManager with Logging {
+  private val tokenRenewExecutor = 
Executors.newSingleThreadScheduledExecutor(new 
DaemonThreadFactory(SamzaAppMasterSecurityManager
+    .TOKEN_RENEW_THREAD_NAME_PREFIX))
+
+  def start() = {
+    val yarnConfig = new YarnConfig(config)
+    val principal = yarnConfig.getYarnKerberosPrincipal
+    // only get the name part of the keytab config, the keytab file will be in 
the working directory
+    val keytab = new Path(yarnConfig.getYarnKerberosKeytab).getName
+    val renewalInterval = yarnConfig.getYarnTokenRenewalIntervalSeconds
+    val credentialsFile = yarnConfig.getYarnCredentialsFile
+
+    val tokenRenewRunnable = new Runnable {
+      override def run(): Unit = {
+        try {
+          loginFromKeytab(principal, keytab, credentialsFile)
+        } catch {
+          case e: Exception =>
+            warn("Failed to renew token and write out new credentials", e)
+        }
+      }
+    }
+
+    tokenRenewExecutor.scheduleAtFixedRate(tokenRenewRunnable, 
renewalInterval, renewalInterval, TimeUnit.SECONDS)
+  }
+
+  private def loginFromKeytab(principal: String, keytab: String, 
credentialsFile: String) = {
+    info(s"Logging to KDC using principal: $principal")
+    UserGroupInformation.loginUserFromKeytab(principal, keytab)
+    val keytabUser = 
UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
+    val credentials = keytabUser.getCredentials
+
+    // get the new delegation token from the keytab user
+    keytabUser.doAs(new PrivilegedExceptionAction[Void] {
+      override def run(): Void = {
+        getNewDelegationToken(credentials)
+        null
+      }
+    })
+
+    UserGroupInformation.getCurrentUser.addCredentials(credentials)
+
+    val credentialsFilePath = new Path(credentialsFile)
+    writeNewDelegationToken(credentialsFilePath, credentials)
+  }
+
+  private def getNewDelegationToken(credentials: Credentials) = {
+    val fs = FileSystem.get(hadoopConf)
+    val tokenRenewer = UserGroupInformation.getCurrentUser.getShortUserName
+    // HDFS will not issue a new delegation token if there are existing ones in 
the credentials. This is worked around
+    // by creating a new credentials object from the login via the given 
keytab and principal, passing the new
+    // credentials object to FileSystem.addDelegationTokens to force a new 
delegation token to be created, and adding
+    // the credentials to the current user's credential object
+    fs.addDelegationTokens(tokenRenewer, credentials)
+  }
+
+  private def writeNewDelegationToken(credentialsFilePath: Path, credentials: 
Credentials) = {
+    val fs = FileSystem.get(hadoopConf)
+    if (fs.exists(credentialsFilePath)) {
+      logger.info(s"Deleting existing credentials file $credentialsFilePath")
+      val success = fs.delete(credentialsFilePath, false)
+      if (!success) {
+        throw new SamzaException(s"Failed deleting existing credentials file 
$credentialsFilePath")
+      }
+    }
+
+    logger.info(s"Writing new delegation to the token file 
$credentialsFilePath")
+    credentials.writeTokenStorageFile(credentialsFilePath, hadoopConf)
+  }
+
+  def stop() = {
+    tokenRenewExecutor.shutdown
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/725a5260/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
index 3adf86f..979d81d 100644
--- 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
+++ 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
@@ -19,6 +19,8 @@
 
 package org.apache.samza.job.yarn
 
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.samza.coordinator.stream.CoordinatorStreamWriter
 import org.apache.samza.coordinator.stream.messages.SetConfig
 import org.apache.samza.util.Logging
@@ -31,14 +33,19 @@ import org.apache.samza.webapp.ApplicationMasterRestServlet
 import org.apache.samza.webapp.ApplicationMasterWebServlet
 
 /**
- * Samza's application master runs a very basic HTTP/JSON service to allow
- * dashboards to check on the status of a job. SamzaAppMasterService starts
- * up the web service when initialized.
- */
-class SamzaAppMasterService(config: Config, state: SamzaAppState, registry: 
ReadableMetricsRegistry, clientHelper: ClientHelper) extends 
YarnAppMasterListener with Logging {
+  * Samza's application master runs a very basic HTTP/JSON service to allow
+  * dashboards to check on the status of a job. SamzaAppMasterService starts
+  * up the web service when initialized.
+  * <p />
+  * Besides the HTTP/JSON service endpoints, it also starts an optional
+  * SecurityManager which takes care of the security needs when running in
+  * a secure environment.
+  */
+class SamzaAppMasterService(config: Config, state: SamzaAppState, registry: 
ReadableMetricsRegistry, clientHelper: ClientHelper, yarnConfiguration: 
YarnConfiguration) extends YarnAppMasterListener with Logging {
   var rpcApp: HttpServer = null
   var webApp: HttpServer = null
   val SERVER_URL_OPT: String = "samza.autoscaling.server.url"
+  var securityManager: Option[SamzaAppMasterSecurityManager] = None
 
   override def onInit() {
     // try starting the samza AM dashboard at a random rpc and tracking port
@@ -65,6 +72,15 @@ class SamzaAppMasterService(config: Config, state: 
SamzaAppState, registry: Read
     debug("sent server url message with value: %s " format 
state.coordinatorUrl.toString)
 
     info("Webapp is started at (rpc %s, tracking %s, coordinator %s)" 
format(state.rpcUrl, state.trackingUrl, state.coordinatorUrl))
+
+    // start YarnSecurityManger for a secure cluster
+    if (UserGroupInformation.isSecurityEnabled) {
+      securityManager = Option {
+        val securityManager = new SamzaAppMasterSecurityManager(config, 
yarnConfiguration)
+        securityManager.start
+        securityManager
+      }
+    }
   }
 
   override def onShutdown() {
@@ -77,5 +93,9 @@ class SamzaAppMasterService(config: Config, state: 
SamzaAppState, registry: Read
     }
 
     state.jobCoordinator.stop
+
+    securityManager.map {
+      securityManager => securityManager.stop
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/725a5260/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaContainerSecurityManager.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaContainerSecurityManager.scala
 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaContainerSecurityManager.scala
new file mode 100644
index 0000000..10b971f
--- /dev/null
+++ 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaContainerSecurityManager.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+
+import java.util.concurrent.{TimeUnit, Executors}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.security.{UserGroupInformation, Credentials}
+import org.apache.samza.config.{Config, YarnConfig}
+import org.apache.samza.util.{Logging, DaemonThreadFactory}
+import org.apache.samza.container.SecurityManager
+
object SamzaContainerSecurityManager {
  // Name prefix for the daemon thread that refreshes delegation tokens.
  val TOKEN_RENEW_THREAD_NAME_PREFIX = "TOKEN-RENEW-PREFIX"
  // Extra delay before the first fetch, giving the AM time to write the
  // credentials file before the container first looks for it.
  val INITIAL_DELAY_IN_SECONDS = 60
}

/**
  * Runs inside a Samza container on a secure (Kerberos) YARN cluster.
  * Periodically reloads HDFS delegation tokens from the credentials file the
  * AM keeps refreshed, and adds them to the current UGI user so the
  * container's HDFS access keeps working after the original tokens expire.
  */
class SamzaContainerSecurityManager(config: Config, hadoopConfig: Configuration) extends SecurityManager with Logging {
  // Single daemon thread drives the self-rescheduling renewal task.
  private val tokenRenewExecutor = Executors.newScheduledThreadPool(1, new DaemonThreadFactory(SamzaContainerSecurityManager.TOKEN_RENEW_THREAD_NAME_PREFIX))
  // Local wall-clock time of the last successful token reload (0 = never).
  private var lastRefreshTimestamp = 0L


  /**
    * Schedules the recurring token refresh. The task reschedules itself after
    * each attempt (rather than using a fixed-rate schedule) so the next delay
    * can vary with the outcome: normal interval on success or missing file,
    * 5 minutes when the AM has not yet rewritten the file, capped backoff on
    * error.
    */
  def start() = {
    val yarnConfig = new YarnConfig(config)
    val renewalInterval = yarnConfig.getYarnTokenRenewalIntervalSeconds
    val tokenFilePath = new Path(yarnConfig.getYarnCredentialsFile)

    val tokenRenewRunnable = new Runnable {
      override def run(): Unit = {
        try {
          val fs = FileSystem.get(hadoopConfig)
          if (fs.exists(tokenFilePath)) {
            val fileStatus = fs.getFileStatus(tokenFilePath)
            // NOTE(review): compares the local clock with the HDFS file
            // modification time — assumes both clocks are in reasonable sync.
            if (lastRefreshTimestamp > fileStatus.getModificationTime) {
              // credentials have not been updated, retry after 5 minutes
              info("Expecting to update delegation tokens, but AM has not updated credentials file yet, will retry in 5 minutes")
              tokenRenewExecutor.schedule(this, 5, TimeUnit.MINUTES)
            } else {
              // File is newer than our last reload: pick up the new tokens.
              val credentials = getCredentialsFromHDFS(fs, tokenFilePath)
              UserGroupInformation.getCurrentUser.addCredentials(credentials)
              info("Successfully renewed tokens from credentials file")
              lastRefreshTimestamp = System.currentTimeMillis
              info(s"Schedule the next fetch in $renewalInterval seconds")
              tokenRenewExecutor.schedule(this, renewalInterval, TimeUnit.SECONDS)
            }
          } else {
            info(s"Credentials file not found yet. Schedule the next fetch in $renewalInterval seconds")
            tokenRenewExecutor.schedule(this, renewalInterval, TimeUnit.SECONDS)
          }
        } catch {
          case e: Exception =>
            // On failure, retry no later than one hour from now.
            val retrySeconds = Math.min(renewalInterval, 3600)
            warn(s"Failed to renew tokens, will retry in $retrySeconds seconds", e)

            tokenRenewExecutor.schedule(this, retrySeconds, TimeUnit.SECONDS)
        }
      }
    }

    info(s"Schedule the next fetch in ${renewalInterval + SamzaContainerSecurityManager.INITIAL_DELAY_IN_SECONDS} seconds")
    tokenRenewExecutor.schedule(tokenRenewRunnable, renewalInterval + SamzaContainerSecurityManager.INITIAL_DELAY_IN_SECONDS, TimeUnit.SECONDS)
  }

  /** Reads a Credentials object from the token file at the given HDFS path. */
  private def getCredentialsFromHDFS(fs: FileSystem, tokenPath: Path): Credentials = {
    val stream = fs.open(tokenPath)
    try {
      val newCredentials = new Credentials()
      newCredentials.readTokenStorageStream(stream)
      newCredentials
    } finally {
      stream.close()
    }
  }

  /** Shuts down the renewal executor; any pending refresh will not run. */
  def stop(): Unit = {
    tokenRenewExecutor.shutdown()
  }
}

http://git-wip-us.apache.org/repos/asf/samza/blob/725a5260/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnSecurityManagerFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnSecurityManagerFactory.scala
 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnSecurityManagerFactory.scala
new file mode 100644
index 0000000..824e81d
--- /dev/null
+++ 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnSecurityManagerFactory.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.samza.config.Config
+import org.apache.samza.container.{SecurityManager, SecurityManagerFactory}
+
/**
  * SecurityManagerFactory that builds the YARN container security manager,
  * wiring it to the default YarnConfiguration found on the classpath.
  */
class SamzaYarnSecurityManagerFactory extends SecurityManagerFactory {
  override def getSecurityManager(config: Config): SecurityManager =
    new SamzaContainerSecurityManager(config, new YarnConfiguration)
}

http://git-wip-us.apache.org/repos/asf/samza/blob/725a5260/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
index 62ddb26..3ca1f0d 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
@@ -19,31 +19,15 @@
 
 package org.apache.samza.job.yarn
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.api.records.ApplicationId
 import org.apache.hadoop.yarn.api.ApplicationConstants
-import org.apache.samza.config.Config
-import org.apache.samza.config.JobConfig
-import org.apache.samza.util.Util
-import org.apache.samza.job.ApplicationStatus
-import org.apache.samza.job.ApplicationStatus.Running
-import org.apache.samza.job.StreamJob
-import org.apache.samza.job.ApplicationStatus.SuccessfulFinish
-import org.apache.samza.job.ApplicationStatus.UnsuccessfulFinish
+import org.apache.hadoop.yarn.api.records.ApplicationId
 import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.YarnConfig
-import org.apache.samza.config.ShellCommandConfig
-import org.apache.samza.SamzaException
+import org.apache.samza.config.{Config, JobConfig, ShellCommandConfig, 
YarnConfig}
+import org.apache.samza.job.ApplicationStatus.{Running, SuccessfulFinish, 
UnsuccessfulFinish}
+import org.apache.samza.job.{ApplicationStatus, StreamJob}
 import org.apache.samza.serializers.model.SamzaObjectMapper
-import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.util.Util
 import org.slf4j.LoggerFactory
-import scala.collection.JavaConversions._
-import org.apache.samza.config.MapConfig
-import org.apache.samza.config.ConfigException
-import org.apache.samza.config.SystemConfig
-
-
 
 /**
  * Starts the application manager
@@ -56,36 +40,40 @@ class YarnJob(config: Config, hadoopConfig: Configuration) 
extends StreamJob {
   val logger = LoggerFactory.getLogger(this.getClass)
 
   def submit: YarnJob = {
+    try {
+      val cmdExec = buildAmCmd()
+
+      appId = client.submitApplication(
+        config,
+        List(
+          // we need something like this:
+          //"export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec 
<fwk_path>/bin/run-am.sh 1>logs/%s 2>logs/%s"
+
+          "export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec %s 1>logs/%s 
2>logs/%s"
+            format (ApplicationConstants.LOG_DIR_EXPANSION_VAR, 
ApplicationConstants.LOG_DIR_EXPANSION_VAR,
+            cmdExec, ApplicationConstants.STDOUT, 
ApplicationConstants.STDERR)),
+        Some({
+          val coordinatorSystemConfig = 
Util.buildCoordinatorStreamConfig(config)
+          val envMap = Map(
+            ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG -> 
Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString
+            (coordinatorSystemConfig)),
+            ShellCommandConfig.ENV_JAVA_OPTS -> 
Util.envVarEscape(yarnConfig.getAmOpts))
+          val amJavaHome = yarnConfig.getAMJavaHome
+          val envMapWithJavaHome = if (amJavaHome == null) {
+            envMap
+          } else {
+            envMap + (ShellCommandConfig.ENV_JAVA_HOME -> amJavaHome)
+          }
+          envMapWithJavaHome
+        }),
+        Some("%s_%s" format(config.getName.get, config.getJobId.getOrElse(1)))
+      )
+    } catch {
+      case e: Throwable =>
+        client.cleanupStagingDir
+        throw e
+    }
 
-    val cmdExec = buildAmCmd()
-
-    appId = client.submitApplication(
-      new Path(yarnConfig.getPackagePath),
-      yarnConfig.getAMContainerMaxMemoryMb,
-      1,
-      List(
-       // we need something like this:
-       //"export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec 
<fwk_path>/bin/run-am.sh 1>logs/%s 2>logs/%s"
-
-        "export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec %s 1>logs/%s 
2>logs/%s"
-           format (ApplicationConstants.LOG_DIR_EXPANSION_VAR, 
ApplicationConstants.LOG_DIR_EXPANSION_VAR,
-                   cmdExec, ApplicationConstants.STDOUT, 
ApplicationConstants.STDERR)),
-
-      Some({
-        val coordinatorSystemConfig = Util.buildCoordinatorStreamConfig(config)
-        val envMap = Map(
-          ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG -> 
Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(coordinatorSystemConfig)),
-          ShellCommandConfig.ENV_JAVA_OPTS -> 
Util.envVarEscape(yarnConfig.getAmOpts))
-        val amJavaHome = yarnConfig.getAMJavaHome
-        val envMapWithJavaHome = if(amJavaHome == null) {
-          envMap
-        } else {
-          envMap + (ShellCommandConfig.ENV_JAVA_HOME -> amJavaHome)
-        }
-        envMapWithJavaHome
-      }),
-      Some("%s_%s" format (config.getName.get, config.getJobId.getOrElse(1))),
-      Option(yarnConfig.getQueueName))
     this
   }
 
@@ -118,8 +106,10 @@ class YarnJob(config: Config, hadoopConfig: Configuration) 
extends StreamJob {
 
     while (System.currentTimeMillis() - startTimeMs < timeoutMs) {
       Option(getStatus) match {
-        case Some(s) => if (SuccessfulFinish.equals(s) || UnsuccessfulFinish.equals(s)) return s
-        case None => null
+        case Some(s) =>
+          // Braces are required here: a brace-less `if` guards only the first
+          // statement, so `return s` would execute unconditionally and the
+          // wait would end on the very first poll even while still Running.
+          if (SuccessfulFinish.equals(s) || UnsuccessfulFinish.equals(s)) {
+            client.cleanupStagingDir
+            return s
+          }
+        case None =>
       }
 
       Thread.sleep(1000)
@@ -152,8 +142,13 @@ class YarnJob(config: Config, hadoopConfig: Configuration) 
extends StreamJob {
 
   def kill: YarnJob = {
     appId match {
-      case Some(appId) => client.kill(appId)
-      case None => None
+      case Some(appId) =>
+        try {
+          client.kill(appId)
+        } finally {
+          client.cleanupStagingDir
+        }
+      case None =>
     }
     this
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/725a5260/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobUtil.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobUtil.scala 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobUtil.scala
new file mode 100644
index 0000000..d7afa15
--- /dev/null
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobUtil.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.yarn
+
+import java.io.IOException
+
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.yarn.api.records.ApplicationId
+import org.apache.samza.util.Logging
+
object JobContext {
  // Root directory (relative to the user's home dir on HDFS) for staged job files.
  val STAGING_DIR = ".samzaStaging"
}

/**
  * JobContext stores the meta information about the job running on YARN.
  *
  * Optionally, one can set the Application Id and the Application Staging
  * Directory on the job context object.
  */
class JobContext {
  // Option-typed state instead of null-initialized vars (Scala convention);
  // the public getters/setters keep the exact same signatures as before.
  private var appId: Option[ApplicationId] = None
  private var appStagingDir: Option[Path] = None

  def getAppId: Option[ApplicationId] = appId

  def getAppStagingDir: Option[Path] = appStagingDir

  def setAppId(appId: ApplicationId) = {
    // Option(null) is None, matching the previous null-backed behavior.
    this.appId = Option(appId)
  }

  def setAppStagingDir(appStagingDir: Path) = {
    this.appStagingDir = Option(appStagingDir)
  }

  /**
    * Returns the staging directory path for this application relative to the
    * filesystem home directory (e.g. ".samzaStaging/application_123"), or
    * None when no application id has been set.
    */
  def defaultAppStagingDir: Option[String] = {
    getAppId.map(JobContext.STAGING_DIR + Path.SEPARATOR + _.toString)
  }
}
+
object YarnJobUtil extends Logging {
  /**
    * Create the staging directory for the application.
    *
    * Uses the directory already set on the job context when present,
    * otherwise derives the default one under the filesystem home directory.
    *
    * @param jobContext context carrying the app id / staging dir
    * @param fs the filesystem on which to create the directory
    * @return Some(path) of the created staging directory, or None when no
    *         directory could be determined or mkdirs failed
    */
  def createStagingDir(jobContext: JobContext, fs: FileSystem): Option[Path] = {
    val defaultStagingDir = jobContext.defaultAppStagingDir.map(new Path(fs.getHomeDirectory, _))
    // orElse replaces the previous `case x: Some[Path]` match, which was an
    // unchecked (type-erased) pattern.
    val stagingDir = jobContext.getAppStagingDir.orElse(defaultStagingDir)
    stagingDir.flatMap {
      appStagingDir =>
        // Owner-only (700) permissions: the staging dir may hold credentials.
        val appStagingDirPermission = FsPermission.createImmutable(Integer.parseInt("700", 8).toShort)
        if (FileSystem.mkdirs(fs, appStagingDir, appStagingDirPermission)) {
          Some(appStagingDir)
        } else {
          // Previously a failed mkdirs produced Some(null), a latent NPE for
          // callers; report failure as None instead.
          warn("Failed to create staging directory " + appStagingDir)
          None
        }
    }
  }

  /**
    * Clean up the application staging directory, logging (not rethrowing)
    * any IOException so shutdown can proceed.
    */
  def cleanupStagingDir(jobContext: JobContext, fs: FileSystem): Unit = {
    jobContext.getAppStagingDir match {
      case Some(appStagingDir) => try {
        if (fs.exists(appStagingDir)) {
          info("Deleting staging directory " + appStagingDir)
          fs.delete(appStagingDir, true)
        }
      } catch {
        case ioe: IOException =>
          warn("Failed to cleanup staging dir " + appStagingDir, ioe)
      }
      case None => info("No staging dir exists")
    }
  }
}

http://git-wip-us.apache.org/repos/asf/samza/blob/725a5260/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala 
b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala
new file mode 100644
index 0000000..5c15385
--- /dev/null
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.job.yarn
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.hadoop.fs.{FileStatus, Path, FileSystem}
+import org.apache.hadoop.yarn.api.records.ApplicationId
+import org.apache.hadoop.yarn.client.api.YarnClient
+import org.apache.samza.SamzaException
+import org.apache.samza.config.{MapConfig, JobConfig, Config, YarnConfig}
+import org.mockito.Mockito._
+import org.mockito.Matchers.any
+import org.scalatest.FunSuite
+import org.scalatest.mock.MockitoSugar
+
+
/**
  * Unit tests for ClientHelper's Kerberos-related helpers: job-config
  * validation, security YARN config preparation, and AM local-resource setup.
  * All filesystem/YARN interaction is mocked; nothing touches a real cluster.
  */
class TestClientHelper extends FunSuite {
  import MockitoSugar._
  val hadoopConfig = mock[Configuration]

  // ClientHelper under test, with YarnClient creation stubbed out so no
  // actual YARN connection is made.
  val clientHelper = new ClientHelper(hadoopConfig) {
    override def createYarnClient() = {
      mock[YarnClient]
    }
  }

  test("test validateJobConfig") {
    import collection.JavaConverters._
    var config = new MapConfig()

    // An empty config must be rejected: a security manager factory is
    // required when running with security enabled.
    intercept[SamzaException] {
      clientHelper.validateJobConfig(config)
    }

    config = new MapConfig(Map(JobConfig.JOB_SECURITY_MANAGER_FACTORY -> "some value").asJava)

    // With the factory configured, validation should pass without throwing.
    clientHelper.validateJobConfig(config)
  }

  test("test prepareJobConfig") {
    val jobContext = new JobContext
    jobContext.setAppStagingDir(new Path("/user/temp/.samzaStaging/app_123"))
    clientHelper.jobContext = jobContext

    val ret = clientHelper.getSecurityYarnConfig

    // Expect exactly two entries: staging directory and credentials file,
    // with the credentials file located inside the staging directory.
    assert(ret.size == 2)
    assert(ret.get(YarnConfig.YARN_JOB_STAGING_DIRECTORY) == Some("/user/temp/.samzaStaging/app_123"))
    assert(ret.get(YarnConfig.YARN_CREDENTIALS_FILE) == Some("/user/temp/.samzaStaging/app_123/credentials"))
  }

  test("test setupAMLocalResources") {
    val applicationId = mock[ApplicationId]
    when(applicationId.toString).thenReturn("application_123")
    val jobContext = new JobContext
    jobContext.setAppId(applicationId)
    clientHelper.jobContext = jobContext

    // Mock filesystem: home dir resolves, mkdirs succeeds, copies are no-ops.
    val mockFs = mock[FileSystem]
    val fileStatus = new FileStatus(0, false, 0, 0, System.currentTimeMillis(), null)

    when(mockFs.getHomeDirectory).thenReturn(new Path("/user/test"))
    when(mockFs.getFileStatus(any[Path])).thenReturn(fileStatus)
    when(mockFs.mkdirs(any[Path])).thenReturn(true)

    doNothing().when(mockFs).copyFromLocalFile(any[Path], any[Path])
    doNothing().when(mockFs).setPermission(any[Path], any[FsPermission])

    val ret = clientHelper.setupAMLocalResources(mockFs, Some("some.principal"), Some("some.keytab"))

    // Only the keytab is uploaded as an AM local resource.
    assert(ret.size == 1)
    assert(ret.contains("some.keytab"))
  }
}

http://git-wip-us.apache.org/repos/asf/samza/blob/725a5260/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
 
b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
index fc0091f..3de5614 100644
--- 
a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
+++ 
b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
@@ -43,7 +43,7 @@ class TestSamzaAppMasterService {
   def testAppMasterDashboardShouldStart {
     val config = getDummyConfig
     val state = new SamzaAppState(JobModelManager(config), -1, 
ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "", 1, 
2)
-    val service = new SamzaAppMasterService(config, state, null, null)
+    val service = new SamzaAppMasterService(config, state, null, null, null)
     val taskName = new TaskName("test")
 
     // start the dashboard
@@ -74,7 +74,7 @@ class TestSamzaAppMasterService {
     // Create some dummy config
     val config = getDummyConfig
     val state = new SamzaAppState(JobModelManager(config), -1, 
ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "", 1, 
2)
-    val service = new SamzaAppMasterService(config, state, null, null)
+    val service = new SamzaAppMasterService(config, state, null, null, null)
 
     // start the dashboard
     service.onInit

Reply via email to