http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java index 7778a38..1fac7f4 100644 --- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java +++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java @@ -19,6 +19,8 @@ package org.apache.samza.job.yarn; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.client.api.AMRMClient; @@ -40,6 +42,7 @@ import org.apache.samza.util.hadoop.HttpFileSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -90,6 +93,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement */ private final SamzaYarnAppMasterService service; + private final YarnConfig yarnConfig; /** * State variables to map Yarn specific callbacks into Samza specific callbacks. @@ -126,18 +130,19 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement int nodePort = Integer.parseInt(nodePortString); int nodeHttpPort = Integer.parseInt(nodeHttpPortString); YarnConfig yarnConfig = new YarnConfig(config); + this.yarnConfig = yarnConfig; int interval = yarnConfig.getAMPollIntervalMs(); //Instantiate the AM Client. this.amClient = AMRMClientAsync.createAMRMClientAsync(interval, this); - this.state = new YarnAppState(jobModelManager, -1, containerId, nodeHostString, nodePort, nodeHttpPort, samzaAppState); + this.state = new YarnAppState(-1, containerId, nodeHostString, nodePort, nodeHttpPort); log.info("Initialized YarnAppState: {}", state.toString()); - this.service = new SamzaYarnAppMasterService(config, this.state, registry); + this.service = new SamzaYarnAppMasterService(config, samzaAppState, this.state, registry, hConfig); log.info("ContainerID str {}, Nodehost {} , Nodeport {} , NodeHttpport {}", new Object [] {containerIdStr, nodeHostString, nodePort, nodeHttpPort}); - this.lifecycle = new SamzaYarnAppMasterLifecycle(yarnConfig.getContainerMaxMemoryMb(), yarnConfig.getContainerMaxCpuCores(), state, amClient ); + this.lifecycle = new SamzaYarnAppMasterLifecycle(yarnConfig.getContainerMaxMemoryMb(), yarnConfig.getContainerMaxCpuCores(), samzaAppState, state, amClient ); yarnContainerRunner = new YarnContainerRunner(config, hConfig); } @@ -312,6 +317,33 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement amClient.stop(); log.info("Stopping the AM service " ); service.onShutdown(); + + if(status != SamzaApplicationState.SamzaAppStatus.UNDEFINED) { + cleanupStagingDir(); + } + } + + /** + * Cleans up the staging directory of the job. All exceptions during the cleanup + * are swallowed. + */ + private void cleanupStagingDir() { + String yarnJobStagingDirectory = yarnConfig.getYarnJobStagingDirectory(); + if(yarnJobStagingDirectory != null) { + JobContext context = new JobContext(); + context.setAppStagingDir(new Path(yarnJobStagingDirectory)); + + FileSystem fs = null; + try { + fs = FileSystem.get(hConfig); + } catch (IOException e) { + log.error("Unable to clean up file system: {}", e); + return; + } + if(fs != null) { + YarnJobUtil.cleanupStagingDir(context, fs); + } + } } /**
http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml index 93660c7..93176ff 100644 --- a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml +++ b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml @@ -15,7 +15,8 @@ KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --@ val state: org.apache.samza.job.yarn.SamzaAppState +-@ val state: org.apache.samza.job.yarn.YarnAppState +-@ val samzaAppState: org.apache.samza.clustermanager.SamzaApplicationState -@ val config: scala.collection.immutable.TreeMap[String, String] -@ val rmHttpAddress: String -@ val jobName: String = config.get("job.name").getOrElse("MISSING JOB NAME") @@ -91,10 +92,10 @@ %a(target="_blank" href="http://#{state.nodeHost}:#{state.nodeHttpPort.toString}/node/containerlogs/#{state.amContainerId.toString}/#{username}")= state.amContainerId.toString %tr %td.key JMX server url - %td= state.jmxUrl + %td= samzaAppState.jmxUrl %tr %td.key JMX server tunneling url - %td= state.jmxTunnelingUrl + %td= samzaAppState.jmxTunnelingUrl %div.tab-pane#containers %h2 Containers @@ -102,16 +103,16 @@ %tr %tr %td.key Completed - %td= state.completedContainers.toString + %td= samzaAppState.completedContainers.toString %tr %td.key Needed - %td= state.neededContainers.toString + %td= samzaAppState.neededContainers.toString %tr %td.key Failed - %td= state.failedContainers.toString + %td= samzaAppState.failedContainers.toString %tr %td.key Released - %td= state.releasedContainers.toString + %td= samzaAppState.releasedContainers.toString %h2 Running Containers %table.table.table-striped.table-bordered.tablesorter#containers-table @@ -124,7 +125,7 @@ %th Up Time %th JMX access %tbody - - for((containerId, container) <- state.runningContainers) + - for((containerId, container) <- state.runningYarnContainers) %tr %td #{containerId.toString} %td @@ -136,8 +137,8 @@ %td Up time: #{container.upTimeStr()} %td - Ordinary: #{state.jobCoordinator.jobModel.getContainerToHostValue(containerId, org.apache.samza.coordinator.stream.messages.SetContainerHostMapping.JMX_URL_KEY)} - Tunneling: #{state.jobCoordinator.jobModel.getContainerToHostValue(containerId, org.apache.samza.coordinator.stream.messages.SetContainerHostMapping.JMX_TUNNELING_URL_KEY)} + Ordinary: #{samzaAppState.jobModelManager.jobModel.getContainerToHostValue(containerId, org.apache.samza.coordinator.stream.messages.SetContainerHostMapping.JMX_URL_KEY)} + Tunneling: #{samzaAppState.jobModelManager.jobModel.getContainerToHostValue(containerId, org.apache.samza.coordinator.stream.messages.SetContainerHostMapping.JMX_TUNNELING_URL_KEY)} %h2 Failed Containers %table.table.table-striped.table-bordered.tablesorter#containers-table @@ -162,10 +163,10 @@ %tbody %tr %td.key Total - %td= state.containerCount.toString + %td= samzaAppState.containerCount.toString %tr %td.key Finished - %td= state.finishedContainers.size.toString + %td= samzaAppState.finishedContainers.toString %h3 TaskName Assignment %table.table.table-striped.table-bordered.tablesorter#taskids-table @@ -176,8 +177,8 @@ %th SystemStreamPartitions %th Container %tbody - - for((containerId, container) <- state.runningContainers) - - val containerModel = state.jobCoordinator.jobModel.getContainers.get(containerId) + - for((containerId, container) <- state.runningYarnContainers) + - val containerModel = samzaAppState.jobModelManager.jobModel.getContainers.get(containerId) - for((taskName, taskModel) <- containerModel.getTasks) %tr %td= containerId http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala deleted file mode 100644 index 1fb18be..0000000 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.job.yarn - -import java.io.IOException - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{Path, FileSystem} - -import scala.collection.JavaConversions.asScalaBuffer -import org.apache.hadoop.yarn.api.ApplicationConstants -import org.apache.hadoop.yarn.api.records.{FinalApplicationStatus, Container, ContainerStatus, NodeReport} -import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest -import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.util.ConverterUtils -import org.apache.samza.config.MapConfig -import org.apache.samza.config.Config -import org.apache.samza.config.ShellCommandConfig -import org.apache.samza.config.JobConfig.Config2Job -import org.apache.samza.config.YarnConfig -import org.apache.samza.metrics.JmxServer -import org.apache.samza.metrics.MetricsRegistryMap -import org.apache.samza.util.hadoop.HttpFileSystem -import org.apache.samza.util.Logging -import org.apache.samza.serializers.model.SamzaObjectMapper -import org.apache.samza.coordinator.JobModelManager -import org.apache.samza.SamzaException - -/** - * When YARN executes an application master, it needs a bash command to - * execute. For Samza, YARN will execute this main method when starting Samza's - * YARN application master. - * - * <br/><br/> - * - * The main method gets all of the environment variables (passed by Samza's - * YARN client, and YARN itself), and wires up everything to run Samza's - * application master. - */ -object SamzaAppMaster extends Logging with AMRMClientAsync.CallbackHandler { - val DEFAULT_POLL_INTERVAL_MS: Int = 1000 - var listeners: List[YarnAppMasterListener] = null - var storedException: Throwable = null - - def main(args: Array[String]) { - putMDC("containerName", "samza-application-master") - val containerIdStr = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.toString) - info("got container id: %s" format containerIdStr) - val containerId = ConverterUtils.toContainerId(containerIdStr) - val applicationAttemptId = containerId.getApplicationAttemptId - info("got app attempt id: %s" format applicationAttemptId) - val nodeHostString = System.getenv(ApplicationConstants.Environment.NM_HOST.toString) - info("got node manager host: %s" format nodeHostString) - val nodePortString = System.getenv(ApplicationConstants.Environment.NM_PORT.toString) - info("got node manager port: %s" format nodePortString) - val nodeHttpPortString = System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.toString) - info("got node manager http port: %s" format nodeHttpPortString) - val coordinatorSystemConfig = new MapConfig(SamzaObjectMapper.getObjectMapper.readValue(System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG), classOf[Config])) - info("got coordinator system config: %s" format coordinatorSystemConfig) - val registry = new MetricsRegistryMap - val jobCoordinator = JobModelManager(coordinatorSystemConfig, registry) - val config = jobCoordinator.jobModel.getConfig - val yarnConfig = new YarnConfig(config) - info("got config: %s" format coordinatorSystemConfig) - putMDC("jobName", config.getName.getOrElse(throw new SamzaException("can not find the job name"))) - putMDC("jobId", config.getJobId.getOrElse("1")) - val hConfig = new YarnConfiguration - hConfig.set("fs.http.impl", classOf[HttpFileSystem].getName) - val interval = yarnConfig.getAMPollIntervalMs - val amClient = AMRMClientAsync.createAMRMClientAsync[ContainerRequest](interval, this) - val clientHelper = new ClientHelper(hConfig) - val containerMem = yarnConfig.getContainerMaxMemoryMb - val containerCpu = yarnConfig.getContainerMaxCpuCores - val jmxServer = if (yarnConfig.getJmxServerEnabled) Some(new JmxServer()) else None - val jobContext = new JobContext - Option(yarnConfig.getYarnJobStagingDirectory).map { - jobStagingDirectory => jobContext.setAppStagingDir(new Path(jobStagingDirectory)) - } - - // wire up all of the yarn event listeners - val state = new SamzaAppState(jobCoordinator, -1, containerId, nodeHostString, nodePortString.toInt, nodeHttpPortString.toInt) - try { - if (jmxServer.isDefined) { - state.jmxUrl = jmxServer.get.getJmxUrl - state.jmxTunnelingUrl = jmxServer.get.getTunnelingJmxUrl - } - - val service = new SamzaAppMasterService(config, state, registry, clientHelper, hConfig) - val lifecycle = new SamzaAppMasterLifecycle(containerMem, containerCpu, state, amClient) - val metrics = new SamzaAppMasterMetrics(config, state, registry) - val taskManager = new SamzaTaskManager(config, state, amClient, hConfig) - - listeners = List(service, lifecycle, metrics, taskManager) - run(amClient, listeners, hConfig, interval) - } finally { - if (state.status != FinalApplicationStatus.UNDEFINED) { - YarnJobUtil.cleanupStagingDir(jobContext, FileSystem.get(hConfig)) - } - - // jmxServer has to be stopped or will prevent process from exiting. - if (jmxServer.isDefined) { - jmxServer.get.stop - } - } - } - - def run(amClient: AMRMClientAsync[ContainerRequest], listeners: List[YarnAppMasterListener], hConfig: YarnConfiguration, interval: Int): Unit = { - try { - amClient.init(hConfig) - amClient.start - listeners.foreach(_.onInit) - var isShutdown: Boolean = false - // have the loop to prevent the process from exiting until the job is to shutdown or error occurs on amClient - while (!isShutdown && !listeners.map(_.shouldShutdown).reduceLeft(_ || _) && storedException == null) { - try { - Thread.sleep(interval) - } catch { - case e: InterruptedException => { - isShutdown = true - info("got interrupt in app master thread, so shutting down") - } - } - } - } finally { - // listeners has to be stopped - listeners.foreach(listener => try { - listener.onShutdown - } catch { - case e: Exception => warn("Listener %s failed to shutdown." format listener, e) - }) - - // amClient has to be stopped - amClient.stop - } - } - - override def onContainersCompleted(statuses: java.util.List[ContainerStatus]): Unit = - statuses.foreach(containerStatus => listeners.foreach(_.onContainerCompleted(containerStatus))) - - override def onContainersAllocated(containers: java.util.List[Container]): Unit = - containers.foreach(container => listeners.foreach(_.onContainerAllocated(container))) - - override def onShutdownRequest: Unit = listeners.foreach(_.onReboot) - - override def onNodesUpdated(updatedNodes: java.util.List[NodeReport]): Unit = Unit - - // TODO need to think about meaningful SAMZA's progress - override def getProgress: Float = 0.0F - - override def onError(e: Throwable): Unit = { - error("Error occured in amClient's callback", e) - storedException = e - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala deleted file mode 100644 index 2a5c0d8..0000000 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterLifecycle.scala +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.job.yarn - -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus -import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest -import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync -import org.apache.samza.SamzaException -import org.apache.samza.util.Logging - -/** - * Responsible for managing the lifecycle of the application master. Mostly, - * this means registering and unregistering with the RM, and shutting down - * when the RM tells us to Reboot. - */ -class SamzaAppMasterLifecycle(containerMem: Int, containerCpu: Int, state: SamzaAppState, amClient: AMRMClientAsync[ContainerRequest]) extends YarnAppMasterListener with Logging { - var validResourceRequest = true - var shutdownMessage: String = null - - override def onInit() { - val host = state.nodeHost - val response = amClient.registerApplicationMaster(host, state.rpcUrl.getPort, "%s:%d" format (host, state.trackingUrl.getPort)) - - // validate that the YARN cluster can handle our container resource requirements - val maxCapability = response.getMaximumResourceCapability - val maxMem = maxCapability.getMemory - val maxCpu = maxCapability.getVirtualCores - - info("Got AM register response. The YARN RM supports container requests with max-mem: %s, max-cpu: %s" format (maxMem, maxCpu)) - - if (containerMem > maxMem || containerCpu > maxCpu) { - shutdownMessage = "The YARN cluster is unable to run your job due to unsatisfiable resource requirements. You asked for mem: %s, and cpu: %s." format (containerMem, containerCpu) - error(shutdownMessage) - validResourceRequest = false - state.status = FinalApplicationStatus.FAILED - state.jobHealthy.set(false) - } - } - - override def onReboot() { - throw new SamzaException("Received a reboot signal from the RM, so throwing an exception to reboot the AM.") - } - - override def onShutdown() { - info("Shutting down.") - amClient.unregisterApplicationMaster(state.status, shutdownMessage, null) - } - - override def shouldShutdown = !validResourceRequest -} http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala deleted file mode 100644 index 054d8b6..0000000 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.job.yarn -import org.apache.samza.metrics.MetricsRegistryMap -import org.apache.samza.metrics.JvmMetrics -import org.apache.samza.config.Config -import org.apache.samza.task.TaskContext -import org.apache.samza.Partition -import org.apache.samza.metrics.MetricsRegistry -import org.apache.samza.config.StreamConfig.Config2Stream -import org.apache.samza.config.MetricsConfig.Config2Metrics -import org.apache.samza.metrics.MetricsReporterFactory -import org.apache.samza.util.Util -import org.apache.samza.metrics.ReadableMetricsRegistry -import org.apache.samza.util.Logging -import org.apache.samza.SamzaException -import java.util.Timer -import java.util.TimerTask -import org.apache.samza.metrics.MetricsHelper - -object SamzaAppMasterMetrics { - val sourceName = "ApplicationMaster" -} - -/** - * Responsible for wiring up Samza's metrics. Given that Samza has a metric - * registry, we might as well use it. This class takes Samza's application - * master state, and converts it to metrics. - */ -class SamzaAppMasterMetrics( - val config: Config, - val state: SamzaAppState, - val registry: ReadableMetricsRegistry) extends MetricsHelper with YarnAppMasterListener with Logging { - - val jvm = new JvmMetrics(registry) - val reporters = config.getMetricReporterNames.map(reporterName => { - val metricsFactoryClassName = config - .getMetricsFactoryClass(reporterName) - .getOrElse(throw new SamzaException("Metrics reporter %s missing .class config" format reporterName)) - - val reporter = - Util - .getObj[MetricsReporterFactory](metricsFactoryClassName) - .getMetricsReporter(reporterName, SamzaAppMasterMetrics.sourceName, config) - - reporter.register(SamzaAppMasterMetrics.sourceName, registry) - (reporterName, reporter) - }).toMap - - override def onInit() { - val mRunningContainers = newGauge("running-containers", () => state.runningContainers.size) - val mNeededContainers = newGauge("needed-containers", () => state.neededContainers.get()) - val mCompletedContainers = newGauge("completed-containers", () => state.completedContainers.get()) - val mFailedContainers = newGauge("failed-containers", () => state.failedContainers.get()) - val mReleasedContainers = newGauge("released-containers", () => state.releasedContainers.get()) - val mContainers = newGauge("container-count", () => state.containerCount) - val mHost = newGauge("http-host", () => state.nodeHost) - val mTrackingPort = newGauge("http-port", () => state.trackingUrl.getPort) - val mRpcPort = newGauge("rpc-port", () => state.rpcUrl.getPort) - val mAppAttemptId = newGauge("app-attempt-id", () => state.appAttemptId.toString) - val mJobHealthy = newGauge("job-healthy", () => if (state.jobHealthy.get()) 1 else 0) - val mLocalityMatchedRequests = newGauge( - "locality-matched", - () => { - if (state.containerRequests.get() != 0) { - state.matchedContainerRequests.get() / state.containerRequests.get() - } else { - 0L - } - }) - - jvm.start - reporters.values.foreach(_.start) - } - - override def onShutdown() { - reporters.values.foreach(_.stop) - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala deleted file mode 100644 index 979d81d..0000000 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterService.scala +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.job.yarn - -import org.apache.hadoop.security.UserGroupInformation -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.samza.coordinator.stream.CoordinatorStreamWriter -import org.apache.samza.coordinator.stream.messages.SetConfig -import org.apache.samza.util.Logging -import org.apache.samza.config.Config -import org.apache.samza.metrics.ReadableMetricsRegistry -import org.apache.samza.SamzaException -import org.apache.samza.coordinator.server.HttpServer -import org.apache.samza.coordinator.server.JobServlet -import org.apache.samza.webapp.ApplicationMasterRestServlet -import org.apache.samza.webapp.ApplicationMasterWebServlet - -/** - * Samza's application master runs a very basic HTTP/JSON service to allow - * dashboards to check on the status of a job. SamzaAppMasterService starts - * up the web service when initialized. - * <p /> - * Besides the HTTP/JSON service endpoints, it also starts an optional - * SecurityManager which takes care of the security needs when running in - * a secure environment. - */ -class SamzaAppMasterService(config: Config, state: SamzaAppState, registry: ReadableMetricsRegistry, clientHelper: ClientHelper, yarnConfiguration: YarnConfiguration) extends YarnAppMasterListener with Logging { - var rpcApp: HttpServer = null - var webApp: HttpServer = null - val SERVER_URL_OPT: String = "samza.autoscaling.server.url" - var securityManager: Option[SamzaAppMasterSecurityManager] = None - - override def onInit() { - // try starting the samza AM dashboard at a random rpc and tracking port - info("Starting webapp at a random rpc and tracking port") - - rpcApp = new HttpServer(resourceBasePath = "scalate") - rpcApp.addServlet("/*", new ApplicationMasterRestServlet(config, state, registry)) - rpcApp.start - - webApp = new HttpServer(resourceBasePath = "scalate") - webApp.addServlet("/*", new ApplicationMasterWebServlet(config, state)) - webApp.start - - state.jobCoordinator.start - state.rpcUrl = rpcApp.getUrl - state.trackingUrl = webApp.getUrl - state.coordinatorUrl = state.jobCoordinator.server.getUrl - - //write server url to coordinator stream - val coordinatorStreamWriter: CoordinatorStreamWriter = new CoordinatorStreamWriter(config) - coordinatorStreamWriter.start() - coordinatorStreamWriter.sendMessage(SetConfig.TYPE, SERVER_URL_OPT, state.coordinatorUrl.toString) - coordinatorStreamWriter.stop() - debug("sent server url message with value: %s " format state.coordinatorUrl.toString) - - info("Webapp is started at (rpc %s, tracking %s, coordinator %s)" format(state.rpcUrl, state.trackingUrl, state.coordinatorUrl)) - - // start YarnSecurityManger for a secure cluster - if (UserGroupInformation.isSecurityEnabled) { - securityManager = Option { - val securityManager = new SamzaAppMasterSecurityManager(config, yarnConfiguration) - securityManager.start - securityManager - } - } - } - - override def onShutdown() { - if (rpcApp != null) { - rpcApp.stop - } - - if (webApp != null) { - webApp.stop - } - - state.jobCoordinator.stop - - securityManager.map { - securityManager => securityManager.stop - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala index 2ed9baf..c9c1e18 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterLifecycle.scala @@ -33,7 +33,7 @@ import org.apache.samza.util.Logging * when the RM tells us to Reboot. */ //This class is used in the refactored code path as called by run-jc.sh -class SamzaYarnAppMasterLifecycle(containerMem: Int, containerCpu: Int, state: YarnAppState, amClient: AMRMClientAsync[ContainerRequest]) extends Logging { +class SamzaYarnAppMasterLifecycle(containerMem: Int, containerCpu: Int, samzaAppState: SamzaApplicationState, state: YarnAppState, amClient: AMRMClientAsync[ContainerRequest]) extends Logging { var validResourceRequest = true var shutdownMessage: String = null var webApp: SamzaYarnAppMasterService = null @@ -51,8 +51,8 @@ class SamzaYarnAppMasterLifecycle(containerMem: Int, containerCpu: Int, state: Y shutdownMessage = "The YARN cluster is unable to run your job due to unsatisfiable resource requirements. You asked for mem: %s, and cpu: %s." format (containerMem, containerCpu) error(shutdownMessage) validResourceRequest = false - state.samzaAppState.status = SamzaAppStatus.FAILED; - state.samzaAppState.jobHealthy.set(false) + samzaAppState.status = SamzaAppStatus.FAILED; + samzaAppState.jobHealthy.set(false) } } http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala index f62bec1..5f2bfc5 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaYarnAppMasterService.scala @@ -19,42 +19,47 @@ package org.apache.samza.job.yarn +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.samza.clustermanager.SamzaApplicationState import org.apache.samza.config.Config import org.apache.samza.coordinator.server.HttpServer import org.apache.samza.coordinator.stream.CoordinatorStreamWriter import org.apache.samza.coordinator.stream.messages.SetConfig import org.apache.samza.metrics.ReadableMetricsRegistry import org.apache.samza.util.Logging +import org.apache.samza.webapp.{ApplicationMasterWebServlet, ApplicationMasterRestServlet} /** - * Samza's application master runs a very basic HTTP/JSON service to allow - * dashboards to check on the status of a job. SamzaAppMasterService starts - * up the web service when initialized. - */ + * Samza's application master runs a very basic HTTP/JSON service to allow + * dashboards to check on the status of a job. SamzaAppMasterService starts + * up the web service when initialized. + */ //This class is used in the refactored code path as called by run-jc.sh -class SamzaYarnAppMasterService(config: Config, state: YarnAppState, registry: ReadableMetricsRegistry) extends Logging { +class SamzaYarnAppMasterService(config: Config, samzaAppState: SamzaApplicationState, state: YarnAppState, registry: ReadableMetricsRegistry, yarnConfiguration: YarnConfiguration) extends Logging { var rpcApp: HttpServer = null var webApp: HttpServer = null val SERVER_URL_OPT: String = "samza.autoscaling.server.url" + var securityManager: Option[SamzaAppMasterSecurityManager] = None - def onInit() { + def onInit() { // try starting the samza AM dashboard at a random rpc and tracking port info("Starting webapp at a random rpc and tracking port") rpcApp = new HttpServer(resourceBasePath = "scalate") - //TODO: Since the state has changed into Samza specific and Yarn specific states, this UI has to be refactored too. - //rpcApp.addServlet("/*", refactor ApplicationMasterRestServlet(config, state, registry)) + + rpcApp.addServlet("/*", new ApplicationMasterRestServlet(config, samzaAppState, state, registry)) rpcApp.start webApp = new HttpServer(resourceBasePath = "scalate") - //webApp.addServlet("/*", refactor ApplicationMasterWebServlet(config, state)) + webApp.addServlet("/*", new ApplicationMasterWebServlet(config, samzaAppState, state)) webApp.start - state.jobModelManager.start + samzaAppState.jobModelManager.start state.rpcUrl = rpcApp.getUrl state.trackingUrl = webApp.getUrl - state.coordinatorUrl = state.jobModelManager.server.getUrl + state.coordinatorUrl = samzaAppState.jobModelManager.server.getUrl //write server url to coordinator stream val coordinatorStreamWriter: CoordinatorStreamWriter = new CoordinatorStreamWriter(config) @@ -64,9 +69,19 @@ class SamzaYarnAppMasterService(config: Config, state: YarnAppState, registry: R debug("Sent server url message with value: %s " format state.coordinatorUrl.toString) info("Webapp is started at (rpc %s, tracking %s, coordinator %s)" format(state.rpcUrl, state.trackingUrl, state.coordinatorUrl)) + + // start YarnSecurityManger for a secure cluster + if (UserGroupInformation.isSecurityEnabled) { + securityManager = Option { + val securityManager = new SamzaAppMasterSecurityManager(config, yarnConfiguration) + securityManager.start + securityManager + } + } + } - def onShutdown() { + def onShutdown() { if (rpcApp != null) { rpcApp.stop } @@ -75,6 +90,11 @@ class SamzaYarnAppMasterService(config: Config, state: YarnAppState, registry: R webApp.stop } - state.jobModelManager.stop + samzaAppState.jobModelManager.stop + + securityManager.map { + securityManager => securityManager.stop + } + } } http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala index 3ca1f0d..46dc4d1 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala @@ -86,11 +86,11 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob { } logger.info("Inside YarnJob: fwk_path is %s, ver is %s use it directly " format(fwkPath, fwkVersion)) - var cmdExec = "./__package/bin/run-am.sh" // default location + var cmdExec = "./__package/bin/run-jc.sh" // default location if (!fwkPath.isEmpty()) { // if we have framework installed as a separate package - use it - cmdExec = fwkPath + "/" + fwkVersion + "/bin/run-am.sh" + cmdExec = fwkPath + "/" + fwkVersion + "/bin/run-jc.sh" logger.info("Using FWK path: " + "export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec %s 1>logs/%s 2>logs/%s". format(ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR, cmdExec, http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala index a40ab72..cdd389c 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala @@ -19,17 +19,18 @@ package org.apache.samza.webapp +import org.apache.samza.clustermanager.SamzaApplicationState import org.scalatra._ import scalate.ScalateSupport import org.apache.samza.config.Config -import org.apache.samza.job.yarn.{SamzaAppState, ClientHelper} +import org.apache.samza.job.yarn.{YarnAppState, ClientHelper} import org.apache.samza.metrics._ import scala.collection.JavaConversions._ import org.apache.hadoop.yarn.conf.YarnConfiguration import java.util.HashMap import org.apache.samza.serializers.model.SamzaObjectMapper -class ApplicationMasterRestServlet(config: Config, state: SamzaAppState, registry: ReadableMetricsRegistry) extends ScalatraServlet with ScalateSupport { +class ApplicationMasterRestServlet(config: Config, samzaAppState: SamzaApplicationState, state: YarnAppState, registry: ReadableMetricsRegistry) extends ScalatraServlet with ScalateSupport { val yarnConfig = new YarnConfiguration val client = new ClientHelper(yarnConfig) val jsonMapper = SamzaObjectMapper.getObjectMapper @@ -78,11 +79,11 @@ class ApplicationMasterRestServlet(config: Config, state: SamzaAppState, registr get("/am") { val containers = new HashMap[String, HashMap[String, Object]] - state.runningContainers.foreach { + state.runningYarnContainers.foreach { case (containerId, container) => val yarnContainerId = container.id.toString val containerMap = new HashMap[String, Object] - val taskModels = state.jobCoordinator.jobModel.getContainers.get(containerId).getTasks + val taskModels = samzaAppState.jobModelManager.jobModel.getContainers.get(containerId).getTasks containerMap.put("yarn-address", container.nodeHttpAddress) containerMap.put("start-time", container.startTime.toString) containerMap.put("up-time", container.upTime.toString) http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala index 605332a..a32cd65 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala @@ -19,16 +19,17 @@ package org.apache.samza.webapp +import org.apache.samza.clustermanager.SamzaApplicationState import org.scalatra._ import scalate.ScalateSupport -import org.apache.samza.job.yarn.{SamzaAppState} +import org.apache.samza.job.yarn.YarnAppState import org.apache.samza.config.Config import scala.collection.JavaConversions._ import scala.collection.immutable.TreeMap import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.webapp.util.WebAppUtils -class ApplicationMasterWebServlet(config: Config, state: SamzaAppState) extends ScalatraServlet with ScalateSupport { +class ApplicationMasterWebServlet(config: Config, samzaAppState: SamzaApplicationState, state: YarnAppState) extends ScalatraServlet with ScalateSupport { val yarnConfig = new YarnConfiguration before() { @@ -39,6 +40,7 @@ class ApplicationMasterWebServlet(config: Config, state: SamzaAppState) extends layoutTemplate("/WEB-INF/views/index.scaml", "config" -> TreeMap(config.sanitize.toMap.toArray: _*), "state" -> state, + "samzaAppState" -> samzaAppState, "rmHttpAddress" -> WebAppUtils.getRMWebAppURLWithScheme(yarnConfig)) } } http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java deleted file mode 100644 index e21aded..0000000 --- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.job.yarn; - -import java.util.HashMap; -import java.util.Map; -import org.apache.hadoop.yarn.client.api.AMRMClient; -import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; -import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.samza.config.Config; -import org.apache.samza.config.MapConfig; -import org.apache.samza.job.yarn.util.MockContainerRequestState; -import org.apache.samza.job.yarn.util.TestUtil; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -public class TestContainerAllocator extends TestContainerAllocatorCommon { - - private final Config config = new MapConfig(new HashMap<String, String>() { - { - put("yarn.container.count", "1"); - put("systems.test-system.samza.factory", "org.apache.samza.job.yarn.MockSystemFactory"); - put("yarn.container.memory.mb", "512"); - put("yarn.package.path", "/foo"); - put("task.inputs", "test-system.test-stream"); - put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde"); - put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde"); - put("yarn.container.retry.count", "1"); - put("yarn.container.retry.window.ms", "1999999999"); - put("yarn.container.request.timeout.ms", "3"); - put("yarn.allocator.sleep.ms", "1"); - } - }); - - @Override - protected Config getConfig() { - return config; - } - - @Override - protected MockContainerRequestState createContainerRequestState( - AMRMClientAsync<AMRMClient.ContainerRequest> amClient) { - return new MockContainerRequestState(amClient, false); - } - - /** - * Test request containers with no containerToHostMapping makes the right number of requests - */ - @Test - public void testRequestContainersWithNoMapping() throws Exception { - int containerCount = 4; - Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>(); - for (int i = 0; i < containerCount; i++) { - containersToHostMapping.put(i, null); - } - allocatorThread.start(); - - containerAllocator.requestContainers(containersToHostMapping); - - assertNotNull(requestState); - - assertNotNull(requestState.getRequestsQueue()); - assertTrue(requestState.getRequestsQueue().size() == 4); - - // If host-affinty is not enabled, it doesn't update the requestMap - assertNotNull(requestState.getRequestsToCountMap()); - assertTrue(requestState.getRequestsToCountMap().keySet().size() == 0); - } - /** - * Adds all containers returned to ANY_HOST only - */ - @Test - public void testAddContainer() throws Exception { - assertNull(requestState.getContainersOnAHost("host1")); - assertNull(requestState.getContainersOnAHost(ANY_HOST)); - - containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "host1", 123)); - containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "host3", 123)); - - assertNull(requestState.getContainersOnAHost("host1")); - assertNotNull(requestState.getContainersOnAHost(ANY_HOST)); - assertTrue(requestState.getContainersOnAHost(ANY_HOST).size() == 2); - } - - /** - * Test requestContainers - */ - @Test - public void testRequestContainers() throws Exception { - Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>() { - { - put(0, "host1"); - put(1, "host2"); - put(2, null); - put(3, "host1"); - } - }; - - allocatorThread.start(); - - containerAllocator.requestContainers(containersToHostMapping); - - assertNotNull(testAMRMClient.requests); - assertEquals(4, testAMRMClient.requests.size()); - - assertNotNull(requestState); - - assertNotNull(requestState.getRequestsQueue()); - assertTrue(requestState.getRequestsQueue().size() == 4); - - // If host-affinty is not enabled, it doesn't update the requestMap - assertNotNull(requestState.getRequestsToCountMap()); - assertTrue(requestState.getRequestsToCountMap().keySet().size() == 0); - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocatorCommon.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocatorCommon.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocatorCommon.java deleted file mode 100644 index 0bbd48d..0000000 --- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocatorCommon.java +++ /dev/null @@ -1,225 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.job.yarn; - -import java.io.IOException; -import java.lang.reflect.Field; -import java.net.URL; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.client.api.AMRMClient; -import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; -import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; -import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.samza.config.Config; -import org.apache.samza.config.YarnConfig; -import org.apache.samza.container.TaskName; -import org.apache.samza.coordinator.JobModelManager; -import org.apache.samza.coordinator.server.HttpServer; -import org.apache.samza.job.model.ContainerModel; -import org.apache.samza.job.model.JobModel; -import org.apache.samza.job.model.TaskModel; -import org.apache.samza.job.yarn.util.MockContainerListener; -import org.apache.samza.job.yarn.util.MockContainerRequestState; -import org.apache.samza.job.yarn.util.MockContainerUtil; -import org.apache.samza.job.yarn.util.MockHttpServer; -import org.apache.samza.job.yarn.util.TestAMRMClientImpl; -import org.apache.samza.job.yarn.util.TestUtil; -import org.eclipse.jetty.servlet.DefaultServlet; -import org.eclipse.jetty.servlet.ServletHolder; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - - -/** - * Handles all common fields/tests for ContainerAllocators. - */ -public abstract class TestContainerAllocatorCommon { - protected static final String ANY_HOST = ContainerRequestState.ANY_HOST; - - protected final HttpServer server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class)); - - protected AMRMClientAsyncImpl amRmClientAsync; - protected TestAMRMClientImpl testAMRMClient; - protected MockContainerRequestState requestState; - protected AbstractContainerAllocator containerAllocator; - protected Thread allocatorThread; - protected ContainerUtil containerUtil; - - protected SamzaAppState state = new SamzaAppState(getCoordinator(1), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2); - - protected abstract Config getConfig(); - protected abstract MockContainerRequestState createContainerRequestState(AMRMClientAsync<AMRMClient.ContainerRequest> amClient); - - private JobModelManager getCoordinator(int containerCount) { - Map<Integer, ContainerModel> containers = new java.util.HashMap<>(); - for (int i = 0; i < containerCount; i++) { - ContainerModel container = new ContainerModel(i, new HashMap<TaskName, TaskModel>()); - containers.put(i, container); - } - JobModel jobModel = new JobModel(getConfig(), containers); - return new JobModelManager(jobModel, server, null); - } - - - @Before - public void setup() throws Exception { - // Create AMRMClient - testAMRMClient = new TestAMRMClientImpl( - TestUtil.getAppMasterResponse( - false, - new ArrayList<Container>(), - new ArrayList<ContainerStatus>() - )); - amRmClientAsync = TestUtil.getAMClient(testAMRMClient); - - // Initialize certain state variables - state.coordinatorUrl = new URL("http://localhost:7778/"); - - containerUtil = TestUtil.getContainerUtil(getConfig(), state); - - requestState = createContainerRequestState(amRmClientAsync); - containerAllocator = new HostAwareContainerAllocator( - amRmClientAsync, - containerUtil, - new YarnConfig(getConfig()) - ); - Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState"); - requestStateField.setAccessible(true); - requestStateField.set(containerAllocator, requestState); - - allocatorThread = new Thread(containerAllocator); - } - - @After - public void teardown() throws Exception { - containerAllocator.setIsRunning(false); - allocatorThread.join(); - } - - /** - * If the container fails to start e.g because it fails to connect to a NM on a host that - * is down, the allocator should request a new container on a different host. - */ - @Test - public void testRerequestOnAnyHostIfContainerStartFails() throws Exception { - final Container container = TestUtil - .getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "host2", 123); - final Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "host1", 123); - - ((MockContainerUtil) containerUtil).containerStartException = new IOException("You shall not... connect to the NM!"); - - Runnable releasedContainerAssertions = new Runnable() { - @Override - public void run() { - // The failed container should be released. The successful one should not. - assertNotNull(testAMRMClient.getRelease()); - assertEquals(1, testAMRMClient.getRelease().size()); - assertTrue(testAMRMClient.getRelease().contains(container.getId())); - } - }; - - Runnable assignedContainerAssertions = new Runnable() { - @Override - public void run() { - // Test that the first request assignment had a preferred host and the retry didn't - assertEquals(2, requestState.assignedRequests.size()); - - SamzaContainerRequest request = requestState.assignedRequests.remove(); - assertEquals(0, request.expectedContainerId); - assertEquals("host2", request.getPreferredHost()); - - request = requestState.assignedRequests.remove(); - assertEquals(0, request.expectedContainerId); - assertEquals("ANY_HOST", request.getPreferredHost()); - - // This routine should be called after the retry is assigned, but before it's started. - // So there should still be 1 container needed because neededContainers should not be decremented for a failed start. - assertEquals(1, state.neededContainers.get()); - } - }; - - // Set up our final asserts before starting the allocator thread - MockContainerListener listener = new MockContainerListener(2, 1, 2, 0, null, releasedContainerAssertions, assignedContainerAssertions, null); - requestState.registerContainerListener(listener); - state.neededContainers.set(1); // Normally this would be done in the SamzaTaskManager - - // Only request 1 container and we should see 2 assignments in the assertions above (because of the retry) - containerAllocator.requestContainer(0, "host2"); - containerAllocator.addContainer(container); - containerAllocator.addContainer(container1); - - allocatorThread.start(); - - listener.verify(); - } - - - /** - * Extra allocated containers that are returned by the RM and unused by the AM should be released. - * Containers are considered "extra" only when there are no more pending requests to fulfill - * @throws Exception - */ - @Test - public void testAllocatorReleasesExtraContainers() throws Exception { - final Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "host1", 123); - final Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "host1", 123); - final Container container2 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "host2", 123); - - Runnable releasedContainerAssertions = new Runnable() { - @Override - public void run() { - assertNotNull(testAMRMClient.getRelease()); - assertEquals(2, testAMRMClient.getRelease().size()); - assertTrue(testAMRMClient.getRelease().contains(container1.getId())); - assertTrue(testAMRMClient.getRelease().contains(container2.getId())); - - // Test that state is cleaned up - assertEquals(0, requestState.getRequestsQueue().size()); - assertEquals(0, requestState.getRequestsToCountMap().size()); - assertNull(requestState.getContainersOnAHost("host1")); - assertNull(requestState.getContainersOnAHost("host2")); - } - }; - - // Set up our final asserts before starting the allocator thread - MockContainerListener listener = new MockContainerListener(3, 2, 0, 0, null, releasedContainerAssertions, null, null); - requestState.registerContainerListener(listener); - - containerAllocator.requestContainer(0, "host1"); - - containerAllocator.addContainer(container); - containerAllocator.addContainer(container1); - containerAllocator.addContainer(container2); - - allocatorThread.start(); - - listener.verify(); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerRequestState.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerRequestState.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerRequestState.java deleted file mode 100644 index 402fe78..0000000 --- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerRequestState.java +++ /dev/null @@ -1,221 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.job.yarn; - -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.client.api.AMRMClient; -import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; -import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.samza.job.yarn.util.TestAMRMClientImpl; -import org.apache.samza.job.yarn.util.TestUtil; -import org.junit.Before; -import org.junit.Test; - -import java.util.ArrayList; - -import static org.junit.Assert.*; - -public class TestContainerRequestState { - private AMRMClientAsyncImpl amRmClientAsync; - private TestAMRMClientImpl testAMRMClient; - private static final String ANY_HOST = ContainerRequestState.ANY_HOST; - - @Before - public void setup() { - // Create AMRMClient - testAMRMClient = new TestAMRMClientImpl( - TestUtil.getAppMasterResponse( - false, - new ArrayList<Container>(), - new ArrayList<ContainerStatus>() - )); - amRmClientAsync = TestUtil.getAMClient(testAMRMClient); - } - - /** - * Test state after a request is submitted - */ - @Test - public void testUpdateRequestState() { - // Host-affinity is enabled - ContainerRequestState state = new ContainerRequestState(amRmClientAsync, true); - SamzaContainerRequest request = new SamzaContainerRequest(0, "abc"); - state.updateRequestState(request); - - assertNotNull(testAMRMClient.requests); - assertEquals(1, testAMRMClient.requests.size()); - assertEquals(request.getIssuedRequest(), testAMRMClient.requests.get(0)); - - assertNotNull(state.getRequestsQueue()); - assertTrue(state.getRequestsQueue().size() == 1); - - assertNotNull(state.getRequestsToCountMap()); - assertNotNull(state.getRequestsToCountMap().get("abc")); - assertEquals(1, state.getRequestsToCountMap().get("abc").get()); - - assertNotNull(state.getContainersOnAHost("abc")); - assertEquals(0, state.getContainersOnAHost("abc").size()); - - // Host-affinity is not enabled - ContainerRequestState state1 = new ContainerRequestState(amRmClientAsync, false); - SamzaContainerRequest request1 = new SamzaContainerRequest(1, null); - state1.updateRequestState(request1); - - assertNotNull(testAMRMClient.requests); - assertEquals(2, testAMRMClient.requests.size()); - - AMRMClient.ContainerRequest expectedContainerRequest = request1.getIssuedRequest(); - AMRMClient.ContainerRequest actualContainerRequest = testAMRMClient.requests.get(1); - - assertEquals(expectedContainerRequest.getCapability(), actualContainerRequest.getCapability()); - assertEquals(expectedContainerRequest.getPriority(), actualContainerRequest.getPriority()); - assertNull(actualContainerRequest.getNodes()); - - assertNotNull(state1.getRequestsQueue()); - assertTrue(state1.getRequestsQueue().size() == 1); - - assertNotNull(state1.getRequestsToCountMap()); - assertNull(state1.getRequestsToCountMap().get(ANY_HOST)); - - } - - /** - * Test addContainer() updates the state correctly - */ - @Test - public void testAddContainer() { - // Add container to ANY_LIST when host-affinity is not enabled - ContainerRequestState state = new ContainerRequestState(amRmClientAsync, false); - Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "abc", 123); - state.addContainer(container); - - assertNotNull(state.getRequestsQueue()); - assertNotNull(state.getRequestsToCountMap()); - assertNotNull(state.getContainersOnAHost(ANY_HOST)); - - assertEquals(1, state.getContainersOnAHost(ANY_HOST).size()); - assertEquals(container, state.getContainersOnAHost(ANY_HOST).get(0)); - - // Container Allocated when there is no request in queue - ContainerRequestState state1 = new ContainerRequestState(amRmClientAsync, true); - Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "zzz", 123); - state1.addContainer(container1); - - assertNotNull(state1.getRequestsQueue()); - assertEquals(0, state1.getRequestsQueue().size()); - - assertNull(state1.getContainersOnAHost("zzz")); - assertNotNull(state1.getContainersOnAHost(ANY_HOST)); - assertEquals(1, state1.getContainersOnAHost(ANY_HOST).size()); - assertEquals(container1, state1.getContainersOnAHost(ANY_HOST).get(0)); - - // Container Allocated on a Requested Host - state1.updateRequestState(new SamzaContainerRequest(0, "abc")); - - assertNotNull(state1.getRequestsQueue()); - assertEquals(1, state1.getRequestsQueue().size()); - - assertNotNull(state1.getRequestsToCountMap()); - assertNotNull(state1.getRequestsToCountMap().get("abc")); - assertEquals(1, state1.getRequestsToCountMap().get("abc").get()); - - state1.addContainer(container); - - assertNotNull(state1.getContainersOnAHost("abc")); - assertEquals(1, state1.getContainersOnAHost("abc").size()); - assertEquals(container, state1.getContainersOnAHost("abc").get(0)); - - // Container Allocated on host that was not requested - Container container2 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000004"), "xyz", 123); - state1.addContainer(container2); - - assertNull(state1.getContainersOnAHost("xyz")); - assertNotNull(state1.getContainersOnAHost(ANY_HOST)); - assertEquals(2, state1.getContainersOnAHost(ANY_HOST).size()); - assertEquals(container2, state1.getContainersOnAHost(ANY_HOST).get(1)); - - // Extra containers were allocated on a host that was requested - Container container3 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000005"), "abc", 123); - state1.addContainer(container3); - - assertEquals(3, state1.getContainersOnAHost(ANY_HOST).size()); - assertEquals(container3, state1.getContainersOnAHost(ANY_HOST).get(2)); - } - - /** - * Test request state after container is assigned to a host - * * Assigned on requested host - * * Assigned on any host - */ - @Test - public void testContainerAssignment() throws Exception { - // Host-affinity enabled - ContainerRequestState state = new ContainerRequestState(amRmClientAsync, true); - SamzaContainerRequest request = new SamzaContainerRequest(0, "abc"); - SamzaContainerRequest request1 = new SamzaContainerRequest(0, "def"); - - state.updateRequestState(request); - state.updateRequestState(request1); - - Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "abc", 123); - Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "zzz", 123); - state.addContainer(container); - state.addContainer(container1); - - assertEquals(2, state.getRequestsQueue().size()); - assertEquals(2, state.getRequestsToCountMap().size()); - - assertNotNull(state.getContainersOnAHost("abc")); - assertEquals(1, state.getContainersOnAHost("abc").size()); - assertEquals(container, state.getContainersOnAHost("abc").get(0)); - - assertNotNull(state.getContainersOnAHost("def")); - assertEquals(0, state.getContainersOnAHost("def").size()); - - assertNotNull(state.getContainersOnAHost(ANY_HOST)); - assertEquals(1, state.getContainersOnAHost(ANY_HOST).size()); - assertEquals(container1, state.getContainersOnAHost(ANY_HOST).get(0)); - - // Container assigned on the requested host - state.updateStateAfterAssignment(request, "abc", container); - - assertEquals(1, state.getRequestsQueue().size()); - assertEquals(request1, state.getRequestsQueue().peek()); - - assertNotNull(state.getRequestsToCountMap().get("abc")); - assertEquals(0, state.getRequestsToCountMap().get("abc").get()); - - assertNotNull(state.getContainersOnAHost("abc")); - assertEquals(0, state.getContainersOnAHost("abc").size()); - - // Container assigned on any host - state.updateStateAfterAssignment(request1, ANY_HOST, container1); - - assertEquals(0, state.getRequestsQueue().size()); - - assertNotNull(state.getRequestsToCountMap().get("def")); - assertEquals(0, state.getRequestsToCountMap().get("def").get()); - - assertNotNull(state.getContainersOnAHost(ANY_HOST)); - assertEquals(0, state.getContainersOnAHost(ANY_HOST).size()); - - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java deleted file mode 100644 index ead7200..0000000 --- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java +++ /dev/null @@ -1,253 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.job.yarn; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.client.api.AMRMClient; -import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; -import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.samza.config.Config; -import org.apache.samza.config.MapConfig; -import org.apache.samza.job.yarn.util.MockContainerListener; -import org.apache.samza.job.yarn.util.MockContainerRequestState; -import org.apache.samza.job.yarn.util.MockContainerUtil; -import org.apache.samza.job.yarn.util.TestUtil; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -public class TestHostAwareContainerAllocator extends TestContainerAllocatorCommon { - - private final Config config = new MapConfig(new HashMap<String, String>() { - { - put("yarn.container.count", "1"); - put("systems.test-system.samza.factory", "org.apache.samza.job.yarn.MockSystemFactory"); - put("yarn.container.memory.mb", "512"); - put("yarn.package.path", "/foo"); - put("task.inputs", "test-system.test-stream"); - put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde"); - put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde"); - put("yarn.container.retry.count", "1"); - put("yarn.container.retry.window.ms", "1999999999"); - put("yarn.samza.host-affinity.enabled", "true"); - put("yarn.container.request.timeout.ms", "3"); - put("yarn.allocator.sleep.ms", "1"); - } - }); - - @Override - protected Config getConfig() { - return config; - } - - @Override - protected MockContainerRequestState createContainerRequestState(AMRMClientAsync<AMRMClient.ContainerRequest> amClient) { - return new MockContainerRequestState(amClient, true); - } - - /** - * Test request containers with no containerToHostMapping makes the right number of requests - */ - @Test - public void testRequestContainersWithNoMapping() throws Exception { - int containerCount = 4; - Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>(); - for (int i = 0; i < containerCount; i++) { - containersToHostMapping.put(i, null); - } - - allocatorThread.start(); - - containerAllocator.requestContainers(containersToHostMapping); - - assertNotNull(requestState); - - assertNotNull(requestState.getRequestsQueue()); - assertEquals(4, requestState.getRequestsQueue().size()); - - assertNotNull(requestState.getRequestsToCountMap()); - assertEquals(1, requestState.getRequestsToCountMap().keySet().size()); - assertTrue(requestState.getRequestsToCountMap().keySet().contains(ANY_HOST)); - } - - /** - * Add containers to the correct host in the request state - */ - @Test - public void testAddContainerWithHostAffinity() throws Exception { - containerAllocator.requestContainers(new HashMap<Integer, String>() { - { - put(0, "host1"); - put(1, "host3"); - } - }); - - assertNotNull(requestState.getContainersOnAHost("host1")); - assertEquals(0, requestState.getContainersOnAHost("host1").size()); - - assertNotNull(requestState.getContainersOnAHost("host3")); - assertEquals(0, requestState.getContainersOnAHost("host3").size()); - - assertNull(requestState.getContainersOnAHost(ANY_HOST)); - - containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "host1", 123)); - containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000004"), "host2", 123)); - containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "host3", 123)); - - assertNotNull(requestState.getContainersOnAHost("host1")); - assertEquals(1, requestState.getContainersOnAHost("host1").size()); - - assertNotNull(requestState.getContainersOnAHost("host3")); - assertEquals(1, requestState.getContainersOnAHost("host3").size()); - - assertNotNull(requestState.getContainersOnAHost(ANY_HOST)); - assertTrue(requestState.getContainersOnAHost(ANY_HOST).size() == 1); - assertEquals(ConverterUtils.toContainerId("container_1350670447861_0003_01_000004"), - requestState.getContainersOnAHost(ANY_HOST).get(0).getId()); - } - - - @Test - public void testRequestContainers() throws Exception { - Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>() { - { - put(0, "host1"); - put(1, "host2"); - put(2, null); - put(3, "host1"); - } - }; - allocatorThread.start(); - - containerAllocator.requestContainers(containersToHostMapping); - - assertNotNull(testAMRMClient.requests); - assertEquals(4, testAMRMClient.requests.size()); - - assertNotNull(requestState); - - assertNotNull(requestState.getRequestsQueue()); - assertTrue(requestState.getRequestsQueue().size() == 4); - - assertNotNull(requestState.getRequestsToCountMap()); - Map<String, AtomicInteger> requestsMap = requestState.getRequestsToCountMap(); - - assertNotNull(requestsMap.get("host1")); - assertEquals(2, requestsMap.get("host1").get()); - - assertNotNull(requestsMap.get("host2")); - assertEquals(1, requestsMap.get("host2").get()); - - assertNotNull(requestsMap.get(ANY_HOST)); - assertEquals(1, requestsMap.get(ANY_HOST).get()); - } - - /** - * Handles expired requests correctly and assigns ANY_HOST - */ - @Test - public void testExpiredRequestHandling() throws Exception { - Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>() { - { - put(0, "requestedHost1"); - put(1, "requestedHost2"); - } - }; - containerAllocator.requestContainers(containersToHostMapping); - - assertNotNull(requestState.getRequestsQueue()); - assertTrue(requestState.getRequestsQueue().size() == 2); - - assertNotNull(requestState.getRequestsToCountMap()); - assertNotNull(requestState.getRequestsToCountMap().get("requestedHost1")); - assertTrue(requestState.getRequestsToCountMap().get("requestedHost1").get() == 1); - - assertNotNull(requestState.getRequestsToCountMap().get("requestedHost2")); - assertTrue(requestState.getRequestsToCountMap().get("requestedHost2").get() == 1); - - final Container container0 = TestUtil - .getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "availableHost1", 123); - final Container container1 = TestUtil - .getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "availableHost2", 123); - - Runnable addedContainerAssertions = new Runnable() { - @Override - public void run() { - assertNotNull(requestState.getRequestsToCountMap()); - assertNull(requestState.getContainersOnAHost("availableHost1")); - assertNull(requestState.getContainersOnAHost("availableHost2")); - assertNotNull(requestState.getContainersOnAHost(ANY_HOST)); - assertTrue(requestState.getContainersOnAHost(ANY_HOST).size() == 2); - } - }; - - Runnable assignedContainerAssertions = new Runnable() { - @Override - public void run() { - List<Container> anyHostContainers = requestState.getContainersOnAHost(ANY_HOST); - assertTrue(anyHostContainers == null || anyHostContainers.isEmpty()); - - assertNotNull(requestState.getRequestsQueue()); - assertTrue(requestState.getRequestsQueue().size() == 0); - - assertNotNull(requestState.getRequestsToCountMap()); - assertNotNull(requestState.getRequestsToCountMap().get("requestedHost1")); - assertNotNull(requestState.getRequestsToCountMap().get("requestedHost2")); - } - }; - - Runnable runningContainerAssertions = new Runnable() { - @Override - public void run() { - MockContainerUtil mockContainerUtil = (MockContainerUtil) containerUtil; - - assertNotNull(mockContainerUtil.runningContainerList.get("availableHost1")); - assertTrue(mockContainerUtil.runningContainerList.get("availableHost1").contains(container0)); - - assertNotNull(mockContainerUtil.runningContainerList.get("availableHost2")); - assertTrue(mockContainerUtil.runningContainerList.get("availableHost2").contains(container1)); - } - }; - // Set up our final asserts before starting the allocator thread - MockContainerListener listener = new MockContainerListener( - 2, 0, 2, 2, - addedContainerAssertions, - null, - assignedContainerAssertions, - runningContainerAssertions); - requestState.registerContainerListener(listener); - ((MockContainerUtil) containerUtil).registerContainerListener(listener); - - containerAllocator.addContainer(container0); - containerAllocator.addContainer(container1); - - // Start after adding containers to avoid a race condition between the allocator thread - // using the containers and the assertions after the containers are added. - allocatorThread.start(); - - listener.verify(); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/9396ee5c/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaContainerRequest.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaContainerRequest.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaContainerRequest.java deleted file mode 100644 index ad0f4d3..0000000 --- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaContainerRequest.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.job.yarn; - -import org.junit.Test; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -public class TestSamzaContainerRequest { - private static final String ANY_HOST = ContainerRequestState.ANY_HOST; - - @Test - public void testPreferredHostIsNeverNull() { - SamzaContainerRequest request = new SamzaContainerRequest(0, null); - - assertNotNull(request.getPreferredHost()); - - // preferredHost is null, it should automatically default to ANY_HOST - assertTrue(request.getPreferredHost().equals(ANY_HOST)); - - SamzaContainerRequest request1 = new SamzaContainerRequest(1, "abc"); - assertNotNull(request1.getPreferredHost()); - assertTrue(request1.getPreferredHost().equals("abc")); - } - - @Test - public void testAnyHostIsNotPassedToYarnRequest() { - SamzaContainerRequest request = new SamzaContainerRequest(0, null); - assertNull(request.getIssuedRequest().getNodes()); - - SamzaContainerRequest request1 = new SamzaContainerRequest(1, ANY_HOST); - assertNull(request1.getIssuedRequest().getNodes()); - } -}
