Repository: samza Updated Branches: refs/heads/master 76196b63c -> 725a52603
SAMZA-727: support Kerberos Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/725a5260 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/725a5260 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/725a5260 Branch: refs/heads/master Commit: 725a526031848ceae94c8e079feb40af353daff8 Parents: 76196b6 Author: Chen Song <[email protected]> Authored: Thu Jun 9 15:50:44 2016 -0700 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Thu Jun 9 15:50:44 2016 -0700 ---------------------------------------------------------------------- build.gradle | 1 + .../apache/samza/container/SecurityManager.java | 29 +++ .../samza/container/SecurityManagerFactory.java | 29 +++ .../org/apache/samza/config/JobConfig.scala | 3 + .../apache/samza/container/SamzaContainer.scala | 30 +++ .../org/apache/samza/config/YarnConfig.java | 45 +++++ .../apache/samza/job/yarn/ClientHelper.scala | 184 ++++++++++++++++--- .../apache/samza/job/yarn/SamzaAppMaster.scala | 24 ++- .../yarn/SamzaAppMasterSecurityManager.scala | 122 ++++++++++++ .../samza/job/yarn/SamzaAppMasterService.scala | 30 ++- .../yarn/SamzaContainerSecurityManager.scala | 96 ++++++++++ .../yarn/SamzaYarnSecurityManagerFactory.scala | 30 +++ .../org/apache/samza/job/yarn/YarnJob.scala | 103 +++++------ .../org/apache/samza/job/yarn/YarnJobUtil.scala | 109 +++++++++++ .../samza/job/yarn/TestClientHelper.scala | 91 +++++++++ .../job/yarn/TestSamzaAppMasterService.scala | 4 +- 16 files changed, 839 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/725a5260/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 16facbb..ba4a9d1 100644 --- a/build.gradle +++ b/build.gradle @@ -316,6 +316,7 @@ project(":samza-yarn_$scalaVersion") { testCompile "junit:junit:$junitVersion" 
testCompile "org.mockito:mockito-all:$mockitoVersion" testCompile project(":samza-core_$scalaVersion").sourceSets.test.output + testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion" } repositories { http://git-wip-us.apache.org/repos/asf/samza/blob/725a5260/samza-core/src/main/java/org/apache/samza/container/SecurityManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/SecurityManager.java b/samza-core/src/main/java/org/apache/samza/container/SecurityManager.java new file mode 100644 index 0000000..eeb7ef6 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/SecurityManager.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.container; + +/** + * Manage the security of the environment. 
+ */ +public interface SecurityManager { + void start(); + + void stop(); +} http://git-wip-us.apache.org/repos/asf/samza/blob/725a5260/samza-core/src/main/java/org/apache/samza/container/SecurityManagerFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/SecurityManagerFactory.java b/samza-core/src/main/java/org/apache/samza/container/SecurityManagerFactory.java new file mode 100644 index 0000000..f2d80d1 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/SecurityManagerFactory.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.container; + +import org.apache.samza.config.Config; + +/** + * Create an instance of {@link SecurityManager}. 
+ */ +public interface SecurityManagerFactory { + SecurityManager getSecurityManager(Config config); +} http://git-wip-us.apache.org/repos/asf/samza/blob/725a5260/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala index 4d5ca4d..23a51b2 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala @@ -55,6 +55,7 @@ object JobConfig { val MONITOR_PARTITION_CHANGE = "job.coordinator.monitor-partition-change" val MONITOR_PARTITION_CHANGE_FREQUENCY_MS = "job.coordinator.monitor-partition-change.frequency.ms" val DEFAULT_MONITOR_PARTITION_CHANGE_FREQUENCY_MS = 300000 + val JOB_SECURITY_MANAGER_FACTORY = "job.security.manager.factory" implicit def Config2Job(config: Config) = new JobConfig(config) @@ -115,6 +116,8 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging { def getSystemStreamPartitionGrouperFactory = getOption(JobConfig.SSP_GROUPER_FACTORY).getOrElse(classOf[GroupByPartitionFactory].getCanonicalName) + def getSecurityManagerFactory = getOption(JobConfig.JOB_SECURITY_MANAGER_FACTORY) + val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint.segment.bytes" val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint.replication.factor" http://git-wip-us.apache.org/repos/asf/samza/blob/725a5260/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 951d479..086531e 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -313,6 +313,15 @@ object SamzaContainer extends Logging { info("Got metrics reporters: %s" format reporters.keys) + val securityManager = config.getSecurityManagerFactory match { + case Some(securityManagerFactoryClassName) => + Util + .getObj[SecurityManagerFactory](securityManagerFactoryClassName) + .getSecurityManager(config) + case _ => null + } + info("Got security manager: %s" format securityManager) + val coordinatorSystemProducer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemProducer(config, samzaContainerMetrics.registry) val localityManager = new LocalityManager(coordinatorSystemProducer) val checkpointManager = config.getCheckpointManagerFactory() match { @@ -545,6 +554,7 @@ object SamzaContainer extends Logging { producerMultiplexer = producerMultiplexer, offsetManager = offsetManager, localityManager = localityManager, + securityManager = securityManager, metrics = samzaContainerMetrics, reporters = reporters, jvm = jvm, @@ -564,6 +574,7 @@ class SamzaContainer( diskSpaceMonitor: DiskSpaceMonitor = null, offsetManager: OffsetManager = new OffsetManager, localityManager: LocalityManager = null, + securityManager: SecurityManager = null, reporters: Map[String, MetricsReporter] = Map(), jvm: JvmMetrics = null) extends Runnable with Logging { @@ -579,6 +590,7 @@ class SamzaContainer( startProducers startTask startConsumers + startSecurityManger info("Entering run loop.") runLoop.run @@ -597,6 +609,7 @@ class SamzaContainer( shutdownLocalityManager shutdownOffsetManager shutdownMetrics + shutdownSecurityManger info("Shutdown complete.") } @@ -699,6 +712,14 @@ class SamzaContainer( consumerMultiplexer.start } + def startSecurityManger: Unit = { + if (securityManager != null) { + info("Starting security manager.") + + securityManager.start + } + } + def shutdownConsumers { info("Shutting down consumer multiplexer.") @@ -736,6 +757,7 @@ class SamzaContainer( 
offsetManager.stop } + def shutdownMetrics { info("Shutting down metrics reporters.") @@ -748,6 +770,14 @@ class SamzaContainer( } } + def shutdownSecurityManger: Unit = { + if (securityManager != null) { + info("Shutting down security manager.") + + securityManager.stop + } + } + def shutdownDiskSpaceMonitor: Unit = { if (diskSpaceMonitor != null) { info("Shutting down disk space monitor.") http://git-wip-us.apache.org/repos/asf/samza/blob/725a5260/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java b/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java index c556d83..8f2dc48 100644 --- a/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java +++ b/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java @@ -104,6 +104,31 @@ public class YarnConfig extends MapConfig { public static final String HOST_AFFINITY_ENABLED = "yarn.samza.host-affinity.enabled"; private static final boolean DEFAULT_HOST_AFFINITY_ENABLED = false; + /** + * Principal used to log in on a Kerberized secure cluster + */ + public static final String YARN_KERBEROS_PRINCIPAL = "yarn.kerberos.principal"; + + /** + * Key tab used to log in on a Kerberized secure cluster + */ + public static final String YARN_KERBEROS_KEYTAB = "yarn.kerberos.keytab"; + + /** + * Interval in seconds to renew a delegation token in Kerberized secure cluster + */ + public static final String YARN_TOKEN_RENEWAL_INTERVAL_SECONDS = "yarn.token.renewal.interval.seconds"; + private static final long DEFAULT_YARN_TOKEN_RENEWAL_INTERVAL_SECONDS = 24 * 3600; + + /** + * The location on HDFS to store the credentials file + */ + public static final String YARN_CREDENTIALS_FILE = "yarn.credentials.file"; + + /** + * The staging directory on HDFS for the job + */ + public static final String YARN_JOB_STAGING_DIRECTORY = 
"yarn.job.staging.directory"; public YarnConfig(Config config) { super(config); @@ -168,4 +193,24 @@ public class YarnConfig extends MapConfig { public boolean getHostAffinityEnabled() { return getBoolean(HOST_AFFINITY_ENABLED, DEFAULT_HOST_AFFINITY_ENABLED); } + + public String getYarnKerberosPrincipal() { + return get(YARN_KERBEROS_PRINCIPAL, null); + } + + public String getYarnKerberosKeytab() { + return get(YARN_KERBEROS_KEYTAB, null); + } + + public long getYarnTokenRenewalIntervalSeconds() { + return getLong(YARN_TOKEN_RENEWAL_INTERVAL_SECONDS, DEFAULT_YARN_TOKEN_RENEWAL_INTERVAL_SECONDS); + } + + public String getYarnCredentialsFile() { + return get(YARN_CREDENTIALS_FILE, null); + } + + public String getYarnJobStagingDirectory() { + return get(YARN_JOB_STAGING_DIRECTORY, null); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/725a5260/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala index 74a0676..0998c43 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala @@ -19,14 +19,19 @@ package org.apache.samza.job.yarn +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.samza.config.{JobConfig, Config, YarnConfig} +import org.apache.samza.coordinator.stream.{CoordinatorStreamWriter} +import org.apache.samza.coordinator.stream.messages.SetConfig + import scala.collection.JavaConversions._ -import scala.collection.Map +import scala.collection.{Map} +import scala.collection.mutable.HashMap import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.api.records.ApplicationId import 
org.apache.hadoop.yarn.api.records.ApplicationReport -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext import org.apache.hadoop.yarn.api.records.ContainerLaunchContext import org.apache.hadoop.yarn.api.records.FinalApplicationStatus import org.apache.hadoop.yarn.api.records.LocalResource @@ -38,6 +43,8 @@ import org.apache.hadoop.yarn.client.api.YarnClient import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.util.ConverterUtils import org.apache.hadoop.yarn.util.Records +import org.apache.hadoop.io.DataOutputBuffer +import org.apache.hadoop.fs.FileSystem import org.apache.samza.SamzaException import org.apache.samza.job.ApplicationStatus import org.apache.samza.job.ApplicationStatus.New @@ -45,10 +52,15 @@ import org.apache.samza.job.ApplicationStatus.Running import org.apache.samza.job.ApplicationStatus.SuccessfulFinish import org.apache.samza.job.ApplicationStatus.UnsuccessfulFinish import org.apache.samza.util.Logging -import java.util.Collections +import java.io.IOException +import java.nio.ByteBuffer object ClientHelper { val applicationType = "Samza" + + val CREDENTIALS_FILE = "credentials" + + val SOURCE = "yarn" } /** @@ -57,20 +69,31 @@ object ClientHelper { * container and its processes. 
*/ class ClientHelper(conf: Configuration) extends Logging { - val yarnClient = YarnClient.createYarnClient - info("trying to connect to RM %s" format conf.get(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS)) - yarnClient.init(conf); - yarnClient.start - var appId: Option[ApplicationId] = None + val yarnClient = createYarnClient + + private[yarn] def createYarnClient() = { + val yarnClient = YarnClient.createYarnClient + info("trying to connect to RM %s" format conf.get(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS)) + yarnClient.init(conf) + yarnClient.start + yarnClient + } + + var jobContext: JobContext = null /** * Generate an application and submit it to the resource manager to start an application master */ - def submitApplication(packagePath: Path, memoryMb: Int, cpuCore: Int, cmds: List[String], env: Option[Map[String, String]], name: Option[String], queueName: Option[String]): Option[ApplicationId] = { + def submitApplication(config: Config, cmds: List[String], env: Option[Map[String, String]], name: Option[String]): Option[ApplicationId] = { val app = yarnClient.createApplication val newAppResponse = app.getNewApplicationResponse - var mem = memoryMb - var cpu = cpuCore + + val yarnConfig = new YarnConfig(config) + + val packagePath = new Path(yarnConfig.getPackagePath) + val mem = yarnConfig.getAMContainerMaxMemoryMb + val cpu = 1 + val queueName = Option(yarnConfig.getQueueName) // If we are asking for memory more than the max allowed, shout out if (mem > newAppResponse.getMaximumResourceCapability().getMemory()) { @@ -84,7 +107,9 @@ class ClientHelper(conf: Configuration) extends Logging { (cpu, newAppResponse.getMaximumResourceCapability().getVirtualCores())) } - appId = Some(newAppResponse.getApplicationId) + jobContext = new JobContext + jobContext.setAppId(newAppResponse.getApplicationId) + val appId = jobContext.getAppId info("preparing to request resources for app id %s" format appId.get) @@ -95,15 +120,7 
@@ class ClientHelper(conf: Configuration) extends Logging { name match { case Some(name) => { appCtx.setApplicationName(name) } - case None => { appCtx.setApplicationName(appId.toString) } - } - - env match { - case Some(env) => { - containerCtx.setEnvironment(env) - info("set environment variables to %s for %s" format (env, appId.get)) - } - case None => None + case None => { appCtx.setApplicationName(appId.get.toString) } } queueName match { @@ -111,12 +128,13 @@ class ClientHelper(conf: Configuration) extends Logging { appCtx.setQueue(queueName) info("set yarn queue name to %s" format queueName) } - case None => None + case None => } // set the local package so that the containers and app master are provisioned with it val packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath) - val fileStatus = packagePath.getFileSystem(conf).getFileStatus(packagePath) + val fs = packagePath.getFileSystem(conf) + val fileStatus = fs.getFileStatus(packagePath) packageResource.setResource(packageUrl) info("set package url to %s for %s" format (packageUrl, appId.get)) @@ -133,9 +151,44 @@ class ClientHelper(conf: Configuration) extends Logging { appCtx.setResource(resource) containerCtx.setCommands(cmds.toList) info("set command to %s for %s" format (cmds, appId.get)) - containerCtx.setLocalResources(Collections.singletonMap("__package", packageResource)) + appCtx.setApplicationId(appId.get) info("set app ID to %s" format appId.get) + + val localResources: HashMap[String, LocalResource] = HashMap[String, LocalResource]() + localResources += "__package" -> packageResource + + if (UserGroupInformation.isSecurityEnabled()) { + validateJobConfig(config) + + setupSecurityToken(fs, containerCtx) + info("set security token for %s" format appId.get) + + val amLocalResources = setupAMLocalResources(fs, Option(yarnConfig.getYarnKerberosPrincipal), Option(yarnConfig.getYarnKerberosKeytab)) + localResources ++= amLocalResources + + val securityYarnConfig = getSecurityYarnConfig + val 
coordinatorStreamWriter: CoordinatorStreamWriter = new CoordinatorStreamWriter(config) + coordinatorStreamWriter.start() + + securityYarnConfig.foreach { + case (key: String, value: String) => + coordinatorStreamWriter.sendMessage(SetConfig.TYPE, key, value) + } + coordinatorStreamWriter.stop() + } + + containerCtx.setLocalResources(localResources) + info("set local resources on application master for %s" format appId.get) + + env match { + case Some(env) => { + containerCtx.setEnvironment(env) + info("set environment variables to %s for %s" format (env, appId.get)) + } + case None => + } + appCtx.setAMContainerSpec(containerCtx) appCtx.setApplicationType(ClientHelper.applicationType) info("submitting application request for %s" format appId.get) @@ -178,4 +231,87 @@ class ClientHelper(conf: Configuration) extends Logging { case _ => Some(Running) } } + + private[yarn] def validateJobConfig(config: Config) = { + import org.apache.samza.config.JobConfig.Config2Job + + if (config.getSecurityManagerFactory.isEmpty) { + throw new SamzaException(s"Job config ${JobConfig.JOB_SECURITY_MANAGER_FACTORY} not found. 
This config must be set for a secure cluster") + } + } + + private def setupSecurityToken(fs: FileSystem, amContainer: ContainerLaunchContext): Unit = { + info("security is enabled") + val credentials = UserGroupInformation.getCurrentUser.getCredentials + val tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL); + if (tokenRenewer == null || tokenRenewer.length() == 0) { + throw new IOException( + "Can't get Master Kerberos principal for the RM to use as renewer"); + } + + val tokens = + fs.addDelegationTokens(tokenRenewer, credentials) + tokens.foreach { token => info("Got dt for " + fs.getUri() + "; " + token) } + val dob = new DataOutputBuffer + credentials.writeTokenStorageToStream(dob) + amContainer.setTokens(ByteBuffer.wrap(dob.getData)) + } + + private[yarn] def setupAMLocalResources(fs: FileSystem, principal: Option[String], keytab: Option[String]) = { + if (principal.isEmpty || keytab.isEmpty) { + throw new SamzaException("You need to set both %s and %s on a secure cluster" format(YarnConfig.YARN_KERBEROS_PRINCIPAL, YarnConfig + .YARN_KERBEROS_KEYTAB)) + } + + val localResources = HashMap[String, LocalResource]() + + // create application staging dir + val created = YarnJobUtil.createStagingDir(jobContext, fs) + created match { + case Some(appStagingDir) => + jobContext.setAppStagingDir(appStagingDir) + val destFilePath = addLocalFile(fs, keytab.get, appStagingDir) + localResources.put(destFilePath.getName(), getLocalResource(fs, destFilePath, LocalResourceType.FILE)) + localResources.toMap + case None => throw new SamzaException(s"Failed to create staging directory for %s" format jobContext.getAppId) + } + } + + private def addLocalFile(fs: FileSystem, localFile: String, destDirPath: Path) = { + val srcFilePath = new Path(localFile) + val destFilePath = new Path(destDirPath, srcFilePath.getName) + fs.copyFromLocalFile(srcFilePath, destFilePath) + val localFilePermission = FsPermission.createImmutable(Integer.parseInt("400", 8).toShort) + 
fs.setPermission(destFilePath, localFilePermission) + destFilePath + } + + private def getLocalResource(fs: FileSystem, destFilePath: Path, resourceType: LocalResourceType) = { + val localResource = Records.newRecord(classOf[LocalResource]) + val fileStatus = fs.getFileStatus(destFilePath) + localResource.setResource(ConverterUtils.getYarnUrlFromPath(destFilePath)) + localResource.setSize(fileStatus.getLen()) + localResource.setTimestamp(fileStatus.getModificationTime()) + localResource.setType(resourceType) + localResource.setVisibility(LocalResourceVisibility.APPLICATION) + localResource + } + + private[yarn] def getSecurityYarnConfig = { + val stagingDir = jobContext.getAppStagingDir.get + val credentialsFile = new Path(stagingDir, ClientHelper.CREDENTIALS_FILE) + val jobConfigs = HashMap[String, String]() + + jobConfigs += YarnConfig.YARN_CREDENTIALS_FILE -> credentialsFile.toString + jobConfigs += YarnConfig.YARN_JOB_STAGING_DIRECTORY -> stagingDir.toString + + jobConfigs.toMap + } + + /** + * Cleanup application staging directory. 
+ */ + def cleanupStagingDir(): Unit = { + YarnJobUtil.cleanupStagingDir(jobContext, FileSystem.get(conf)) + } } http://git-wip-us.apache.org/repos/asf/samza/blob/725a5260/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala index 7bd8131..1fb18be 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala @@ -19,9 +19,14 @@ package org.apache.samza.job.yarn +import java.io.IOException + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{Path, FileSystem} + import scala.collection.JavaConversions.asScalaBuffer import org.apache.hadoop.yarn.api.ApplicationConstants -import org.apache.hadoop.yarn.api.records.{ Container, ContainerStatus, NodeReport } +import org.apache.hadoop.yarn.api.records.{FinalApplicationStatus, Container, ContainerStatus, NodeReport} import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync import org.apache.hadoop.yarn.conf.YarnConfiguration @@ -85,17 +90,20 @@ object SamzaAppMaster extends Logging with AMRMClientAsync.CallbackHandler { val containerMem = yarnConfig.getContainerMaxMemoryMb val containerCpu = yarnConfig.getContainerMaxCpuCores val jmxServer = if (yarnConfig.getJmxServerEnabled) Some(new JmxServer()) else None + val jobContext = new JobContext + Option(yarnConfig.getYarnJobStagingDirectory).map { + jobStagingDirectory => jobContext.setAppStagingDir(new Path(jobStagingDirectory)) + } + // wire up all of the yarn event listeners + val state = new SamzaAppState(jobCoordinator, -1, containerId, nodeHostString, nodePortString.toInt, nodeHttpPortString.toInt) try { - // wire up all of the 
yarn event listeners - val state = new SamzaAppState(jobCoordinator, -1, containerId, nodeHostString, nodePortString.toInt, nodeHttpPortString.toInt) - if (jmxServer.isDefined) { state.jmxUrl = jmxServer.get.getJmxUrl state.jmxTunnelingUrl = jmxServer.get.getTunnelingJmxUrl } - val service = new SamzaAppMasterService(config, state, registry, clientHelper) + val service = new SamzaAppMasterService(config, state, registry, clientHelper, hConfig) val lifecycle = new SamzaAppMasterLifecycle(containerMem, containerCpu, state, amClient) val metrics = new SamzaAppMasterMetrics(config, state, registry) val taskManager = new SamzaTaskManager(config, state, amClient, hConfig) @@ -103,6 +111,10 @@ object SamzaAppMaster extends Logging with AMRMClientAsync.CallbackHandler { listeners = List(service, lifecycle, metrics, taskManager) run(amClient, listeners, hConfig, interval) } finally { + if (state.status != FinalApplicationStatus.UNDEFINED) { + YarnJobUtil.cleanupStagingDir(jobContext, FileSystem.get(hConfig)) + } + // jmxServer has to be stopped or will prevent process from exiting. if (jmxServer.isDefined) { jmxServer.get.stop @@ -134,6 +146,7 @@ object SamzaAppMaster extends Logging with AMRMClientAsync.CallbackHandler { } catch { case e: Exception => warn("Listener %s failed to shutdown." 
format listener, e) }) + // amClient has to be stopped amClient.stop } @@ -156,5 +169,4 @@ object SamzaAppMaster extends Logging with AMRMClientAsync.CallbackHandler { error("Error occured in amClient's callback", e) storedException = e } - } http://git-wip-us.apache.org/repos/asf/samza/blob/725a5260/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterSecurityManager.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterSecurityManager.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterSecurityManager.scala new file mode 100644 index 0000000..2eaa19f --- /dev/null +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterSecurityManager.scala @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.job.yarn + +import java.security.PrivilegedExceptionAction +import java.util.concurrent.{TimeUnit, Executors} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{Path, FileSystem} +import org.apache.hadoop.security.{Credentials, UserGroupInformation} +import org.apache.samza.SamzaException +import org.apache.samza.config.{YarnConfig, Config} +import org.apache.samza.util.{DaemonThreadFactory, Logging} +import org.apache.samza.container.SecurityManager + +object SamzaAppMasterSecurityManager { + val TOKEN_RENEW_THREAD_NAME_PREFIX = "TOKEN-RENEW-PREFIX" +} + +/** + * The SamzaAppMasterSecurityManager is responsible for renewing and distributing HDFS delegation tokens on a secure YARN + * cluster. + * + * <p /> + * + * It runs in a daemon thread and periodically requests new delegation tokens and writes the fresh tokens in a configured + * staging directory at the configured frequency. + * + * @param config Samza config for the application + * @param hadoopConf the hadoop configuration + */ +class SamzaAppMasterSecurityManager(config: Config, hadoopConf: Configuration) extends SecurityManager with Logging { + private val tokenRenewExecutor = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory(SamzaAppMasterSecurityManager + .TOKEN_RENEW_THREAD_NAME_PREFIX)) + + def start() = { + val yarnConfig = new YarnConfig(config) + val principal = yarnConfig.getYarnKerberosPrincipal + // only get the name part of the keytab config, the keytab file will be in the working directory + val keytab = new Path(yarnConfig.getYarnKerberosKeytab).getName + val renewalInterval = yarnConfig.getYarnTokenRenewalIntervalSeconds + val credentialsFile = yarnConfig.getYarnCredentialsFile + + val tokenRenewRunnable = new Runnable { + override def run(): Unit = { + try { + loginFromKeytab(principal, keytab, credentialsFile) + } catch { + case e: Exception => + warn("Failed to renew token and write out new credentials", e) + } + }
+ } + + tokenRenewExecutor.scheduleAtFixedRate(tokenRenewRunnable, renewalInterval, renewalInterval, TimeUnit.SECONDS) + } + + private def loginFromKeytab(principal: String, keytab: String, credentialsFile: String) = { + info(s"Logging to KDC using principal: $principal") + UserGroupInformation.loginUserFromKeytab(principal, keytab) + val keytabUser = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab) + val credentials = keytabUser.getCredentials + + // get the new delegation token from the key tab user + keytabUser.doAs(new PrivilegedExceptionAction[Void] { + override def run(): Void = { + getNewDelegationToken(credentials) + null + } + }) + + UserGroupInformation.getCurrentUser.addCredentials(credentials) + + val credentialsFilePath = new Path(credentialsFile) + writeNewDelegationToken(credentialsFilePath, credentials) + } + + private def getNewDelegationToken(credentials: Credentials) = { + val fs = FileSystem.get(hadoopConf) + val tokenRenewer = UserGroupInformation.getCurrentUser.getShortUserName + // HDFS will not issue new delegation token if there are existing ones in the credentials. 
This is hacked + // by creating a new credentials object from the login via the given keytab and principle, passing the new + // credentials object to FileSystem.addDelegationTokens to force a new delegation token created and adding + // the credentials to the current user's credential object + fs.addDelegationTokens(tokenRenewer, credentials) + } + + private def writeNewDelegationToken(credentialsFilePath: Path, credentials: Credentials) = { + val fs = FileSystem.get(hadoopConf) + if (fs.exists(credentialsFilePath)) { + logger.info(s"Deleting existing credentials file $credentialsFilePath") + val success = fs.delete(credentialsFilePath, false) + if (!success) { + throw new SamzaException(s"Failed deleting existing credentials file $credentialsFilePath") + } + } + + logger.info(s"Writing new delegation to the token file $credentialsFilePath") + credentials.writeTokenStorageFile(credentialsFilePath, hadoopConf) + } + + def stop() = { + tokenRenewExecutor.shutdown + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/725a5260/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala index 3adf86f..979d81d 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala @@ -19,6 +19,8 @@ package org.apache.samza.job.yarn +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.samza.coordinator.stream.CoordinatorStreamWriter import org.apache.samza.coordinator.stream.messages.SetConfig import org.apache.samza.util.Logging @@ -31,14 +33,19 @@ import org.apache.samza.webapp.ApplicationMasterRestServlet import 
org.apache.samza.webapp.ApplicationMasterWebServlet /** - * Samza's application master runs a very basic HTTP/JSON service to allow - * dashboards to check on the status of a job. SamzaAppMasterService starts - * up the web service when initialized. - */ -class SamzaAppMasterService(config: Config, state: SamzaAppState, registry: ReadableMetricsRegistry, clientHelper: ClientHelper) extends YarnAppMasterListener with Logging { + * Samza's application master runs a very basic HTTP/JSON service to allow + * dashboards to check on the status of a job. SamzaAppMasterService starts + * up the web service when initialized. + * <p /> + * Besides the HTTP/JSON service endpoints, it also starts an optional + * SecurityManager which takes care of the security needs when running in + * a secure environment. + */ +class SamzaAppMasterService(config: Config, state: SamzaAppState, registry: ReadableMetricsRegistry, clientHelper: ClientHelper, yarnConfiguration: YarnConfiguration) extends YarnAppMasterListener with Logging { var rpcApp: HttpServer = null var webApp: HttpServer = null val SERVER_URL_OPT: String = "samza.autoscaling.server.url" + var securityManager: Option[SamzaAppMasterSecurityManager] = None override def onInit() { // try starting the samza AM dashboard at a random rpc and tracking port @@ -65,6 +72,15 @@ class SamzaAppMasterService(config: Config, state: SamzaAppState, registry: Read debug("sent server url message with value: %s " format state.coordinatorUrl.toString) info("Webapp is started at (rpc %s, tracking %s, coordinator %s)" format(state.rpcUrl, state.trackingUrl, state.coordinatorUrl)) + + // start YarnSecurityManger for a secure cluster + if (UserGroupInformation.isSecurityEnabled) { + securityManager = Option { + val securityManager = new SamzaAppMasterSecurityManager(config, yarnConfiguration) + securityManager.start + securityManager + } + } } override def onShutdown() { @@ -77,5 +93,9 @@ class SamzaAppMasterService(config: Config, state: 
SamzaAppState, registry: Read } state.jobCoordinator.stop + + securityManager.map { + securityManager => securityManager.stop + } } } http://git-wip-us.apache.org/repos/asf/samza/blob/725a5260/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaContainerSecurityManager.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaContainerSecurityManager.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaContainerSecurityManager.scala new file mode 100644 index 0000000..10b971f --- /dev/null +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaContainerSecurityManager.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.job.yarn + +import java.util.concurrent.{TimeUnit, Executors} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.security.{UserGroupInformation, Credentials} +import org.apache.samza.config.{Config, YarnConfig} +import org.apache.samza.util.{Logging, DaemonThreadFactory} +import org.apache.samza.container.SecurityManager + +object SamzaContainerSecurityManager { + val TOKEN_RENEW_THREAD_NAME_PREFIX = "TOKEN-RENEW-PREFIX" + val INITIAL_DELAY_IN_SECONDS = 60 +} + +class SamzaContainerSecurityManager(config: Config, hadoopConfig: Configuration) extends SecurityManager with Logging { + private val tokenRenewExecutor = Executors.newScheduledThreadPool(1, new DaemonThreadFactory(SamzaContainerSecurityManager.TOKEN_RENEW_THREAD_NAME_PREFIX)) + private var lastRefreshTimestamp = 0L + + + def start() = { + val yarnConfig = new YarnConfig(config) + val renewalInterval = yarnConfig.getYarnTokenRenewalIntervalSeconds + val tokenFilePath = new Path(yarnConfig.getYarnCredentialsFile) + + val tokenRenewRunnable = new Runnable { + override def run(): Unit = { + try { + val fs = FileSystem.get(hadoopConfig) + if (fs.exists(tokenFilePath)) { + val fileStatus = fs.getFileStatus(tokenFilePath) + if (lastRefreshTimestamp > fileStatus.getModificationTime) { + // credentials have not been updated, retry after 5 minutes + info("Expecting to update delegation tokens, but AM has not updated credentials file yet, will retry in 5 minutes") + tokenRenewExecutor.schedule(this, 5, TimeUnit.MINUTES) + } else { + val credentials = getCredentialsFromHDFS(fs, tokenFilePath) + UserGroupInformation.getCurrentUser.addCredentials(credentials) + info("Successfully renewed tokens from credentials file") + lastRefreshTimestamp = System.currentTimeMillis + info(s"Schedule the next fetch in $renewalInterval seconds") + tokenRenewExecutor.schedule(this, renewalInterval, TimeUnit.SECONDS) + } + } else { + 
info(s"Credentials file not found yet. Schedule the next fetch in $renewalInterval seconds") + tokenRenewExecutor.schedule(this, renewalInterval, TimeUnit.SECONDS) + } + } catch { + case e: Exception => + val retrySeconds = Math.min(renewalInterval, 3600) + warn(s"Failed to renew tokens, will retry in $retrySeconds seconds", e) + + tokenRenewExecutor.schedule(this, retrySeconds, TimeUnit.SECONDS) + } + } + } + + info(s"Schedule the next fetch in ${renewalInterval + SamzaContainerSecurityManager.INITIAL_DELAY_IN_SECONDS} seconds") + tokenRenewExecutor.schedule(tokenRenewRunnable, renewalInterval + SamzaContainerSecurityManager.INITIAL_DELAY_IN_SECONDS, TimeUnit.SECONDS) + } + + private def getCredentialsFromHDFS(fs: FileSystem, tokenPath: Path): Credentials = { + val stream = fs.open(tokenPath) + try { + val newCredentials = new Credentials() + newCredentials.readTokenStorageStream(stream) + newCredentials + } finally { + stream.close() + } + } + + def stop(): Unit = { + tokenRenewExecutor.shutdown() + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/725a5260/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnSecurityManagerFactory.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnSecurityManagerFactory.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnSecurityManagerFactory.scala new file mode 100644 index 0000000..824e81d --- /dev/null +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnSecurityManagerFactory.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.job.yarn + +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.samza.config.Config +import org.apache.samza.container.{SecurityManager, SecurityManagerFactory} + +class SamzaYarnSecurityManagerFactory extends SecurityManagerFactory { + override def getSecurityManager(config: Config): SecurityManager = { + val yarnConfig = new YarnConfiguration + new SamzaContainerSecurityManager(config, yarnConfig) + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/725a5260/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala index 62ddb26..3ca1f0d 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala @@ -19,31 +19,15 @@ package org.apache.samza.job.yarn import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path -import org.apache.hadoop.security.UserGroupInformation -import org.apache.hadoop.yarn.api.records.ApplicationId import org.apache.hadoop.yarn.api.ApplicationConstants -import org.apache.samza.config.Config -import org.apache.samza.config.JobConfig -import org.apache.samza.util.Util -import org.apache.samza.job.ApplicationStatus -import org.apache.samza.job.ApplicationStatus.Running -import org.apache.samza.job.StreamJob -import 
org.apache.samza.job.ApplicationStatus.SuccessfulFinish -import org.apache.samza.job.ApplicationStatus.UnsuccessfulFinish +import org.apache.hadoop.yarn.api.records.ApplicationId import org.apache.samza.config.JobConfig.Config2Job -import org.apache.samza.config.YarnConfig -import org.apache.samza.config.ShellCommandConfig -import org.apache.samza.SamzaException +import org.apache.samza.config.{Config, JobConfig, ShellCommandConfig, YarnConfig} +import org.apache.samza.job.ApplicationStatus.{Running, SuccessfulFinish, UnsuccessfulFinish} +import org.apache.samza.job.{ApplicationStatus, StreamJob} import org.apache.samza.serializers.model.SamzaObjectMapper -import org.apache.samza.config.JobConfig.Config2Job +import org.apache.samza.util.Util import org.slf4j.LoggerFactory -import scala.collection.JavaConversions._ -import org.apache.samza.config.MapConfig -import org.apache.samza.config.ConfigException -import org.apache.samza.config.SystemConfig - - /** * Starts the application manager @@ -56,36 +40,40 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob { val logger = LoggerFactory.getLogger(this.getClass) def submit: YarnJob = { + try { + val cmdExec = buildAmCmd() + + appId = client.submitApplication( + config, + List( + // we need something like this: + //"export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec <fwk_path>/bin/run-am.sh 1>logs/%s 2>logs/%s" + + "export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec %s 1>logs/%s 2>logs/%s" + format (ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR, + cmdExec, ApplicationConstants.STDOUT, ApplicationConstants.STDERR)), + Some({ + val coordinatorSystemConfig = Util.buildCoordinatorStreamConfig(config) + val envMap = Map( + ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG -> Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString + (coordinatorSystemConfig)), + ShellCommandConfig.ENV_JAVA_OPTS -> Util.envVarEscape(yarnConfig.getAmOpts)) + val 
amJavaHome = yarnConfig.getAMJavaHome + val envMapWithJavaHome = if (amJavaHome == null) { + envMap + } else { + envMap + (ShellCommandConfig.ENV_JAVA_HOME -> amJavaHome) + } + envMapWithJavaHome + }), + Some("%s_%s" format(config.getName.get, config.getJobId.getOrElse(1))) + ) + } catch { + case e: Throwable => + client.cleanupStagingDir + throw e + } - val cmdExec = buildAmCmd() - - appId = client.submitApplication( - new Path(yarnConfig.getPackagePath), - yarnConfig.getAMContainerMaxMemoryMb, - 1, - List( - // we need something like this: - //"export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec <fwk_path>/bin/run-am.sh 1>logs/%s 2>logs/%s" - - "export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec %s 1>logs/%s 2>logs/%s" - format (ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR, - cmdExec, ApplicationConstants.STDOUT, ApplicationConstants.STDERR)), - - Some({ - val coordinatorSystemConfig = Util.buildCoordinatorStreamConfig(config) - val envMap = Map( - ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG -> Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(coordinatorSystemConfig)), - ShellCommandConfig.ENV_JAVA_OPTS -> Util.envVarEscape(yarnConfig.getAmOpts)) - val amJavaHome = yarnConfig.getAMJavaHome - val envMapWithJavaHome = if(amJavaHome == null) { - envMap - } else { - envMap + (ShellCommandConfig.ENV_JAVA_HOME -> amJavaHome) - } - envMapWithJavaHome - }), - Some("%s_%s" format (config.getName.get, config.getJobId.getOrElse(1))), - Option(yarnConfig.getQueueName)) this } @@ -118,8 +106,10 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob { while (System.currentTimeMillis() - startTimeMs < timeoutMs) { Option(getStatus) match { - case Some(s) => if (SuccessfulFinish.equals(s) || UnsuccessfulFinish.equals(s)) return s - case None => null + case Some(s) => if (SuccessfulFinish.equals(s) || UnsuccessfulFinish.equals(s)) + client.cleanupStagingDir + return s + case None => } 
Thread.sleep(1000) @@ -152,8 +142,13 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob { def kill: YarnJob = { appId match { - case Some(appId) => client.kill(appId) - case None => None + case Some(appId) => + try { + client.kill(appId) + } finally { + client.cleanupStagingDir + } + case None => } this } http://git-wip-us.apache.org/repos/asf/samza/blob/725a5260/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobUtil.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobUtil.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobUtil.scala new file mode 100644 index 0000000..d7afa15 --- /dev/null +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobUtil.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.job.yarn + +import java.io.IOException + +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.yarn.api.records.ApplicationId +import org.apache.samza.util.Logging + +object JobContext { + val STAGING_DIR = ".samzaStaging" +} + +/** + * JobContext is used to store the meta information about the job running on Yarn. + * + * Optionally, one can set Application Id and Application Staging Directory to the job context object. + */ +class JobContext { + private var appId: ApplicationId = null + private var appStagingDir: Path = null + + def getAppId: Option[ApplicationId] = { + Option(appId) + } + + def getAppStagingDir: Option[Path] = { + Option(appStagingDir) + } + + def setAppId(appId: ApplicationId) = { + this.appId = appId + } + + def setAppStagingDir(appStagingDir: Path) = { + this.appStagingDir = appStagingDir + } + + /** + * Return the staging directory path to the given application. + */ + def defaultAppStagingDir: Option[String] = { + getAppId.map(JobContext.STAGING_DIR + Path.SEPARATOR + _.toString) + } +} + +object YarnJobUtil extends Logging { + /** + * Create the staging directory for the application. + * + * @param jobContext + * @param fs + * @return the Option of the Path object to the staging directory + */ + def createStagingDir(jobContext: JobContext, fs: FileSystem) = { + val defaultStagingDir = jobContext.defaultAppStagingDir.map(new Path(fs.getHomeDirectory, _)) + val stagingDir = jobContext.getAppStagingDir match { + case appStagingDir: Some[Path] => appStagingDir + case None => defaultStagingDir + } + stagingDir.map { + appStagingDir => + val appStagingDirPermission = FsPermission.createImmutable(Integer.parseInt("700", 8).toShort) + if (FileSystem.mkdirs(fs, appStagingDir, appStagingDirPermission)) { + appStagingDir + } else { + null + } + } + } + + /** + * Clean up the application staging directory. 
+ */ + def cleanupStagingDir(jobContext: JobContext, fs: FileSystem): Unit = { + jobContext.getAppStagingDir match { + case Some(appStagingDir) => try { + if (fs.exists(appStagingDir)) { + info("Deleting staging directory " + appStagingDir) + fs.delete(appStagingDir, true) + } + } catch { + case ioe: IOException => + warn("Failed to cleanup staging dir " + appStagingDir, ioe) + } + case None => info("No staging dir exists") + } + + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/725a5260/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala new file mode 100644 index 0000000..5c15385 --- /dev/null +++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.job.yarn + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.hadoop.fs.{FileStatus, Path, FileSystem} +import org.apache.hadoop.yarn.api.records.ApplicationId +import org.apache.hadoop.yarn.client.api.YarnClient +import org.apache.samza.SamzaException +import org.apache.samza.config.{MapConfig, JobConfig, Config, YarnConfig} +import org.mockito.Mockito._ +import org.mockito.Matchers.any +import org.scalatest.FunSuite +import org.scalatest.mock.MockitoSugar + + +class TestClientHelper extends FunSuite { + import MockitoSugar._ + val hadoopConfig = mock[Configuration] + + val clientHelper = new ClientHelper(hadoopConfig) { + override def createYarnClient() = { + mock[YarnClient] + } + } + + test("test validateJobConfig") { + import collection.JavaConverters._ + var config = new MapConfig() + + intercept[SamzaException] { + clientHelper.validateJobConfig(config) + } + + config = new MapConfig(Map(JobConfig.JOB_SECURITY_MANAGER_FACTORY -> "some value").asJava) + + clientHelper.validateJobConfig(config) + } + + test("test prepareJobConfig") { + val jobContext = new JobContext + jobContext.setAppStagingDir(new Path("/user/temp/.samzaStaging/app_123")) + clientHelper.jobContext = jobContext + + val ret = clientHelper.getSecurityYarnConfig + + assert(ret.size == 2) + assert(ret.get(YarnConfig.YARN_JOB_STAGING_DIRECTORY) == Some("/user/temp/.samzaStaging/app_123")) + assert(ret.get(YarnConfig.YARN_CREDENTIALS_FILE) == Some("/user/temp/.samzaStaging/app_123/credentials")) + } + + test("test setupAMLocalResources") { + val applicationId = mock[ApplicationId] + when(applicationId.toString).thenReturn("application_123") + val jobContext = new JobContext + jobContext.setAppId(applicationId) + clientHelper.jobContext = jobContext + + val mockFs = mock[FileSystem] + val fileStatus = new FileStatus(0, false, 0, 0, System.currentTimeMillis(), null) + + 
when(mockFs.getHomeDirectory).thenReturn(new Path("/user/test")) + when(mockFs.getFileStatus(any[Path])).thenReturn(fileStatus) + when(mockFs.mkdirs(any[Path])).thenReturn(true) + + doNothing().when(mockFs).copyFromLocalFile(any[Path], any[Path]) + doNothing().when(mockFs).setPermission(any[Path], any[FsPermission]) + + val ret = clientHelper.setupAMLocalResources(mockFs, Some("some.principal"), Some("some.keytab")) + + assert(ret.size == 1) + assert(ret.contains("some.keytab")) + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/725a5260/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala index fc0091f..3de5614 100644 --- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala +++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala @@ -43,7 +43,7 @@ class TestSamzaAppMasterService { def testAppMasterDashboardShouldStart { val config = getDummyConfig val state = new SamzaAppState(JobModelManager(config), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "", 1, 2) - val service = new SamzaAppMasterService(config, state, null, null) + val service = new SamzaAppMasterService(config, state, null, null, null) val taskName = new TaskName("test") // start the dashboard @@ -74,7 +74,7 @@ class TestSamzaAppMasterService { // Create some dummy config val config = getDummyConfig val state = new SamzaAppState(JobModelManager(config), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "", 1, 2) - val service = new SamzaAppMasterService(config, state, null, null) + val service = new SamzaAppMasterService(config, state, null, null, null) // start the dashboard service.onInit
