Updated Branches:
  refs/heads/master 2cd18279c -> 4217fc7f9
SAMZA-9; upgrading samza to yarn 2.2.0.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/4217fc7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/4217fc7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/4217fc7f

Branch: refs/heads/master
Commit: 4217fc7f9dd951c2c84b89e06daf8a1b4b0a420f
Parents: 2cd1827
Author: Chris Riccomini <[email protected]>
Authored: Tue Oct 22 13:14:25 2013 -0700
Committer: Chris Riccomini <[email protected]>
Committed: Tue Oct 22 13:14:25 2013 -0700

----------------------------------------------------------------------
 build.gradle                                    |   4 -
 gradle.properties                               |   1 -
 gradle/dependency-versions.gradle               |   1 +
 .../apache/samza/job/yarn/ClientHelper.scala    | 102 ++++-------
 .../apache/samza/job/yarn/SamzaAppMaster.scala  |  13 +-
 .../job/yarn/SamzaAppMasterLifecycle.scala      |  11 +-
 .../samza/job/yarn/SamzaAppMasterService.scala  |  27 ++-
 .../job/yarn/SamzaAppMasterTaskManager.scala    |  66 +++----
 .../apache/samza/job/yarn/YarnAppMaster.scala   |  21 ++-
 .../org/apache/samza/job/yarn/YarnJob.scala     |   1 -
 .../webapp/ApplicationMasterWebServlet.scala    |   3 +-
 .../job/yarn/TestSamzaAppMasterLifecycle.scala  |  59 +++----
 .../yarn/TestSamzaAppMasterTaskManager.scala    | 174 +++++++++----------
 .../samza/job/yarn/TestYarnAppMaster.scala      |   1 -
 14 files changed, 234 insertions(+), 250 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 1482ea9..f30128f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -104,10 +104,6 @@ project(":samza-serializers_$scalaVersion") {
 project(":samza-yarn_$scalaVersion") {
   apply plugin: 'scala'
 
-  jar {
-    classifier = "yarn-$yarnVersion"
-  }
-
   dependencies {
     compile project(':samza-api')
     compile project(":samza-core_$scalaVersion")

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/gradle.properties
----------------------------------------------------------------------
diff --git a/gradle.properties b/gradle.properties
index ed6390b..4bfa1c3 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,4 +1,3 @@
 group=org.apache.samza
 version=0.7.0
 scalaVersion=2.9.2
-yarnVersion=2.0.5-alpha

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 5f3fb32..f1f8831 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -9,4 +9,5 @@ ext {
   kafkaVersion = "0.8.1-SNAPSHOT"
   commonsHttpClientVersion = "3.1"
   leveldbVersion = "1.7"
+  yarnVersion = "2.2.0"
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
index b2b529e..2339960 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
@@ -19,44 +19,37 @@
 
 package org.apache.samza.job.yarn
 
-import java.util.Collections
-
 import scala.collection.JavaConversions._
 import scala.collection.Map
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.net.NetUtils
 import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest
-import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
-import org.apache.hadoop.yarn.api.records.LocalResource
-import org.apache.hadoop.yarn.api.records.Resource
 import org.apache.hadoop.yarn.api.records.ApplicationId
 import org.apache.hadoop.yarn.api.records.ApplicationReport
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
+import org.apache.hadoop.yarn.api.records.LocalResource
 import org.apache.hadoop.yarn.api.records.LocalResourceType
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
+import org.apache.hadoop.yarn.api.records.Resource
 import org.apache.hadoop.yarn.api.records.YarnApplicationState
-import org.apache.hadoop.yarn.api.ClientRMProtocol
+import org.apache.hadoop.yarn.client.api.YarnClient
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.ConverterUtils
 import org.apache.hadoop.yarn.util.Records
-
+import org.apache.samza.SamzaException
 import org.apache.samza.job.ApplicationStatus
 import org.apache.samza.job.ApplicationStatus.New
 import org.apache.samza.job.ApplicationStatus.Running
 import org.apache.samza.job.ApplicationStatus.SuccessfulFinish
 import org.apache.samza.job.ApplicationStatus.UnsuccessfulFinish
-
 import grizzled.slf4j.Logging
-import org.apache.samza.SamzaException
+import java.util.Collections
+
+object ClientHelper {
+  val applicationType = "Samza"
+}
 
 /**
  * Client helper class required to submit an application master start script to the resource manager. Also
@@ -64,41 +57,27 @@ import org.apache.samza.SamzaException
  * container and its processes.
  */
 class ClientHelper(conf: Configuration) extends Logging {
-  val rpc = YarnRPC.create(conf)
-  val rmAddress = NetUtils.createSocketAddr(conf.get(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS))
-  info("trying to connect to RM %s" format rmAddress)
-  val applicationsManager = rpc.getProxy(classOf[ClientRMProtocol], rmAddress, conf).asInstanceOf[ClientRMProtocol]
+  val yarnClient = YarnClient.createYarnClient
+  info("trying to connect to RM %s" format conf.get(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS))
+  yarnClient.init(conf);
+  yarnClient.start
   var appId: Option[ApplicationId] = None
 
   /**
    * Generate an application and submit it to the resource manager to start an application master
    */
-  def submitApplication(packagePath: Path, memoryMb: Int, cpuCore: Int, user: UserGroupInformation, cmds: List[String], env: Option[Map[String, String]], name: Option[String]): Option[ApplicationId] = {
-    val newAppRequest = Records.newRecord(classOf[GetNewApplicationRequest])
-    val newAppResponse = applicationsManager.getNewApplication(newAppRequest)
+  def submitApplication(packagePath: Path, memoryMb: Int, cpuCore: Int, cmds: List[String], env: Option[Map[String, String]], name: Option[String]): Option[ApplicationId] = {
+    val app = yarnClient.createApplication
+    val newAppResponse = app.getNewApplicationResponse
    var mem = memoryMb
    var cpu = cpuCore
 
-    // If we are asking for memory less than the minimum required, bump it
-    if (mem < newAppResponse.getMinimumResourceCapability().getMemory()) {
-      val min = newAppResponse.getMinimumResourceCapability().getMemory()
-      warn("requesting %s megs of memory, which is less than minimum capability of %s, so using minimum" format (mem, min))
-      mem = min
-    }
-
    // If we are asking for memory more than the max allowed, shout out
    if (mem > newAppResponse.getMaximumResourceCapability().getMemory()) {
      throw new SamzaException("You're asking for more memory (%s) than is allowed by YARN: %s" format
        (mem, newAppResponse.getMaximumResourceCapability().getMemory()))
    }
 
-    // if we are asking for cpu less than the minimum required, bump it
-    if (cpu < newAppResponse.getMinimumResourceCapability().getVirtualCores()) {
-      val min = newAppResponse.getMinimumResourceCapability.getVirtualCores()
-      warn("requesting %s virtual cores of cpu, which is less than minimum capability of %s, so using minimum" format (cpu, min))
-      cpu = min
-    }
-
    // If we are asking for cpu more than the max allowed, shout out
    if (cpu > newAppResponse.getMaximumResourceCapability().getVirtualCores()) {
      throw new SamzaException("You're asking for more CPU (%s) than is allowed by YARN: %s" format
@@ -109,10 +88,9 @@ class ClientHelper(conf: Configuration) extends Logging {
 
    info("preparing to request resources for app id %s" format appId.get)
 
-    val appCtx = Records.newRecord(classOf[ApplicationSubmissionContext])
+    val appCtx = app.getApplicationSubmissionContext
    val containerCtx = Records.newRecord(classOf[ContainerLaunchContext])
    val resource = Records.newRecord(classOf[Resource])
-    val submitAppRequest = Records.newRecord(classOf[SubmitApplicationRequest])
    val packageResource = Records.newRecord(classOf[LocalResource])
 
    name match {
@@ -144,54 +122,48 @@ class ClientHelper(conf: Configuration) extends Logging {
    info("set memory request to %s for %s" format (mem, appId.get))
    resource.setVirtualCores(cpu)
    info("set cpu core request to %s for %s" format (cpu, appId.get))
-    containerCtx.setResource(resource)
+    appCtx.setResource(resource)
    containerCtx.setCommands(cmds.toList)
    info("set command to %s for %s" format (cmds, appId.get))
    containerCtx.setLocalResources(Collections.singletonMap("__package", packageResource))
    appCtx.setApplicationId(appId.get)
-    info("set app ID to %s" format (user, appId.get))
-    appCtx.setUser(user.getShortUserName)
-    info("set user to %s for %s" format (user, appId.get))
+    info("set app ID to %s" format appId.get)
    appCtx.setAMContainerSpec(containerCtx)
-    submitAppRequest.setApplicationSubmissionContext(appCtx)
+    appCtx.setApplicationType(ClientHelper.applicationType)
    info("submitting application request for %s" format appId.get)
-    applicationsManager.submitApplication(submitAppRequest)
+    yarnClient.submitApplication(appCtx)
    appId
  }
 
  def status(appId: ApplicationId): Option[ApplicationStatus] = {
-    val statusRequest = Records.newRecord(classOf[GetApplicationReportRequest])
-    statusRequest.setApplicationId(appId)
-    val statusResponse = applicationsManager.getApplicationReport(statusRequest)
-    convertState(statusResponse.getApplicationReport)
+    val statusResponse = yarnClient.getApplicationReport(appId)
+    convertState(statusResponse.getYarnApplicationState, statusResponse.getFinalApplicationStatus)
  }
 
  def kill(appId: ApplicationId) {
-    val killRequest = Records.newRecord(classOf[KillApplicationRequest])
-    killRequest.setApplicationId(appId)
-    applicationsManager.forceKillApplication(killRequest)
+    yarnClient.killApplication(appId)
  }
 
  def getApplicationMaster(appId: ApplicationId): Option[ApplicationReport] = {
-    val getAppsReq = Records.newRecord(classOf[GetAllApplicationsRequest])
-    val getAppsRsp = applicationsManager.getAllApplications(getAppsReq)
-
-    getAppsRsp.getApplicationList.filter(appRep => appId.equals(appRep.getApplicationId())).headOption
+    yarnClient
+      .getApplications
+      .filter(appRep => appId.equals(appRep.getApplicationId()))
+      .headOption
  }
 
  def getApplicationMasters(status: Option[ApplicationStatus]): List[ApplicationReport] = {
-    val getAppsReq = Records.newRecord(classOf[GetAllApplicationsRequest])
-    val getAppsRsp = applicationsManager.getAllApplications(getAppsReq)
+    val getAppsRsp = yarnClient.getApplications
 
    status match {
-      case Some(status) => getAppsRsp.getApplicationList
-        .filter(appRep => status.equals(convertState(appRep).get)).toList
-      case None => getAppsRsp.getApplicationList.toList
+      case Some(status) => getAppsRsp
+        .filter(appRep => status.equals(convertState(appRep.getYarnApplicationState, appRep.getFinalApplicationStatus).get))
+        .toList
+      case None => getAppsRsp.toList
    }
  }
 
-  private def convertState(appReport: ApplicationReport): Option[ApplicationStatus] = {
-    (appReport.getYarnApplicationState(), appReport.getFinalApplicationStatus()) match {
+  private def convertState(state: YarnApplicationState, status: FinalApplicationStatus): Option[ApplicationStatus] = {
+    (state, status) match {
      case (YarnApplicationState.FINISHED, FinalApplicationStatus.SUCCEEDED) => Some(SuccessfulFinish)
      case (YarnApplicationState.KILLED, _) | (YarnApplicationState.FAILED, _) => Some(UnsuccessfulFinish)
      case (YarnApplicationState.NEW, _) | (YarnApplicationState.SUBMITTED, _) => Some(New)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
index 7f830f2..1cac06a 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
@@ -27,9 +27,10 @@ import org.apache.hadoop.yarn.util.ConverterUtils
 import scala.collection.JavaConversions._
 import org.apache.samza.metrics.{ JmxServer, MetricsRegistryMap }
 import grizzled.slf4j.Logging
-import org.apache.hadoop.yarn.client.AMRMClientImpl
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl
 import org.apache.samza.config.YarnConfig._
 import org.apache.samza.job.yarn.SamzaAppMasterTaskManager._
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 
 /**
  * When YARN executes an application master, it needs a bash command to
@@ -44,22 +45,22 @@ import org.apache.samza.job.yarn.SamzaAppMasterTaskManager._
  */
 object SamzaAppMaster extends Logging {
  def main(args: Array[String]) {
-    val containerIdStr = System.getenv(ApplicationConstants.AM_CONTAINER_ID_ENV)
+    val containerIdStr = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.toString)
    info("got container id: %s" format containerIdStr)
    val containerId = ConverterUtils.toContainerId(containerIdStr)
    val applicationAttemptId = containerId.getApplicationAttemptId
    info("got app attempt id: %s" format applicationAttemptId)
-    val nodeHostString = System.getenv(ApplicationConstants.NM_HOST_ENV)
+    val nodeHostString = System.getenv(ApplicationConstants.Environment.NM_HOST.toString)
    info("got node manager host: %s" format nodeHostString)
-    val nodePortString = System.getenv(ApplicationConstants.NM_PORT_ENV)
+    val nodePortString = System.getenv(ApplicationConstants.Environment.NM_PORT.toString)
    info("got node manager port: %s" format nodePortString)
-    val nodeHttpPortString = System.getenv(ApplicationConstants.NM_HTTP_PORT_ENV)
+    val nodeHttpPortString = System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.toString)
    info("got node manager http port: %s" format nodeHttpPortString)
    val config = new MapConfig(JsonConfigSerializer.fromJson(System.getenv(YarnConfig.ENV_CONFIG)))
    info("got config: %s" format config)
    val hConfig = new YarnConfiguration
    hConfig.set("fs.http.impl", "samza.util.hadoop.HttpFileSystem")
-    val amClient = new AMRMClientImpl(applicationAttemptId)
+    val amClient = new AMRMClientImpl[ContainerRequest]
    val clientHelper = new ClientHelper(hConfig)
    val registry = new MetricsRegistryMap
    val containerMem = config.getContainerMaxMemoryMb.getOrElse(DEFAULT_CONTAINER_MEM)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
index 95a6f05..5d09265 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala
@@ -20,7 +20,7 @@ package org.apache.samza.job.yarn
 
 import grizzled.slf4j.Logging
 import org.apache.samza.SamzaException
-import org.apache.hadoop.yarn.client.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
 
@@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
  * this means registering and unregistering with the RM, and shutting down
  * when the RM tells us to Reboot.
  */
-class SamzaAppMasterLifecycle(containerMem: Int, containerCpu: Int, state: SamzaAppMasterState, amClient: AMRMClient, conf: YarnConfiguration) extends YarnAppMasterListener with Logging {
+class SamzaAppMasterLifecycle(containerMem: Int, containerCpu: Int, state: SamzaAppMasterState, amClient: AMRMClient[_], conf: YarnConfiguration) extends YarnAppMasterListener with Logging {
  var validResourceRequest = true
  var shutdownMessage: String = null
 
@@ -43,15 +43,12 @@ class SamzaAppMasterLifecycle(containerMem: Int, containerCpu: Int, state: Samza
 
    // validate that the YARN cluster can handle our container resource requirements
    val maxCapability = response.getMaximumResourceCapability
-    val minCapability = response.getMinimumResourceCapability
    val maxMem = maxCapability.getMemory
-    val minMem = minCapability.getMemory
    val maxCpu = maxCapability.getVirtualCores
-    val minCpu = minCapability.getVirtualCores
-    info("Got AM register response. The YARN RM supports container requests with max-mem: %s, min-mem: %s, max-cpu: %s, min-cpu: %s" format (maxMem, minMem, maxCpu, minCpu))
+    info("Got AM register response. The YARN RM supports container requests with max-mem: %s, max-cpu: %s" format (maxMem, maxCpu))
 
-    if (containerMem < minMem || containerMem > maxMem || containerCpu < minCpu || containerCpu > maxCpu) {
+    if (containerMem > maxMem || containerCpu > maxCpu) {
      shutdownMessage = "The YARN cluster is unable to run your job due to unsatisfiable resource requirements. You asked for mem: %s, and cpu: %s." format (containerMem, containerCpu)
      error(shutdownMessage)
      validResourceRequest = false

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
index ce3fcc3..82d90d4 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala
@@ -32,6 +32,9 @@ import org.apache.samza.SamzaException
  * up the web service when initialized.
  */
 class SamzaAppMasterService(config: Config, state: SamzaAppMasterState, registry: ReadableMetricsRegistry, clientHelper: ClientHelper) extends YarnAppMasterListener with Logging {
+  var rpcApp: WebAppServer = null
+  var webApp: WebAppServer = null
+
  override def onInit() {
    // try starting the samza AM dashboard. try ten times, just in case we
    // pick a port that's already in use.
@@ -41,13 +44,13 @@ class SamzaAppMasterService(config: Config, state: SamzaAppMasterState, registry
    info("Starting webapp at rpc %d, tracking port %d" format (rpcPort, trackingPort))
 
    try {
-      val rpcapp = new WebAppServer("/", rpcPort)
-      rpcapp.addServlet("/*", new ApplicationMasterRestServlet(config, state, registry))
-      rpcapp.start
+      rpcApp = new WebAppServer("/", rpcPort)
+      rpcApp.addServlet("/*", new ApplicationMasterRestServlet(config, state, registry))
+      rpcApp.start
 
-      val webapp = new WebAppServer("/", trackingPort)
-      webapp.addServlet("/*", new ApplicationMasterWebServlet(config, state))
-      webapp.start
+      webApp = new WebAppServer("/", trackingPort)
+      webApp.addServlet("/*", new ApplicationMasterWebServlet(config, state))
+      webApp.start
 
      state.rpcPort = rpcPort
      state.trackingPort = trackingPort
@@ -63,4 +66,16 @@ class SamzaAppMasterService(config: Config, state: SamzaAppMasterState, registry
 
    throw new SamzaException("Giving up trying to start the webapp, since we keep getting ports that are already in use")
  }
+
+  override def onShutdown() {
+    if (rpcApp != null) {
+      rpcApp.context.stop
+      rpcApp.server.stop
+    }
+
+    if (webApp != null) {
+      webApp.context.stop
+      webApp.server.stop
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
index 1a13ee5..51097c1 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
@@ -38,8 +38,8 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
 import org.apache.samza.util.Util
 import scala.collection.JavaConversions._
 import org.apache.samza.SamzaException
-import org.apache.hadoop.yarn.client.AMRMClient
-import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.hadoop.yarn.api.records.Priority
 import org.apache.hadoop.yarn.api.records.Resource
 import org.apache.hadoop.yarn.util.Records
@@ -49,7 +49,6 @@ import org.apache.hadoop.yarn.util.ConverterUtils
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.api.ContainerManager
 import org.apache.hadoop.yarn.api.records.LocalResourceType
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
 import org.apache.hadoop.yarn.ipc.YarnRPC
@@ -59,6 +58,11 @@ import org.apache.hadoop.net.NetUtils
 import java.util.Collections
 import java.security.PrivilegedAction
 import org.apache.samza.job.ShellCommandBuilder
+import org.apache.hadoop.io.DataOutputBuffer
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier
+import java.nio.ByteBuffer
+import org.apache.hadoop.yarn.client.api.NMClient
+import org.apache.hadoop.yarn.client.api.impl.NMClientImpl
 
 object SamzaAppMasterTaskManager {
  val DEFAULT_CONTAINER_MEM = 256
@@ -79,7 +83,7 @@ case class TaskFailure(val count: Int, val lastFailure: Long)
  * containers, handling failures, and notifying the application master that the
  * job is done.
  */
-class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaAppMasterState, amClient: AMRMClient, conf: YarnConfiguration) extends YarnAppMasterListener with Logging {
+class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaAppMasterState, amClient: AMRMClient[ContainerRequest], conf: YarnConfiguration) extends YarnAppMasterListener with Logging {
  import SamzaAppMasterTaskManager._
 
  state.taskCount = config.getTaskCount match {
@@ -92,18 +96,28 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
  val partitions = Util.getMaxInputStreamPartitions(config)
  var taskFailures = Map[Int, TaskFailure]()
  var tooManyFailedContainers = false
+  var containerManager: NMClientImpl = null
 
  override def shouldShutdown = state.completedTasks == state.taskCount || tooManyFailedContainers
 
  override def onInit() {
    state.neededContainers = state.taskCount
    state.unclaimedTasks = (0 until state.taskCount).toSet
+    containerManager = new NMClientImpl()
+    containerManager.init(conf)
+    containerManager.start
 
    info("Requesting %s containers" format state.taskCount)
 
    requestContainers(config.getContainerMaxMemoryMb.getOrElse(DEFAULT_CONTAINER_MEM), config.getContainerMaxCpuCores.getOrElse(DEFAULT_CPU_CORES), state.neededContainers)
  }
 
+  override def onShutdown {
+    if (containerManager != null) {
+      containerManager.stop
+    }
+  }
+
  override def onContainerAllocated(container: Container) {
    val containerIdStr = ConverterUtils.toString(container.getId)
 
@@ -124,15 +138,12 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
    info("Task ID %s using command %s" format (taskId, command))
    val env = cmdBuilder.buildEnvironment.map { case (k, v) => (k, Util.envVarEscape(v)) }
    info("Task ID %s using env %s" format (taskId, env))
-    val user = UserGroupInformation.getCurrentUser
-    info("Task ID %s using user %s" format (taskId, user))
    val path = new Path(config.getPackagePath.get)
    info("Starting task ID %s using package path %s" format (taskId, path))
 
    startContainer(
      path,
      container,
-      user,
      env.toMap,
      "export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec ./__package/%s 1>logs/%s 2>logs/%s" format (ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR, command, ApplicationConstants.STDOUT, ApplicationConstants.STDERR))
 
@@ -273,26 +284,11 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
    }
  }
 
-  protected def startContainer(packagePath: Path, container: Container, ugi: UserGroupInformation, env: Map[String, String], cmds: String*) {
-    info("starting container %s %s %s %s %s" format (packagePath, container, ugi, env, cmds))
+  protected def startContainer(packagePath: Path, container: Container, env: Map[String, String], cmds: String*) {
+    info("starting container %s %s %s %s" format (packagePath, container, env, cmds))
 
    // connect to container manager (based on similar code in the ContainerLauncher in Hadoop MapReduce)
    val contToken = container.getContainerToken
    val address = container.getNodeId.getHost + ":" + container.getNodeId.getPort
-    var user = ugi
-
-    if (UserGroupInformation.isSecurityEnabled) {
-      debug("security is enabled")
-      val hadoopToken = new Token[ContainerTokenIdentifier](contToken.getIdentifier.array, contToken.getPassword.array, new Text(contToken.getKind), new Text(contToken.getService))
-      user = UserGroupInformation.createRemoteUser(address)
-      user.addToken(hadoopToken)
-      debug("changed user to %s" format user)
-    }
-
-    val containerManager = user.doAs(new PrivilegedAction[ContainerManager] {
-      def run(): ContainerManager = {
-        return YarnRPC.create(conf).getProxy(classOf[ContainerManager], NetUtils.createSocketAddr(address), conf).asInstanceOf[ContainerManager]
-      }
-    })
 
    // set the local package so that the containers and app master are provisioned with it
    val packageResource = Records.newRecord(classOf[LocalResource])
@@ -305,12 +301,24 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
    packageResource.setType(LocalResourceType.ARCHIVE)
    packageResource.setVisibility(LocalResourceVisibility.APPLICATION)
 
+    // copy tokens (copied from dist shell example)
+    val credentials = UserGroupInformation.getCurrentUser.getCredentials
+    val dob = new DataOutputBuffer
+    credentials.writeTokenStorageToStream(dob)
+    // now remove the AM->RM token so that containers cannot access it
+    val iter = credentials.getAllTokens.iterator
+    while (iter.hasNext) {
+      val token = iter.next
+      if (token.getKind.equals(AMRMTokenIdentifier.KIND_NAME)) {
+        iter.remove
+      }
+    }
+    val allTokens = ByteBuffer.wrap(dob.getData, 0, dob.getLength)
+
    // start the container
    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
    ctx.setEnvironment(env)
-    ctx.setContainerId(container.getId())
-    ctx.setResource(container.getResource())
-    ctx.setUser(user.getShortUserName())
+    ctx.setTokens(allTokens.duplicate)
    ctx.setCommands(cmds.toList)
    ctx.setLocalResources(Collections.singletonMap("__package", packageResource))
 
@@ -319,7 +327,7 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
    val startContainerRequest = Records.newRecord(classOf[StartContainerRequest])
    startContainerRequest.setContainerLaunchContext(ctx)
-    containerManager.startContainer(startContainerRequest)
+    containerManager.startContainer(container, ctx)
  }
 
  protected def requestContainers(memMb: Int, cpuCores: Int, containers: Int) {
@@ -329,6 +337,6 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
    priority.setPriority(0)
    capability.setMemory(memMb)
    capability.setVirtualCores(cpuCores)
-    amClient.addContainerRequest(new ContainerRequest(capability, null, null, priority, containers))
+    (0 until containers).foreach(idx => amClient.addContainerRequest(new ContainerRequest(capability, null, null, priority)))
  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala
index 14e3865..4938192 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMaster.scala
@@ -21,7 +21,9 @@ package org.apache.samza.job.yarn
 
 import scala.collection.JavaConversions._
 import grizzled.slf4j.Logging
-import org.apache.hadoop.yarn.client.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.api.records.AMCommand._
+import org.apache.samza.SamzaException
 
 /**
  * YARN's API is somewhat clunky. Most implementations just sit in a loop, and
@@ -36,20 +38,27 @@ import org.apache.hadoop.yarn.client.AMRMClient
  * SamzaAppMaster uses this class to wire up all of Samza's application master
  * listeners.
  */
-class YarnAppMaster(pollIntervalMs: Long, listeners: List[YarnAppMasterListener], amClient: AMRMClient) extends Logging {
+class YarnAppMaster(pollIntervalMs: Long, listeners: List[YarnAppMasterListener], amClient: AMRMClient[_]) extends Logging {
  var isShutdown = false
 
-  def this(listeners: List[YarnAppMasterListener], amClient: AMRMClient) = this(1000, listeners, amClient)
+  def this(listeners: List[YarnAppMasterListener], amClient: AMRMClient[_]) = this(1000, listeners, amClient)
 
  def run {
    try {
      listeners.foreach(_.onInit)
 
      while (!isShutdown && !listeners.map(_.shouldShutdown).reduceLeft(_ || _)) {
-        val response = amClient.allocate(0).getAMResponse
+        val response = amClient.allocate(0)
 
-        if (response.getReboot) {
-          listeners.foreach(_.onReboot)
+        if (response.getAMCommand != null) {
+          response.getAMCommand match {
+            case AM_RESYNC | AM_SHUTDOWN =>
+              listeners.foreach(_.onReboot)
+            case _ =>
+              val msg = "Unhandled value of AMCommand: " + response.getAMCommand
+              error(msg);
+              throw new SamzaException(msg);
+          }
        }
 
        listeners.foreach(_.onEventLoop)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
index bde38e1..3f03399 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
@@ -49,7 +49,6 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob {
      new Path(config.getPackagePath.getOrElse(throw new SamzaException("No YARN package path defined in config."))),
      config.getAMContainerMaxMemoryMb.getOrElse(512),
      1,
-      UserGroupInformation.getCurrentUser,
      List(
        "export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec ./__package/bin/run-am.sh 1>logs/%s 2>logs/%s" format (ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.STDOUT, ApplicationConstants.STDERR)),

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
index cbd7c1e..734d9d2 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
@@ -26,6 +26,7 @@ import org.apache.samza.config.Config
 import scala.collection.JavaConversions._
 import scala.collection.immutable.TreeMap
 import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils
 
 class ApplicationMasterWebServlet(config: Config, state: SamzaAppMasterState) extends ScalatraServlet with ScalateSupport {
  val yarnConfig = new YarnConfiguration
@@ -38,6 +39,6 @@ class ApplicationMasterWebServlet(config: Config, state: SamzaAppMasterState) ex
      layoutTemplate("/WEB-INF/views/index.scaml",
        "config" -> TreeMap(config.toMap.toArray: _*),
        "state" -> state,
-        "rmHttpAddress" -> YarnConfiguration.getRMWebAppURL(yarnConfig))
+        "rmHttpAddress" -> WebAppUtils.getRMWebAppURLWithScheme(yarnConfig))
  }
 }
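
[Note: the heart of the ClientHelper.scala change above is the move from a hand-built ClientRMProtocol RPC proxy to the YARN 2.2.0 YarnClient API. The following is a minimal, self-contained sketch of that submission flow; it mirrors the calls used in the diff but is illustrative only, and the command, local-resource, and capability setup that ClientHelper.submitApplication performs is omitted.]

    import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
    import org.apache.hadoop.yarn.client.api.YarnClient
    import org.apache.hadoop.yarn.conf.YarnConfiguration
    import org.apache.hadoop.yarn.util.Records

    object YarnClientSubmitSketch {
      def main(args: Array[String]) {
        val conf = new YarnConfiguration

        // YarnClient replaces the raw ClientRMProtocol proxy used against YARN 2.0.5-alpha.
        val yarnClient = YarnClient.createYarnClient
        yarnClient.init(conf)
        yarnClient.start

        // createApplication replaces the GetNewApplicationRequest round trip.
        val app = yarnClient.createApplication
        val maxMem = app.getNewApplicationResponse.getMaximumResourceCapability.getMemory
        println("cluster allows containers up to %d MB" format maxMem)

        // The submission context now comes from the application itself.
        val appCtx = app.getApplicationSubmissionContext
        val containerCtx = Records.newRecord(classOf[ContainerLaunchContext])
        // AM command, local resources, and resource capability would be filled in here,
        // as ClientHelper.submitApplication does in the diff above.
        appCtx.setAMContainerSpec(containerCtx)
        appCtx.setApplicationType("Samza")

        // submitApplication(appCtx) replaces the old SubmitApplicationRequest.
        yarnClient.submitApplication(appCtx)
        yarnClient.stop
      }
    }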

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
index 8bf48eb..b24f85a 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
@@ -29,64 +29,53 @@ import org.apache.hadoop.yarn.api.records.ContainerId
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
 import org.apache.hadoop.yarn.api.records.Priority
 import org.apache.hadoop.yarn.api.records.Resource
-import org.apache.hadoop.yarn.client.AMRMClient
-import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl
 import org.apache.hadoop.yarn.api.records.Resource
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse
-import org.apache.hadoop.yarn.service._
+import org.apache.hadoop.service._
 import org.apache.samza.SamzaException
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType
+import java.nio.ByteBuffer
 
 class TestSamzaAppMasterLifecycle {
-  val amClient = new AMRMClient {
+  val amClient = new AMRMClientImpl() {
    var host = ""
    var port = 0
    var status: FinalApplicationStatus = null
-    def registerApplicationMaster(appHostName: String, appHostPort: Int, appTrackingUrl: String): RegisterApplicationMasterResponse = {
+    override def registerApplicationMaster(appHostName: String, appHostPort: Int, appTrackingUrl: String): RegisterApplicationMasterResponse = {
      this.host = appHostName
      this.port = appHostPort
      new RegisterApplicationMasterResponse {
-        def setApplicationACLs(map: java.util.Map[ApplicationAccessType, String]) = null
-        def getApplicationACLs = null
-        def setMaximumResourceCapability(r: Resource) = null
-        def getMaximumResourceCapability = new Resource {
+        override def setApplicationACLs(map: java.util.Map[ApplicationAccessType, String]) = null
+        override def getApplicationACLs = null
+        override def setMaximumResourceCapability(r: Resource) = null
+        override def getMaximumResourceCapability = new Resource {
          def getMemory = 512
          def getVirtualCores = 2
          def setMemory(memory: Int) {}
          def setVirtualCores(vCores: Int) {}
          def compareTo(o: Resource) = 0
        }
-        def setMinimumResourceCapability(r: Resource) = null
-        def getMinimumResourceCapability = new Resource {
-          def getMemory = 128
-          def getVirtualCores = 1
-          def setMemory(memory: Int) {}
-          def setVirtualCores(vCores: Int) {}
-          def compareTo(o: Resource) = 0
-        }
+        override def getClientToAMTokenMasterKey = null
+        override def setClientToAMTokenMasterKey(buffer: ByteBuffer) {}
      }
    }
-    def allocate(progressIndicator: Float): AllocateResponse = null
-    def unregisterApplicationMaster(appStatus: FinalApplicationStatus,
+    override def allocate(progressIndicator: Float): AllocateResponse = null
+    override def unregisterApplicationMaster(appStatus: FinalApplicationStatus,
      appMessage: String,
      appTrackingUrl: String) {
      this.status = appStatus
    }
-    def addContainerRequest(req: ContainerRequest) {}
-    def removeContainerRequest(req: ContainerRequest) {}
-    def releaseAssignedContainer(containerId: ContainerId) {}
-    def getClusterAvailableResources(): Resource = null
-    def getClusterNodeCount() = 1
+    override def releaseAssignedContainer(containerId: ContainerId) {}
+    override def getClusterNodeCount() = 1
 
-    def init(config: Configuration) {}
-    def start() {}
-    def stop() {}
-    def register(listener: ServiceStateChangeListener) {}
-    def unregister(listener: ServiceStateChangeListener) {}
-    def getName(): String = ""
-    def getConfig() = null
-    def getServiceState() = null
-    def getStartTime() = 0L
+    override def init(config: Configuration) {}
+    override def start() {}
+    override def stop() {}
+    override def getName(): String = ""
+    override def getConfig() = null
+    override def getStartTime() = 0L
  }
 
  @Test
@@ -125,9 +114,7 @@ class TestSamzaAppMasterLifecycle {
    val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "test", 1, 2)
    state.rpcPort = 1
    List(new SamzaAppMasterLifecycle(768, 1, state, amClient, new YarnConfiguration),
-      new SamzaAppMasterLifecycle(0, 1, state, amClient, new YarnConfiguration),
-      new SamzaAppMasterLifecycle(368, 3, state, amClient, new YarnConfiguration),
-      new SamzaAppMasterLifecycle(768, 0, state, amClient, new YarnConfiguration)).map(saml => {
+      new SamzaAppMasterLifecycle(368, 3, state, amClient, new YarnConfiguration)).map(saml => {
      saml.onInit
      assertTrue(saml.shouldShutdown)
    })

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
index c9f7029..ee3ffef 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
@@ -32,9 +32,10 @@ import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
-import org.apache.hadoop.yarn.client.AMRMClient
-import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.service._
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl
+import org.apache.hadoop.service._
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.yarn.api.records.NodeReport
 import TestSamzaAppMasterTaskManager._
@@ -45,90 +46,89 @@ import org.apache.samza.SamzaException
 
 object TestSamzaAppMasterTaskManager {
  def getContainer(containerId: ContainerId) = new Container {
-    def getId(): ContainerId = containerId
-    def setId(id: ContainerId) {}
-    def getNodeId(): NodeId = new NodeId {
+    override def getId(): ContainerId = containerId
+    override def setId(id: ContainerId) {}
+    override def getNodeId(): NodeId = new NodeId {
      var host = ""
      var port = 12345
-      def getHost() = host
-      def setHost(host: String) = {
+      override def getHost() = host
+      override def setHost(host: String) = {
        this.host = host
      }
-      def getPort() = port
-      def setPort(port: Int) = {
+      override def getPort() = port
+      override def setPort(port: Int) = {
        this.port = port
      }
+      override def build() = null
    }
-    def setNodeId(nodeId: NodeId) {}
-    def getNodeHttpAddress(): String = ""
-    def setNodeHttpAddress(nodeHttpAddress: String) {}
-    def getResource(): Resource = null
-    def setResource(resource: Resource) {}
-    def getPriority(): Priority = null
-    def setPriority(priority: Priority) {}
-    def getState(): ContainerState = null
-    def setState(state: ContainerState) {}
-    def getContainerToken(): ContainerToken = null
-    def setContainerToken(containerToken: ContainerToken) {}
-    def getContainerStatus(): ContainerStatus = null
-    def setContainerStatus(containerStatus: ContainerStatus) {}
-    def compareTo(c: Container): Int = containerId.compareTo(c.getId)
+    override def setNodeId(nodeId: NodeId) {}
+    override def getNodeHttpAddress(): String = ""
+    override def setNodeHttpAddress(nodeHttpAddress: String) {}
+    override def getResource(): Resource = null
+    override def setResource(resource: Resource) {}
+    override def getPriority(): Priority = null
+    override def setPriority(priority: Priority) {}
+    override def getContainerToken(): Token = null
+    override def setContainerToken(containerToken: Token) {}
+    override def compareTo(c: Container): Int = containerId.compareTo(c.getId)
  }
 
  def getContainerStatus(containerId: ContainerId, exitCode: Int, diagnostic: String) = new ContainerStatus {
-    def getContainerId(): ContainerId = containerId
-    def setContainerId(containerId: ContainerId) {}
-    def getState(): ContainerState = null
-    def setState(state: ContainerState) {}
-    def getExitStatus(): Int = exitCode
-    def setExitStatus(exitStatus: Int) {}
-    def getDiagnostics() = diagnostic
-    def setDiagnostics(diagnostics: String) = {}
+    override def getContainerId(): ContainerId = containerId
+    override def setContainerId(containerId: ContainerId) {}
+    override def getState(): ContainerState = null
+    override def setState(state: ContainerState) {}
+    override def getExitStatus(): Int = exitCode
+    override def setExitStatus(exitStatus: Int) {}
+    override def getDiagnostics() = diagnostic
+    override def setDiagnostics(diagnostics: String) = {}
  }
 
-  def getAmClient = (response: AllocateResponse) => new AMRMClient {
+  def getAmClient = (response: AllocateResponse) => new AMRMClientImpl[ContainerRequest] {
    var requests: List[ContainerRequest] = List[ContainerRequest]()
-    var release: List[ContainerId] = List[ContainerId]()
-
-    def registerApplicationMaster(appHostName: String, appHostPort: Int, appTrackingUrl: String): RegisterApplicationMasterResponse = null
-    def allocate(progressIndicator: Float): AllocateResponse = response
-    def unregisterApplicationMaster(appStatus: FinalApplicationStatus, appMessage: String, appTrackingUrl: String) = null
-    def addContainerRequest(req: ContainerRequest) { requests ::= req }
-    def removeContainerRequest(req: ContainerRequest) {}
-    def releaseAssignedContainer(containerId: ContainerId) { release ::= containerId }
-    def getClusterAvailableResources(): Resource = null
-    def getClusterNodeCount() = 1
-
-    def init(config: Configuration) {}
-    def start() {}
-    def stop() {}
-    def register(listener: ServiceStateChangeListener) {}
-    def unregister(listener: ServiceStateChangeListener) {}
-    def getName(): String = ""
-    def getConfig() = null
-    def getServiceState() = null
-    def getStartTime() = 0L
+
+    def getRelease = release
+    def resetRelease = release.clear
+    override def registerApplicationMaster(appHostName: String, appHostPort: Int, appTrackingUrl: String): RegisterApplicationMasterResponse = null
+    override def allocate(progressIndicator: Float): AllocateResponse = response
+    override def unregisterApplicationMaster(appStatus: FinalApplicationStatus, appMessage: String, appTrackingUrl: String) = null
+    override def addContainerRequest(req: ContainerRequest) { requests ::= req }
+    override def removeContainerRequest(req: ContainerRequest) {}
+    override def getClusterNodeCount() = 1
+
+    override def init(config: Configuration) {}
+    override def start() {}
+    override def stop() {}
+    override def getName(): String = ""
+    override def getConfig() = null
+    override def getStartTime() = 0L
  }
 
  def getAppMasterResponse(reboot: Boolean, containers: List[Container], completed: List[ContainerStatus]) = new AllocateResponse {
-    def getAMResponse = new AMResponse {
-      def getReboot(): Boolean = reboot
-      def setReboot(reboot: Boolean) {}
-      def getResponseId() = 0
-      def setResponseId(responseId: Int) {}
-      def getAllocatedContainers() = containers
-      def setAllocatedContainers(containers: java.util.List[Container]) {}
-      def getAvailableResources(): Resource = null
-      def setAvailableResources(limit: Resource) {}
-      def getCompletedContainersStatuses() = completed
-      def setCompletedContainersStatuses(containers: java.util.List[ContainerStatus]) {}
-      def setUpdatedNodes(nodes: java.util.List[NodeReport]) {}
-      def getUpdatedNodes = null
+    override def getResponseId() = 0
+    override def setResponseId(responseId: Int) {}
+    override def getAllocatedContainers() = containers
+    override def setAllocatedContainers(containers: java.util.List[Container]) {}
+    override def getAvailableResources(): Resource = null
+    override def setAvailableResources(limit: Resource) {}
+    override def getCompletedContainersStatuses() = completed
+    override def setCompletedContainersStatuses(containers: java.util.List[ContainerStatus]) {}
+    override def setUpdatedNodes(nodes: java.util.List[NodeReport]) {}
+    override def getUpdatedNodes = null
+    override def getNumClusterNodes = 1
+    override def setNumClusterNodes(num: Int) {}
+    override def getNMTokens = null
+    override def setNMTokens(nmTokens: java.util.List[NMToken]) {}
+    override def setAMCommand(command: AMCommand) {}
+    override def getPreemptionMessage = null
+    override def setPreemptionMessage(request: PreemptionMessage) {}
+
+    override def getAMCommand = if (reboot) {
+      AMCommand.AM_RESYNC
+    } else {
+      null
    }
-    def getNumClusterNodes = 1
-    def setNumClusterNodes(num: Int) {}
-    def setAMResponse(response: AMResponse) {}
  }
 }
 
@@ -169,7 +169,7 @@ class TestSamzaAppMasterTaskManager {
    val amClient = getAmClient(getAppMasterResponse(false, List(), List()))
    val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
    val taskManager = new SamzaAppMasterTaskManager(clock, config, state, amClient, new YarnConfiguration) {
-      override def startContainer(packagePath: Path, container: Container, ugi: UserGroupInformation, env: Map[String, String], cmds: String*) {
+      override def startContainer(packagePath: Path, container: Container, env: Map[String, String], cmds: String*) {
        // Do nothing.
      }
    }
@@ -182,13 +182,13 @@ class TestSamzaAppMasterTaskManager {
    assert(taskManager.shouldShutdown == false)
    // 2. First is from onInit, second is from onContainerCompleted, since it failed.
    assertEquals(2, amClient.requests.size)
-    assertEquals(0, amClient.release.size)
+    assertEquals(0, amClient.getRelease.size)
    assertFalse(taskManager.shouldShutdown)
    // Now trigger an AM shutdown since our retry count is 1, and we're failing twice
    taskManager.onContainerAllocated(getContainer(container2))
    taskManager.onContainerCompleted(getContainerStatus(container2, 1, "expecting a failure here"))
    assertEquals(2, amClient.requests.size)
-    assertEquals(0, amClient.release.size)
+    assertEquals(0, amClient.getRelease.size)
    assertTrue(taskManager.shouldShutdown)
  }
 
@@ -200,7 +200,7 @@ class TestSamzaAppMasterTaskManager {
    var containersRequested = 0
    var containersStarted = 0
    val taskManager = new SamzaAppMasterTaskManager(clock, config, state, amClient, new YarnConfiguration) {
-      override def startContainer(packagePath: Path, container: Container, ugi: UserGroupInformation, env: Map[String, String], cmds: String*) {
+      override def startContainer(packagePath: Path, container: Container, env: Map[String, String], cmds: String*) {
        containersStarted += 1
      }
 
@@ -216,7 +216,7 @@ class TestSamzaAppMasterTaskManager {
    taskManager.onInit
    assert(taskManager.shouldShutdown == false)
    assert(amClient.requests.size == 1)
-    assert(amClient.release.size == 0)
+    assert(amClient.getRelease.size == 0)
 
    // allocate container 2
    taskManager.onContainerAllocated(getContainer(container2))
@@ -234,12 +234,12 @@ class TestSamzaAppMasterTaskManager {
    assert(state.taskPartitions.size == 1)
    assert(state.unclaimedTasks.size == 0)
    assert(amClient.requests.size == 1)
-    assert(amClient.release.size == 1)
-    assert(amClient.release(0).equals(container3))
+    assert(amClient.getRelease.size == 1)
+    assert(amClient.getRelease.head.equals(container3))
 
    // reset the helper state, so we can make sure that releasing the container (next step) doesn't request more resources
    amClient.requests = List()
-    amClient.release = List()
+    amClient.resetRelease
 
    // now release the container, and make sure the AM doesn't ask for more
    assert(taskManager.shouldShutdown == false)
@@ -250,14 +250,14 @@ class TestSamzaAppMasterTaskManager {
    assert(state.taskPartitions.size == 1)
    assert(state.unclaimedTasks.size == 0)
    assert(amClient.requests.size == 0)
-    assert(amClient.release.size == 0)
+    assert(amClient.getRelease.size == 0)
 
    // pretend container 2 is released due to an NM failure, and make sure that the AM requests a new container
    assert(taskManager.shouldShutdown == false)
    taskManager.onContainerCompleted(getContainerStatus(container2, -100, "pretend the container was 'lost' due to an NM failure"))
    assert(taskManager.shouldShutdown == false)
    assert(amClient.requests.size == 1)
-    assert(amClient.release.size == 0)
+    assert(amClient.getRelease.size == 0)
  }
 
  @Test
@@ -270,7 +270,7 @@ class TestSamzaAppMasterTaskManager {
    state.taskCount = 2
    var containersStarted = 0
    val taskManager = new SamzaAppMasterTaskManager(clock, newConfig, state, amClient, new YarnConfiguration) {
-      override def startContainer(packagePath: Path, container: Container, ugi: UserGroupInformation, env: Map[String, String], cmds: String*) {
+      override def startContainer(packagePath: Path, container: Container, env: Map[String, String], cmds: String*) {
        containersStarted += 1
      }
    }
@@ -280,8 +280,8 @@ class TestSamzaAppMasterTaskManager {
    assert(taskManager.shouldShutdown == false)
    taskManager.onInit
    assert(taskManager.shouldShutdown == false)
-    assert(amClient.requests.size == 1)
-    assert(amClient.release.size == 0)
+    assert(amClient.requests.size == 2)
+    assert(amClient.getRelease.size == 0)
    taskManager.onContainerAllocated(getContainer(container2))
    assert(state.neededContainers == 1)
    assert(state.runningTasks.size == 1)
@@ -337,7 +337,7 @@ class TestSamzaAppMasterTaskManager {
    var containersRequested = 0
    var containersStarted = 0
    val taskManager = new SamzaAppMasterTaskManager(clock, config, state, amClient, new YarnConfiguration) {
-      override def startContainer(packagePath: Path, container: Container, ugi: UserGroupInformation, env: Map[String, String], cmds: String*) {
+      override def startContainer(packagePath: Path, container: Container, env: Map[String, String], cmds: String*) {
        containersStarted += 1
      }
 
@@ -353,7 +353,7 @@ class TestSamzaAppMasterTaskManager {
    taskManager.onInit
    assert(taskManager.shouldShutdown == false)
    assert(amClient.requests.size == 1)
-    assert(amClient.release.size == 0)
+    assert(amClient.getRelease.size == 0)
    assert(state.neededContainers == 1)
    assert(state.runningTasks.size == 0)
    assert(state.taskPartitions.size == 0)
@@ -373,8 +373,8 @@ class TestSamzaAppMasterTaskManager {
    assert(containersRequested == 1)
    assert(containersStarted == 1)
    assert(amClient.requests.size == 1)
-    assert(amClient.release.size == 1)
-    assert(amClient.release(0).equals(container3))
+    assert(amClient.getRelease.size == 1)
+    assert(amClient.getRelease.head.equals(container3))
  }
 
  @Test
@@ -429,6 +429,6 @@ class MockSystemFactory extends SystemFactory {
 
 class MockSinglePartitionManager extends SystemAdmin {
  def getPartitions(streamName: String) = Set(new Partition(0))
-
+  def getLastOffsets(streams: java.util.Set[String]) = throw new SamzaException("Need to implement this")
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4217fc7f/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnAppMaster.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnAppMaster.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnAppMaster.scala
index 0040648..98f7844 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnAppMaster.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestYarnAppMaster.scala
@@ -25,7 +25,6 @@ import org.apache.hadoop.yarn.api.records.Container
 import org.apache.hadoop.yarn.api.records.ContainerStatus
 import org.apache.hadoop.yarn.api.records.ResourceRequest
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.api.records.AMResponse
 import org.apache.hadoop.yarn.api.records.ContainerId
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
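
[Note: on the application-master side, the container-launch path reworked in SamzaAppMasterTaskManager.scala above now goes through NMClient instead of a per-node ContainerManager proxy. The sketch below shows only that pattern against the YARN 2.2.0 API; it is illustrative, the class and method names here are not part of this patch, and the token, local-resource, and environment setup from the diff is left out.]

    import org.apache.hadoop.yarn.api.records.{ Container, ContainerLaunchContext }
    import org.apache.hadoop.yarn.client.api.NMClient
    import org.apache.hadoop.yarn.conf.YarnConfiguration
    import org.apache.hadoop.yarn.util.Records
    import scala.collection.JavaConversions._

    class ContainerLauncherSketch(conf: YarnConfiguration) {
      // One NMClient talks to every node manager; no hand-built RPC proxy or doAs dance.
      val nmClient = NMClient.createNMClient
      nmClient.init(conf)
      nmClient.start

      def launch(container: Container, cmds: List[String]) {
        val ctx = Records.newRecord(classOf[ContainerLaunchContext])
        ctx.setCommands(cmds)
        // startContainer takes the allocated container directly; StartContainerRequest is no longer needed.
        nmClient.startContainer(container, ctx)
      }

      def stop() {
        nmClient.stop
      }
    }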
