http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala new file mode 100644 index 0000000..7e5d9ec --- /dev/null +++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.experiments.yarn.glue + +import java.io.File +import java.nio.ByteBuffer +import scala.collection.JavaConverters._ + +import org.apache.hadoop.fs.{FileSystem => YarnFileSystem, Path} +import org.apache.hadoop.io.DataOutputBuffer +import org.apache.hadoop.mapreduce.security.TokenCache +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment +import org.apache.hadoop.yarn.api.records._ +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.util.ConverterUtils +import org.slf4j.Logger + +import org.apache.gearpump.util.LogUtil + +private[glue] +object ContainerLaunchContext { + private val LOG: Logger = LogUtil.getLogger(getClass) + + def apply(yarnConf: YarnConfiguration, command: String, packagePath: String, configPath: String) + : ContainerLaunchContext = { + val context = Records.newRecord(classOf[ContainerLaunchContext]) + context.setCommands(Seq(command).asJava) + context.setEnvironment(getAppEnv(yarnConf).asJava) + context.setTokens(getToken(yarnConf, packagePath, configPath)) + context.setLocalResources(getAMLocalResourcesMap(yarnConf, packagePath, configPath).asJava) + context + } + + private def getFs(yarnConf: YarnConfiguration) = YarnFileSystem.get(yarnConf) + + private def getAppEnv(yarnConf: YarnConfiguration): Map[String, String] = { + val classPaths = yarnConf.getStrings( + YarnConfiguration.YARN_APPLICATION_CLASSPATH, + YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH.mkString(File.pathSeparator)) + val allPaths = Option(classPaths).getOrElse(Array("")) + + allPaths :+ Environment.PWD.$() + File.separator + "*" + File.pathSeparator + + Map(Environment.CLASSPATH.name -> allPaths.map(_.trim).mkString(File.pathSeparator)) + } + + private def getAMLocalResourcesMap( + yarnConf: YarnConfiguration, packagePath: String, configPath: String) + : Map[String, LocalResource] = { + val fs = getFs(yarnConf) + + Map( + "pack" -> 
newYarnAppResource(fs, new Path(packagePath), + LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION), + "conf" -> newYarnAppResource(fs, new Path(configPath), + LocalResourceType.FILE, LocalResourceVisibility.APPLICATION)) + } + + private def newYarnAppResource( + fs: YarnFileSystem, path: Path, + resourceType: LocalResourceType, vis: LocalResourceVisibility): LocalResource = { + val qualified = fs.makeQualified(path) + val status = fs.getFileStatus(qualified) + val resource = Records.newRecord(classOf[LocalResource]) + resource.setType(resourceType) + resource.setVisibility(vis) + resource.setResource(ConverterUtils.getYarnUrlFromPath(qualified)) + resource.setTimestamp(status.getModificationTime) + resource.setSize(status.getLen) + resource + } + + private def getToken(yc: YarnConfiguration, packagePath: String, configPath: String) + : ByteBuffer = { + val credentials = UserGroupInformation.getCurrentUser.getCredentials + val dob = new DataOutputBuffer + val dirs = Array(new Path(packagePath), new Path(configPath)) + TokenCache.obtainTokensForNamenodes(credentials, dirs, yc) + credentials.writeTokenStorageToStream(dob) + ByteBuffer.wrap(dob.getData) + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/FileSystem.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/FileSystem.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/FileSystem.scala new file mode 100644 index 0000000..dcb53e9 --- /dev/null +++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/FileSystem.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.experiments.yarn.glue + +import java.io.{InputStream, OutputStream} +import java.net.ConnectException + +import org.apache.gearpump.util.LogUtil +import org.apache.hadoop.fs.Path + +import scala.util.{Failure, Success, Try} + +class FileSystem(yarnConfig: YarnConfig) { + + private val conf = yarnConfig.conf + private val fs = org.apache.hadoop.fs.FileSystem.get(conf) + + private def LOG = LogUtil.getLogger(getClass) + + def open(file: String): InputStream = exceptionHandler { + val path = new Path(file) + fs.open(path) + } + + def create(file: String): OutputStream = exceptionHandler { + val path = new Path(file) + fs.create(path) + } + + def exists(file: String): Boolean = exceptionHandler { + val path = new Path(file) + fs.exists(path) + } + + def name: String = { + fs.getUri.toString + } + + def getHomeDirectory: String = { + fs.getHomeDirectory.toString + } + + private def exceptionHandler[T](call: => T): T = { + val callTry = Try(call) + callTry match { + case Success(v) => v + case Failure(ex) => + if (ex.isInstanceOf[ConnectException]) { + LOG.error("Please check whether we connect to the right HDFS file system, " + + "current file system is $name." + "\n. 
Please copy all configs under " + + "$HADOOP_HOME/etc/hadoop into conf/yarnconf directory of Gearpump package, " + + "so that we can use the right File system.", ex) + } + throw ex + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/NMClient.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/NMClient.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/NMClient.scala new file mode 100644 index 0000000..59f3832 --- /dev/null +++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/NMClient.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.experiments.yarn.glue + +import java.nio.ByteBuffer + +import akka.actor.ActorRef +import com.typesafe.config.Config +import org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster.ContainerStarted +import org.apache.gearpump.experiments.yarn.glue.Records._ +import org.apache.gearpump.util.LogUtil +import org.apache.hadoop.yarn.api.records.{ApplicationId => YarnApplicationId, ApplicationReport => YarnApplicationReport, Container => YarnContainer, ContainerId => YarnContainerId, ContainerStatus => YarnContainerStatus, NodeId => YarnNodeId, Resource => YarnResource} +import org.apache.hadoop.yarn.client.api.async.NMClientAsync +import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl +/** + * Adapter for node manager client + */ +class NMClient(yarnConf: YarnConfig, config: Config) extends NMClientAsync.CallbackHandler { + + private val LOG = LogUtil.getLogger(getClass) + + private var reportTo: ActorRef = null + private var client: NMClientAsyncImpl = null + + def start(reportTo: ActorRef): Unit = { + LOG.info("Starting Node Manager Client NMClient...") + this.reportTo = reportTo + client = new NMClientAsyncImpl(this) + client.init(yarnConf.conf) + client.start() + } + + private[glue] + override def onContainerStarted( + containerId: YarnContainerId, allServiceResponse: java.util.Map[String, ByteBuffer]) { + LOG.info(s"Container started : $containerId, " + allServiceResponse) + reportTo ! 
ContainerStarted(containerId) + } + + private[glue] + override def onContainerStatusReceived( + containerId: YarnContainerId, containerStatus: YarnContainerStatus) { + LOG.info(s"Container status received : $containerId, status $containerStatus") + } + + private[glue] + override def onContainerStopped(containerId: YarnContainerId) { + LOG.error(s"Container stopped : $containerId") + } + + private[glue] + override def onGetContainerStatusError(containerId: YarnContainerId, throwable: Throwable) { + LOG.error(s"Container exception : $containerId", throwable) + } + + private[glue] + override def onStartContainerError(containerId: YarnContainerId, throwable: Throwable) { + LOG.error(s"Container exception : $containerId", throwable) + } + + private[glue] + override def onStopContainerError(containerId: YarnContainerId, throwable: Throwable) { + LOG.error(s"Container exception : $containerId", throwable) + } + + def launchCommand( + container: Container, command: String, packagePath: String, configPath: String): Unit = { + LOG.info(s"Launching command : $command on container" + + s": ${container.getId}, host ip : ${container.getNodeId.getHost}") + val context = ContainerLaunchContext(yarnConf.conf, command, packagePath, configPath) + client.startContainerAsync(container, context) + } + + def stopContainer(containerId: ContainerId, nodeId: NodeId): Unit = { + LOG.info(s"Stop container ${containerId.toString} on node: ${nodeId.toString} ") + client.stopContainerAsync(containerId, nodeId) + } + + def stop(): Unit = { + LOG.info(s"Shutdown NMClient") + client.stop() + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/RMClient.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/RMClient.scala 
b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/RMClient.scala new file mode 100644 index 0000000..629e233 --- /dev/null +++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/RMClient.scala @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.experiments.yarn.glue + +import akka.actor.ActorRef +import org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster.{AppMasterRegistered, ContainersAllocated, ContainersCompleted, ResourceManagerException, ShutdownApplication} +import org.apache.gearpump.experiments.yarn.glue.Records._ +import org.apache.gearpump.util.LogUtil +import org.apache.hadoop.yarn.api.records.{ApplicationId => YarnApplicationId, ApplicationReport => YarnApplicationReport, Container => YarnContainer, ContainerId => YarnContainerId, ContainerStatus => YarnContainerStatus, FinalApplicationStatus, NodeId => YarnNodeId, NodeReport, Priority, Resource => YarnResource} +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync +import org.slf4j.Logger + +import scala.collection.JavaConverters._ + +/** + * Adapter for resource manager client + */ +class RMClient(yarnConf: YarnConfig) extends AMRMClientAsync.CallbackHandler { + + private val LOG: Logger = LogUtil.getLogger(getClass) + + private var reportTo: ActorRef = null + private var client: AMRMClientAsync[ContainerRequest] = null + + def start(reportTo: ActorRef): Unit = { + LOG.info("Starting Resource Manager Client RMClient...") + this.reportTo = reportTo + client = startAMRMClient + } + + private def startAMRMClient: AMRMClientAsync[ContainerRequest] = { + val timeIntervalMs = 1000 // ms + val amrmClient = AMRMClientAsync.createAMRMClientAsync[ContainerRequest](timeIntervalMs, this) + amrmClient.init(yarnConf.conf) + amrmClient.start() + amrmClient + } + + override def getProgress: Float = 0.5F + + private var allocatedContainers = Set.empty[YarnContainerId] + + private[glue] + override def onContainersAllocated(containers: java.util.List[YarnContainer]) { + val newContainers = containers.asScala.toList.filterNot(container => + allocatedContainers.contains(container.getId)) + allocatedContainers ++= newContainers.map(_.getId) 
+ LOG.info(s"New allocated ${newContainers.size} containers") + reportTo ! ContainersAllocated(newContainers.map(yarnContainerToContainer(_))) + } + + private[glue] + override def onContainersCompleted(completedContainers: java.util.List[YarnContainerStatus]) + : Unit = { + LOG.info(s"Got response from RM. Completed containers=${completedContainers.size()}") + reportTo ! ContainersCompleted( + completedContainers.asScala.toList.map(yarnContainerStatusToContainerStatus(_))) + } + + private[glue] + override def onError(ex: Throwable) { + LOG.info("Error occurred") + reportTo ! ResourceManagerException(ex) + } + + private[glue] + override def onNodesUpdated(updatedNodes: java.util.List[NodeReport]): Unit = { + LOG.info("onNodesUpdates") + } + + private[glue] + override def onShutdownRequest() { + LOG.info("Shutdown requested") + reportTo ! ShutdownApplication + } + + def requestContainers(containers: List[Resource]): Unit = { + LOG.info(s"request Resource, slots: ${containers.length}, ${containers.mkString("\n")}") + containers.foreach(resource => { + client.addContainerRequest(createContainerRequest(resource)) + }) + } + + private def createContainerRequest(capability: Resource): ContainerRequest = { + LOG.info("creating ContainerRequest") + val priorityRecord = org.apache.hadoop.yarn.util.Records.newRecord(classOf[Priority]) + priorityRecord.setPriority(0) + val priority = Priority.newInstance(0) + new ContainerRequest(capability, null, null, priority) + } + + def registerAppMaster(host: String, port: Int, url: String): Unit = { + LOG.info(s"Received RegisterAMMessage! $host:$port $url") + val response = client.registerApplicationMaster(host, port, url) + LOG.info("Received RegisterAppMasterResponse ") + reportTo ! 
AppMasterRegistered + } + + def shutdownApplication(): Unit = { + LOG.info(s"Shutdown application") + client.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "success", null) + client.stop() + } + + def failApplication(ex: Throwable): Unit = { + LOG.error(s"Application failed! ", ex) + client.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, ex.getMessage, null) + client.stop() + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/Records.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/Records.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/Records.scala new file mode 100644 index 0000000..ca729ce --- /dev/null +++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/Records.scala @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.experiments.yarn.glue + +import org.apache.hadoop.yarn.api.records.{ApplicationId => YarnApplicationId, ApplicationReport => YarnApplicationReport, ApplicationSubmissionContext, Container => YarnContainer, ContainerId => YarnContainerId, ContainerStatus => YarnContainerStatus, NodeId => YarnNodeId, Resource => YarnResource, YarnApplicationState} +import org.apache.hadoop.yarn.util.{Records => YarnRecords} + +import scala.language.implicitConversions + +object Records { + def newRecord[T](clazz: Class[T]): T = YarnRecords.newRecord(clazz) + + def newAppSubmissionContext: ApplicationSubmissionContext = { + YarnRecords.newRecord(classOf[ApplicationSubmissionContext]) + } + + class ApplicationId(private[glue] val impl: YarnApplicationId) { + def getId: Int = impl.getId + + override def toString: String = impl.toString + + override def equals(other: Any): Boolean = { + if (other.isInstanceOf[ApplicationId]) { + impl.equals(other.asInstanceOf[ApplicationId].impl) + } else { + false + } + } + + override def hashCode(): Int = { + impl.hashCode() + } + } + + object ApplicationId { + def newInstance(timestamp: Long, id: Int): ApplicationId = { + YarnApplicationId.newInstance(timestamp, id) + } + } + + class ApplicationReport(private[glue] val impl: YarnApplicationReport) { + def getApplicationId: ApplicationId = impl.getApplicationId + + def getDiagnostics: String = impl.getDiagnostics + + def getFinishTime: Long = impl.getFinishTime + + def getOriginalTrackingUrl: String = impl.getOriginalTrackingUrl + + def getYarnApplicationState: YarnApplicationState = impl.getYarnApplicationState + + override def toString: String = impl.toString + } + class Resource(private[glue] val impl: YarnResource) { + override def toString: String = impl.toString + + override def equals(other: Any): Boolean = { + if (other.isInstanceOf[Resource]) { + impl.equals(other.asInstanceOf[Resource].impl) + } else { + false + } + } + + override def hashCode(): Int = { + 
impl.hashCode() + } + } + + object Resource { + def newInstance(memory: Int, vCores: Int): Resource = { + YarnResource.newInstance(memory, vCores) + } + } + + class Container(private[glue] val impl: YarnContainer) { + def getId: ContainerId = impl.getId + + def getNodeHttpAddress: String = impl.getNodeHttpAddress + + def getNodeId: NodeId = impl.getNodeId + + override def toString: String = impl.toString + + override def equals(other: Any): Boolean = { + if (other.isInstanceOf[Container]) { + impl.equals(other.asInstanceOf[Container].impl) + } else { + false + } + } + + override def hashCode(): Int = { + impl.hashCode() + } + } + + class ContainerId(private[glue] val impl: YarnContainerId) { + override def toString: String = impl.toString + + override def equals(other: Any): Boolean = { + if (other.isInstanceOf[ContainerId]) { + impl.equals(other.asInstanceOf[ContainerId].impl) + } else { + false + } + } + + override def hashCode(): Int = { + impl.hashCode() + } + } + + object ContainerId { + def fromString(worker: String): ContainerId = YarnContainerId.fromString(worker) + } + + class NodeId(private[glue] val impl: YarnNodeId) { + def getHost: String = impl.getHost + + override def toString: String = impl.toString + + override def equals(other: Any): Boolean = { + if (other.isInstanceOf[NodeId]) { + impl.equals(other.asInstanceOf[NodeId].impl) + } else { + false + } + } + + override def hashCode(): Int = { + impl.hashCode() + } + } + + class ContainerStatus(private[glue] val impl: YarnContainerStatus) { + def getDiagnostics: String = impl.getDiagnostics + + def getContainerId: ContainerId = impl.getContainerId + + def getExitStatus: Int = impl.getExitStatus + + override def toString: String = impl.toString + } + + private[glue] implicit def yarnResourceToResource(res: YarnResource): Resource = { + new Resource(res) + } + + private[glue] implicit def resourceToYarnResource(res: Resource): YarnResource = { + res.impl + } + + private[glue] implicit def 
yarnAppIdToAppId(yarn: YarnApplicationId): ApplicationId = { + new ApplicationId(yarn) + } + + private[glue] implicit def appIdToYarnAppId(app: ApplicationId): YarnApplicationId = { + app.impl + } + + private[glue] implicit def yarnReportToReport(yarn: YarnApplicationReport): ApplicationReport = { + new ApplicationReport(yarn) + } + + private[glue] implicit def reportToYarnReport(app: ApplicationReport): YarnApplicationReport = { + app.impl + } + + private[glue] implicit def yarnContainerToContainer(yarn: YarnContainer): Container = { + new Container(yarn) + } + + private[glue] implicit def containerToYarnContainer(app: Container): YarnContainer = { + app.impl + } + + private[glue] implicit def yarnContainerStatusToContainerStatus(yarn: YarnContainerStatus) + : ContainerStatus = { + new ContainerStatus(yarn) + } + + private[glue] implicit def containerStatusToYarnContainerStatus(app: ContainerStatus) + : YarnContainerStatus = { + app.impl + } + + private[glue] implicit def containerIdToYarnContainerId(app: ContainerId): YarnContainerId = { + app.impl + } + + private[glue] implicit def yarnContainerIdToContainerId(yarn: YarnContainerId): ContainerId = { + new ContainerId(yarn) + } + + private[glue] implicit def nodeIdToYarnNodeId(app: NodeId): YarnNodeId = { + app.impl + } + + private[glue] implicit def yarnNodeIdToNodeId(yarn: YarnNodeId): NodeId = { + new NodeId(yarn) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/YarnClient.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/YarnClient.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/YarnClient.scala new file mode 100644 index 0000000..634dd0e --- /dev/null +++ 
b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/YarnClient.scala @@ -0,0 +1,103 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.experiments.yarn.glue

import org.apache.gearpump.experiments.yarn.glue.Records._
import org.apache.gearpump.util.LogUtil
import org.apache.hadoop.yarn.client.api

/**
 * Adapter for api.YarnClient.
 *
 * The underlying client is created, initialized and started when this class is
 * constructed; call `stop()` to shut it down.
 */
class YarnClient(yarn: YarnConfig) {

  val LOG = LogUtil.getLogger(getClass)

  private val client: api.YarnClient = api.YarnClient.createYarnClient
  client.init(yarn.conf)
  client.start()
  LOG.info("Starting YarnClient...")

  /** Asks YARN for a new application and returns its id. */
  def createApplication: ApplicationId = {
    val app = client.createApplication()
    val response = app.getNewApplicationResponse()
    LOG.info("Create application, appId: " + response.getApplicationId())
    response.getApplicationId()
  }

  /** Fetches the current report for `appId`. */
  def getApplicationReport(appId: ApplicationId): ApplicationReport = {
    client.getApplicationReport(appId)
  }

  /**
   * Submits an application whose application-master container runs `command`.
   *
   * @param name        application name
   * @param appId       id previously obtained from `createApplication`
   * @param command     command line for the application-master container
   * @param resource    resource set on the submission context
   * @param queue       YARN queue to submit to
   * @param packagePath path of the package archive made available to the container
   * @param configPath  path of the configuration file made available to the container
   * @return the id returned by the underlying client's `submitApplication`
   */
  def submit(
      name: String, appId: ApplicationId, command: String, resource: Resource, queue: String,
      packagePath: String, configPath: String): ApplicationId = {

    val appContext = Records.newAppSubmissionContext
    appContext.setApplicationName(name)
    appContext.setApplicationId(appId)

    val containerContext = ContainerLaunchContext(yarn.conf, command, packagePath, configPath)
    appContext.setAMContainerSpec(containerContext)
    appContext.setResource(resource)
    appContext.setQueue(queue)

    LOG.info(s"Submit Application $appId to YARN...")
    client.submitApplication(appContext)
  }

  /**
   * Polls the application report once per second until the application is RUNNING,
   * FINISHED, KILLED or FAILED, printing a dot to the console for every poll that is
   * not yet in one of those states.
   *
   * @param appId               application to wait for
   * @param timeoutMilliseconds maximum time to keep polling
   * @return the first report whose state is one of the four states above
   * @throws Exception if no such report was obtained before the timeout elapsed
   */
  def awaitApplication(appId: ApplicationId, timeoutMilliseconds: Long = Long.MaxValue)
    : ApplicationReport = {
    import org.apache.hadoop.yarn.api.records.YarnApplicationState._
    // RUNNING is deliberately part of the set: the wait ends once the app is up.
    val awaitedStates = Set(FINISHED, KILLED, FAILED, RUNNING)

    val start = System.currentTimeMillis()
    def timedOut: Boolean = System.currentTimeMillis() - start > timeoutMilliseconds

    var result: Option[ApplicationReport] = None
    while (result.isEmpty && !timedOut) {
      val report = client.getApplicationReport(appId)
      if (awaitedStates.contains(report.getYarnApplicationState)) {
        result = Some(new ApplicationReport(report))
      } else {
        Console.print(".")
        Thread.sleep(1000)
      }
    }

    // Decide on whether a report was obtained, not on a fresh look at the clock: a report
    // that arrived just before the deadline used to be thrown away as a timeout.
    result.getOrElse(throw new Exception(s"Launch Application $appId timeout..."))
  }

  /** Stops the underlying client. */
  def stop(): Unit = {
    client.stop()
  }
}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/YarnConfig.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/YarnConfig.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/YarnConfig.scala new file mode 100644 index 0000000..4b927de --- /dev/null +++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/YarnConfig.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.experiments.yarn.glue

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.conf.YarnConfiguration

/**
 * Wrapper for a YARN configuration.
 *
 * @param conf the wrapped configuration; by default a `YarnConfiguration` built on top of
 *             `new Configuration(true)`
 */
case class YarnConfig(conf: YarnConfiguration = new YarnConfiguration(new Configuration(true))) {
  /** Writes the wrapped configuration to `out` as XML. */
  def writeXml(out: java.io.Writer): Unit = {
    conf.writeXml(out)
  }

  /** Value of the `yarn.resourcemanager.hostname` property. */
  def resourceManager: String = conf.get("yarn.resourcemanager.hostname")
}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/package.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/package.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/package.scala new file mode 100644 index 0000000..33e5778 --- /dev/null +++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/glue/package.scala @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.experiments.yarn + +/** + * YARN facade to decouple Gearpump from YARN. + */ +package object glue { +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/CommandSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/CommandSpec.scala b/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/CommandSpec.scala deleted file mode 100644 index 2a6bf38..0000000 --- a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/CommandSpec.scala +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.experiments.yarn.appmaster - -import com.typesafe.config.ConfigFactory -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} - -import io.gearpump.cluster.TestUtil -import io.gearpump.transport.HostPort - -class CommandSpec extends FlatSpec with Matchers with BeforeAndAfterAll { - val config = ConfigFactory.parseString( - - """ - | - |gearpump { - | yarn { - | client { - | package -path = "/user/gearpump/gearpump.zip" - | } - | - | applicationmaster { - | ## Memory of YarnAppMaster - | command = "$JAVA_HOME/bin/java -Xmx512m" - | memory = "512" - | vcores = "1" - | queue = "default" - | } - | - | master { - | ## Memory of master daemon - | command = "$JAVA_HOME/bin/java -Xmx512m" - | memory = "512" - | vcores = "1" - | } - | - | worker { - | ## memory of worker daemon - | command = "$JAVA_HOME/bin/java -Xmx512m" - | containers = "4" - | ## This also contains all memory for child executors. 
- | memory = "4096" - | vcores = "1" - | } - | services { - | enabled = true - | } - | } - |} - """.stripMargin).withFallback(TestUtil.DEFAULT_CONFIG) - - "MasterCommand" should "create correct command line" in { - val version = "gearpump-0.1" - val master = MasterCommand(config, version, HostPort("127.0.0.1", 8080)) - - // scalastyle:off line.size.limit - val expected = "$JAVA_HOME/bin/java -Xmx512m -cp conf:pack/gearpump-0.1/conf:pack/gearpump-0.1/lib/daemon/*:pack/gearpump-0.1/lib/*:$CLASSPATH -Dgearpump.cluster.masters.0=127.0.0.1:8080 -Dgearpump.hostname=127.0.0.1 -Dgearpump.master-resource-manager-container-id={{CONTAINER_ID}} -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.1 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> io.gearpump.cluster.main.Master -ip 127.0.0.1 -port 8080 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr" - // scalastyle:on line.size.limit - assert(master.get == expected) - } - - "WorkerCommand" should "create correct command line" in { - val version = "gearpump-0.1" - val worker = WorkerCommand(config, version, HostPort("127.0.0.1", 8080), "worker-machine") - // scalastyle:off line.size.limit - val expected = "$JAVA_HOME/bin/java -Xmx512m -cp conf:pack/gearpump-0.1/conf:pack/gearpump-0.1/lib/daemon/*:pack/gearpump-0.1/lib/*:$CLASSPATH -Dgearpump.cluster.masters.0=127.0.0.1:8080 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.worker-resource-manager-container-id={{CONTAINER_ID}} -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.1 -Dgearpump.log.application.dir=<LOG_DIR> -Dgearpump.hostname=worker-machine io.gearpump.cluster.main.Worker 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr" - // scalastyle:on line.size.limit - assert(worker.get == expected) - } - - "AppMasterCommand" should "create correct command line" in { - val version = "gearpump-0.1" - val appmaster = AppMasterCommand(config, version, Array("arg1", "arg2", "arg3")) - // scalastyle:off line.size.limit - val expected = 
"$JAVA_HOME/bin/java -Xmx512m -cp conf:pack/gearpump-0.1/conf:pack/gearpump-0.1/dashboard:pack/gearpump-0.1/lib/*:pack/gearpump-0.1/lib/daemon/*:pack/gearpump-0.1/lib/services/*:pack/gearpump-0.1/lib/yarn/*:$CLASSPATH -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.1 -Dgearpump.binary-version-with-scala-version=gearpump-0.1 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> -Dgearpump.hostname={{NM_HOST}} io.gearpump.experiments.yarn.appmaster.YarnAppMaster arg1 arg2 arg3 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr" - // scalastyle:on line.size.limit - assert(appmaster.get == expected) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/UIServiceSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/UIServiceSpec.scala b/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/UIServiceSpec.scala deleted file mode 100644 index f8f9fe8..0000000 --- a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/UIServiceSpec.scala +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.experiments.yarn.appmaster - -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor.{ActorRef, ActorSystem, Props} -import akka.testkit.TestProbe -import com.typesafe.config.ConfigFactory -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} - -import io.gearpump.cluster.TestUtil -import io.gearpump.experiments.yarn.appmaster.UIServiceSpec.{Info, MockUI} -import io.gearpump.transport.HostPort -import io.gearpump.util.Constants - -class UIServiceSpec extends FlatSpec with Matchers with BeforeAndAfterAll { - implicit var system: ActorSystem = null - - override def beforeAll(): Unit = { - system = ActorSystem(getClass.getSimpleName, TestUtil.DEFAULT_CONFIG) - } - - override def afterAll(): Unit = { - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } - - it should "start UI server correctly" in { - val probe = TestProbe() - val masters = List( - HostPort("127.0.0.1", 3000), - HostPort("127.0.0.1", 3001), - HostPort("127.0.0.1", 3002) - ) - val host = "local" - val port = 8091 - - val ui = system.actorOf(Props(new MockUI(masters, host, port, probe.ref))) - - probe.expectMsgPF() { - case info: Info => { - assert(info.masterHost == "127.0.0.1") - assert(info.masterPort == 3000) - val conf = ConfigFactory.parseFile(new java.io.File(info.configFile)) - assert(conf.getString(Constants.GEARPUMP_SERVICE_HOST) == host) - assert(conf.getString(Constants.GEARPUMP_SERVICE_HTTP) == "8091") - assert(conf.getString(Constants.NETTY_TCP_HOSTNAME) == host) - } - } - - system.stop(ui) - } -} - -object UIServiceSpec { - - case class Info(supervisor: String, masterHost: String, masterPort: Int, configFile: String) - - class MockUI(masters: List[HostPort], host: String, port: Int, probe: ActorRef) - extends UIService(masters, host, port) { - - override def launch( - supervisor: String, 
masterHost: String, masterPort: Int, configFile: String): Unit = { - probe ! Info(supervisor, masterHost, masterPort, configFile) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMasterSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMasterSpec.scala b/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMasterSpec.scala deleted file mode 100644 index 84d6d37..0000000 --- a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMasterSpec.scala +++ /dev/null @@ -1,221 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.experiments.yarn.appmaster - -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor.{Actor, ActorSystem, Props} -import akka.testkit.{TestActorRef, TestProbe} -import com.typesafe.config.ConfigFactory -import org.mockito.ArgumentCaptor -import org.mockito.Matchers._ -import org.mockito.Mockito._ -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} - -import io.gearpump.cluster.ClientToMaster.{AddWorker, CommandResult, RemoveWorker} -import io.gearpump.cluster.TestUtil -import io.gearpump.experiments.yarn.Constants -import io.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, AppMasterRegistered, ClusterInfo, ContainerStarted, ContainersAllocated, GetActiveConfig, Kill, QueryClusterInfo, QueryVersion, ResourceManagerException, Version} -import io.gearpump.experiments.yarn.appmaster.YarnAppMasterSpec.UI -import io.gearpump.experiments.yarn.glue.Records.{Container, Resource, _} -import io.gearpump.experiments.yarn.glue.{NMClient, RMClient} -import io.gearpump.transport.HostPort - -class YarnAppMasterSpec extends FlatSpec with Matchers with BeforeAndAfterAll { - - val config = ConfigFactory.parseString( - - """ - |gearpump { - | yarn { - | client { - | package -path = "/user/gearpump/gearpump.zip" - | } - | - | applicationmaster { - | ## Memory of YarnAppMaster - | command = "$JAVA_HOME/bin/java -Xmx512m" - | memory = "512" - | vcores = "1" - | queue = "default" - | } - | - | master { - | ## Memory of master daemon - | command = "$JAVA_HOME/bin/java -Xmx512m" - | memory = "512" - | vcores = "1" - | } - | - | worker { - | ## memory of worker daemon - | command = "$JAVA_HOME/bin/java -Xmx512m" - | containers = "4" - | ## This also contains all memory for child executors. 
- | memory = "4096" - | vcores = "1" - | } - | services { - | enabled = true - | } - | } - |} - """.stripMargin).withFallback(TestUtil.DEFAULT_CONFIG) - - val masterCount = 1 - val workerCount = config.getString(Constants.WORKER_CONTAINERS).toInt - - implicit var system: ActorSystem = null - val packagePath = "/user/gearpump/gearpump.zip" - val configPath = "/user/my/conf" - - override def beforeAll(): Unit = { - system = ActorSystem("test", config) - } - - override def afterAll(): Unit = { - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } - - private def startAppMaster(): (TestActorRef[YarnAppMaster], TestProbe, NMClient, RMClient) = { - val rmClient = mock(classOf[RMClient]) - val nmClient = mock(classOf[NMClient]) - val ui = mock(classOf[UIFactory]) - when(ui.props(any[List[HostPort]], anyString, anyInt)).thenReturn(Props(new UI)) - - val appMaster = TestActorRef[YarnAppMaster](Props( - new YarnAppMaster(rmClient, nmClient, packagePath, configPath, ui))) - - verify(rmClient).start(appMaster) - verify(nmClient).start(appMaster) - verify(rmClient).registerAppMaster(anyString, anyInt, anyString) - - appMaster ! AppMasterRegistered - - val masterResources = ArgumentCaptor.forClass(classOf[List[Resource]]) - verify(rmClient).requestContainers(masterResources.capture()) - assert(masterResources.getValue.size == masterCount) - - val masterContainer = mock(classOf[Container]) - val mockNode = mock(classOf[NodeId]) - val mockId = mock(classOf[ContainerId]) - when(masterContainer.getNodeId).thenReturn(mockNode) - when(masterContainer.getId).thenReturn(mockId) - - // Launchs master - appMaster ! ContainersAllocated(List.fill(masterCount)(masterContainer)) - verify(nmClient, - times(masterCount)).launchCommand(any[Container], anyString, anyString, anyString) - - // Master containers started - (0 until masterCount).foreach(_ => appMaster ! 
ContainerStarted(mockId)) - - // Transition to start workers - val workerResources = ArgumentCaptor.forClass(classOf[List[Resource]]) - verify(rmClient, times(2)).requestContainers(workerResources.capture()) - assert(workerResources.getValue.size == workerCount) - - // Launchs workers - val workerContainer = mock(classOf[Container]) - when(workerContainer.getNodeId).thenReturn(mockNode) - val workerContainerId = ContainerId.fromString("container_1449802454214_0034_01_000006") - when(workerContainer.getId).thenReturn(workerContainerId) - appMaster ! ContainersAllocated(List.fill(workerCount)(workerContainer)) - verify(nmClient, times(workerCount + masterCount)) - .launchCommand(any[Container], anyString, anyString, anyString) - - // Worker containers started - (0 until workerCount).foreach(_ => appMaster ! ContainerStarted(mockId)) - - // Starts UI server - verify(ui, times(1)).props(any[List[HostPort]], anyString, anyInt) - - // Application Ready... - val client = TestProbe() - - // Gets active config - appMaster.tell(GetActiveConfig("client"), client.ref) - client.expectMsgType[ActiveConfig] - - // Queries version - appMaster.tell(QueryVersion, client.ref) - client.expectMsgType[Version] - - // Queries version - appMaster.tell(QueryClusterInfo, client.ref) - client.expectMsgType[ClusterInfo] - - // Adds worker - val newWorkerCount = 2 - appMaster.tell(AddWorker(newWorkerCount), client.ref) - client.expectMsgType[CommandResult] - val newWorkerResources = ArgumentCaptor.forClass(classOf[List[Resource]]) - verify(rmClient, times(3)).requestContainers(newWorkerResources.capture()) - assert(newWorkerResources.getValue.size == newWorkerCount) - - // New container allocated - appMaster ! ContainersAllocated(List.fill(newWorkerCount)(workerContainer)) - verify(nmClient, times(workerCount + masterCount + newWorkerCount)). - launchCommand(any[Container], anyString, anyString, anyString) - - // New worker containers started - (0 until newWorkerCount).foreach(_ => appMaster ! 
ContainerStarted(mockId)) - - // Same UI server - verify(ui, times(1)).props(any[List[HostPort]], anyString, anyInt) - - // Removes worker - appMaster.tell(RemoveWorker(workerContainerId.toString), client.ref) - client.expectMsgType[CommandResult] - verify(nmClient).stopContainer(any[ContainerId], any[NodeId]) - - (appMaster, client: TestProbe, nmClient, rmClient) - } - - it should "start master, worker and UI on YARN" in { - val env = startAppMaster() - val (appMaster, client, nmClient, rmClient) = env - - // Kills the app - appMaster.tell(Kill, client.ref) - client.expectMsgType[CommandResult] - verify(nmClient, times(1)).stop() - verify(rmClient, times(1)).shutdownApplication() - } - - it should "handle resource manager errors" in { - val env = startAppMaster() - val (appMaster, client, nmClient, rmClient) = env - - // on error - val ex = new Exception("expected resource manager exception") - appMaster.tell(ResourceManagerException(ex), client.ref) - verify(nmClient, times(1)).stop() - verify(rmClient, times(1)).failApplication(ex) - } -} - -object YarnAppMasterSpec { - - class UI extends Actor { - def receive: Receive = null - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/LaunchClusterSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/LaunchClusterSpec.scala b/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/LaunchClusterSpec.scala deleted file mode 100644 index 3bd7f4f..0000000 --- a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/LaunchClusterSpec.scala +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.experiments.yarn.client - -import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream} -import java.util.Random -import java.util.zip.{ZipEntry, ZipOutputStream} -import scala.concurrent.Await -import scala.concurrent.duration.Duration -import scala.util.Try - -import akka.actor.ActorSystem -import akka.testkit.TestProbe -import com.typesafe.config.ConfigFactory -import org.mockito.Matchers._ -import org.mockito.Mockito._ -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} - -import io.gearpump.cluster.TestUtil -import io.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, GetActiveConfig} -import io.gearpump.experiments.yarn.glue.Records._ -import io.gearpump.experiments.yarn.glue.{FileSystem, YarnClient, YarnConfig} -import io.gearpump.util.FileUtils -class LaunchClusterSpec extends FlatSpec with Matchers with BeforeAndAfterAll { - implicit var system: ActorSystem = null - - val rand = new Random() - - private def randomArray(size: Int): Array[Byte] = { - val array = new Array[Byte](size) - rand.nextBytes(array) - array - } - val appId = ApplicationId.newInstance(0L, 0) - - val akka = ConfigFactory.parseString( - - """ - |gearpump { - | yarn { - | client { - | package -path = 
"/user/gearpump/gearpump.zip" - | } - | - | applicationmaster { - | ## Memory of YarnAppMaster - | command = "$JAVA_HOME/bin/java -Xmx512m" - | memory = "512" - | vcores = "1" - | queue = "default" - | } - | - | master { - | ## Memory of master daemon - | command = "$JAVA_HOME/bin/java -Xmx512m" - | containers = "2" - | memory = "512" - | vcores = "1" - | } - | - | worker { - | ## memory of worker daemon - | command = "$JAVA_HOME/bin/java -Xmx512m" - | containers = "4" - | ## This also contains all memory for child executors. - | memory = "4096" - | vcores = "1" - | } - | services { - | enabled = true - | } - | } - |} - """.stripMargin).withFallback(TestUtil.DEFAULT_CONFIG) - - override def beforeAll(): Unit = { - system = ActorSystem(getClass.getSimpleName, akka) - } - - override def afterAll(): Unit = { - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } - - it should "reject non-zip files" in { - val yarnConfig = mock(classOf[YarnConfig]) - val yarnClient = mock(classOf[YarnClient]) - val fs = mock(classOf[FileSystem]) - val appMasterResolver = mock(classOf[AppMasterResolver]) - - val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system, appMasterResolver) - val packagePath = "gearpump.zip2" - assert(Try(launcher.submit("gearpump", packagePath)).isFailure) - } - - it should "reject if we cannot find the package file on HDFS" in { - val yarnConfig = mock(classOf[YarnConfig]) - val yarnClient = mock(classOf[YarnClient]) - val fs = mock(classOf[FileSystem]) - val appMasterResolver = mock(classOf[AppMasterResolver]) - - val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system, appMasterResolver) - val packagePath = "gearpump.zip" - when(fs.exists(anyString)).thenReturn(false) - assert(Try(launcher.submit("gearpump", packagePath)).isFailure) - } - - it should "throw when package exists on HDFS, but the file is corrupted" in { - val yarnConfig = mock(classOf[YarnConfig]) - val yarnClient = 
mock(classOf[YarnClient]) - val fs = mock(classOf[FileSystem]) - val appMasterResolver = mock(classOf[AppMasterResolver]) - - val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system, appMasterResolver) - val packagePath = "gearpump.zip" - when(fs.exists(anyString)).thenReturn(true) - - val content = new ByteArrayInputStream(randomArray(10)) - when(fs.open(anyString)).thenReturn(content) - assert(Try(launcher.submit("gearpump", packagePath)).isFailure) - content.close() - } - - it should "throw when the HDFS package version is not consistent with local version" in { - val yarnConfig = mock(classOf[YarnConfig]) - val yarnClient = mock(classOf[YarnClient]) - val fs = mock(classOf[FileSystem]) - val appMasterResolver = mock(classOf[AppMasterResolver]) - - val version = "gearpump-0.2" - val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system, - appMasterResolver, version) - val packagePath = "gearpump.zip" - when(fs.exists(anyString)).thenReturn(true) - - val oldVesion = "gearpump-0.1" - when(fs.open(anyString)).thenReturn(zipInputStream(oldVesion)) - assert(Try(launcher.submit("gearpump", packagePath)).isFailure) - } - - it should "upload config file to HDFS when submitting" in { - val yarnConfig = mock(classOf[YarnConfig]) - val yarnClient = mock(classOf[YarnClient]) - val fs = mock(classOf[FileSystem]) - val appMasterResolver = mock(classOf[AppMasterResolver]) - - val version = "gearpump-0.2" - val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, - fs, system, appMasterResolver, version) - val packagePath = "gearpump.zip" - - val out = mock(classOf[OutputStream]) - when(fs.exists(anyString)).thenReturn(true) - when(fs.create(anyString)).thenReturn(out) - when(fs.getHomeDirectory).thenReturn("/root") - - when(fs.open(anyString)).thenReturn(zipInputStream(version)) - - val report = mock(classOf[ApplicationReport]) - when(yarnClient.awaitApplication(any[ApplicationId], anyLong())).thenReturn(report) - - 
when(report.getApplicationId).thenReturn(appId) - when(yarnClient.createApplication).thenReturn(appId) - assert(appId == launcher.submit("gearpump", packagePath)) - - // 3 Config files are uploaded to HDFS, one is akka.conf, - // one is yarn-site.xml, one is log4j.properties. - verify(fs, times(3)).create(anyString) - verify(out, times(3)).close() - - // val workerResources = ArgumentCaptor.forClass(classOf[List[Resource]]) - // scalastyle:off line.size.limit - val expectedCommand = "$JAVA_HOME/bin/java -Xmx512m -cp conf:pack/gearpump-0.2/conf:pack/gearpump-0.2/dashboard:pack/gearpump-0.2/lib/*:pack/gearpump-0.2/lib/daemon/*:pack/gearpump-0.2/lib/services/*:pack/gearpump-0.2/lib/yarn/*:$CLASSPATH -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.2 -Dgearpump.binary-version-with-scala-version=gearpump-0.2 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> -Dgearpump.hostname={{NM_HOST}} io.gearpump.experiments.yarn.appmaster.YarnAppMaster -conf /root/.gearpump_application_0_0000/conf/ -package gearpump.zip 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr" - // scalastyle:on line.size.limit - verify(yarnClient).submit("gearpump", appId, expectedCommand, - Resource.newInstance(512, 1), "default", - "gearpump.zip", "/root/.gearpump_application_0_0000/conf/") - } - - it should "save active config from Gearpump cluster" in { - val yarnConfig = mock(classOf[YarnConfig]) - val yarnClient = mock(classOf[YarnClient]) - val fs = mock(classOf[FileSystem]) - val appMasterResolver = mock(classOf[AppMasterResolver]) - val appMaster = TestProbe() - - val version = "gearpump-0.2" - val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system, - appMasterResolver, version) - val outputPath = java.io.File.createTempFile("LaunchClusterSpec", ".conf") - - when(appMasterResolver.resolve(any[ApplicationId], anyInt)).thenReturn(appMaster.ref) - val fileFuture = launcher.saveConfig(appId, outputPath.getPath) - 
appMaster.expectMsgType[GetActiveConfig] - appMaster.reply(ActiveConfig(ConfigFactory.empty())) - - import scala.concurrent.duration._ - val file = Await.result(fileFuture, 30.seconds).asInstanceOf[java.io.File] - - assert(!FileUtils.read(file).isEmpty) - file.delete() - } - - private def zipInputStream(version: String): InputStream = { - val bytes = new ByteArrayOutputStream(1000) - val zipOut = new ZipOutputStream(bytes) - - // Not available on BufferedOutputStream - zipOut.putNextEntry(new ZipEntry(s"$version/README.md")) - zipOut.write("README".getBytes()) - // Not available on BufferedOutputStream - zipOut.closeEntry() - zipOut.close() - new ByteArrayInputStream(bytes.toByteArray) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/ManageClusterSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/ManageClusterSpec.scala b/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/ManageClusterSpec.scala deleted file mode 100644 index b324ece..0000000 --- a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/ManageClusterSpec.scala +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.experiments.yarn.client - -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor.ActorSystem -import akka.testkit.TestProbe -import com.typesafe.config.ConfigFactory -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} - -import io.gearpump.cluster.ClientToMaster.{AddWorker, CommandResult, RemoveWorker} -import io.gearpump.cluster.TestUtil -import io.gearpump.cluster.main.ParseResult -import io.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, ClusterInfo, GetActiveConfig, Kill, QueryClusterInfo, QueryVersion, Version} -import io.gearpump.experiments.yarn.client.ManageCluster._ -import io.gearpump.experiments.yarn.glue.Records.ApplicationId -import io.gearpump.util.FileUtils - -class ManageClusterSpec extends FlatSpec with Matchers with BeforeAndAfterAll { - - implicit var system: ActorSystem = null - - override def beforeAll(): Unit = { - system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) - } - - override def afterAll(): Unit = { - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } - - it should "getConfig from remote Gearpump" in { - val appId = ApplicationId.newInstance(0L, 0) - val appMaster = TestProbe() - val manager = new ManageCluster(appId, appMaster.ref, system) - - val output = java.io.File.createTempFile("managerClusterSpec", ".conf") - - val future = manager.command(GET_CONFIG, new ParseResult(Map("output" -> output.toString), - Array.empty[String])) - appMaster.expectMsgType[GetActiveConfig] - 
appMaster.reply(ActiveConfig(ConfigFactory.empty())) - import scala.concurrent.duration._ - Await.result(future, 30.seconds) - - val content = FileUtils.read(output) - assert(content.length > 0) - output.delete() - } - - it should "addworker" in { - val appId = ApplicationId.newInstance(0L, 0) - val appMaster = TestProbe() - val manager = new ManageCluster(appId, appMaster.ref, system) - - val future = manager.command(ADD_WORKER, new ParseResult(Map("count" -> 1.toString), - Array.empty[String])) - appMaster.expectMsg(AddWorker(1)) - appMaster.reply(CommandResult(true)) - import scala.concurrent.duration._ - val result = Await.result(future, 30.seconds).asInstanceOf[CommandResult] - assert(result.success) - } - - it should "removeworker" in { - val appId = ApplicationId.newInstance(0L, 0) - val appMaster = TestProbe() - val manager = new ManageCluster(appId, appMaster.ref, system) - - val future = manager.command(REMOVE_WORKER, new ParseResult(Map("container" -> "1"), - Array.empty[String])) - appMaster.expectMsg(RemoveWorker("1")) - appMaster.reply(CommandResult(true)) - import scala.concurrent.duration._ - val result = Await.result(future, 30.seconds).asInstanceOf[CommandResult] - assert(result.success) - } - - it should "get version" in { - val appId = ApplicationId.newInstance(0L, 0) - val appMaster = TestProbe() - val manager = new ManageCluster(appId, appMaster.ref, system) - val future = manager.command(VERSION, new ParseResult(Map("container" -> "1"), - Array.empty[String])) - appMaster.expectMsg(QueryVersion) - appMaster.reply(Version("version 0.1")) - import scala.concurrent.duration._ - val result = Await.result(future, 30.seconds).asInstanceOf[Version] - assert(result.version == "version 0.1") - } - - it should "get cluster info" in { - val appId = ApplicationId.newInstance(0L, 0) - val appMaster = TestProbe() - val manager = new ManageCluster(appId, appMaster.ref, system) - - val output = java.io.File.createTempFile("managerClusterSpec", ".conf") - - 
val future = manager.command(QUERY, new ParseResult(Map.empty[String, String], - Array.empty[String])) - appMaster.expectMsg(QueryClusterInfo) - appMaster.reply(ClusterInfo(List("master"), List("worker"))) - import scala.concurrent.duration._ - val result = Await.result(future, 30.seconds).asInstanceOf[ClusterInfo] - assert(result.masters.sameElements(List("master"))) - assert(result.workers.sameElements(List("worker"))) - } - - it should "kill the cluster" in { - val appId = ApplicationId.newInstance(0L, 0) - val appMaster = TestProbe() - val manager = new ManageCluster(appId, appMaster.ref, system) - - val output = java.io.File.createTempFile("managerClusterSpec", ".conf") - - val future = manager.command(KILL, new ParseResult(Map("container" -> "1"), - Array.empty[String])) - appMaster.expectMsg(Kill) - appMaster.reply(CommandResult(true)) - import scala.concurrent.duration._ - val result = Await.result(future, 30.seconds).asInstanceOf[CommandResult] - assert(result.success) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/CommandSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/CommandSpec.scala b/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/CommandSpec.scala new file mode 100644 index 0000000..c4c5a65 --- /dev/null +++ b/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/CommandSpec.scala @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.appmaster
+
+import com.typesafe.config.ConfigFactory
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.transport.HostPort
+
+/**
+ * Checks the exact command lines rendered by MasterCommand, WorkerCommand and
+ * AppMasterCommand for a fixed YARN configuration and package version.
+ */
+class CommandSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+
+  // YARN settings used by all three commands; anything missing comes from the test defaults.
+  val config = ConfigFactory.parseString(
+
+    """
+      |
+      |gearpump {
+      |  yarn {
+      |    client {
+      |      package-path = "/user/gearpump/gearpump.zip"
+      |    }
+      |
+      |    applicationmaster {
+      |      ## Memory of YarnAppMaster
+      |      command = "$JAVA_HOME/bin/java -Xmx512m"
+      |      memory = "512"
+      |      vcores = "1"
+      |      queue = "default"
+      |    }
+      |
+      |    master {
+      |      ## Memory of master daemon
+      |      command = "$JAVA_HOME/bin/java -Xmx512m"
+      |      memory = "512"
+      |      vcores = "1"
+      |    }
+      |
+      |    worker {
+      |      ## memory of worker daemon
+      |      command = "$JAVA_HOME/bin/java -Xmx512m"
+      |      containers = "4"
+      |      ## This also contains all memory for child executors.
+      |      memory = "4096"
+      |      vcores = "1"
+      |    }
+      |    services {
+      |      enabled = true
+      |    }
+      |  }
+      |}
+    """.stripMargin).withFallback(TestUtil.DEFAULT_CONFIG)
+
+  // Package version that shows up in every classpath entry and in -Dgearpump.home.
+  private val version = "gearpump-0.1"
+
+  "MasterCommand" should "create correct command line" in {
+    val master = MasterCommand(config, version, HostPort("127.0.0.1", 8080))
+
+    // scalastyle:off line.size.limit
+    val expected = "$JAVA_HOME/bin/java -Xmx512m -cp conf:pack/gearpump-0.1/conf:pack/gearpump-0.1/lib/daemon/*:pack/gearpump-0.1/lib/*:$CLASSPATH -Dgearpump.cluster.masters.0=127.0.0.1:8080 -Dgearpump.hostname=127.0.0.1 -Dgearpump.master-resource-manager-container-id={{CONTAINER_ID}} -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.1 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> org.apache.gearpump.cluster.main.Master -ip 127.0.0.1 -port 8080 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr"
+    // scalastyle:on line.size.limit
+    assert(master.get == expected)
+  }
+
+  "WorkerCommand" should "create correct command line" in {
+    val worker = WorkerCommand(config, version, HostPort("127.0.0.1", 8080), "worker-machine")
+
+    // scalastyle:off line.size.limit
+    val expected = "$JAVA_HOME/bin/java -Xmx512m -cp conf:pack/gearpump-0.1/conf:pack/gearpump-0.1/lib/daemon/*:pack/gearpump-0.1/lib/*:$CLASSPATH -Dgearpump.cluster.masters.0=127.0.0.1:8080 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.worker-resource-manager-container-id={{CONTAINER_ID}} -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.1 -Dgearpump.log.application.dir=<LOG_DIR> -Dgearpump.hostname=worker-machine org.apache.gearpump.cluster.main.Worker 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr"
+    // scalastyle:on line.size.limit
+    assert(worker.get == expected)
+  }
+
+  "AppMasterCommand" should "create correct command line" in {
+    val appmaster = AppMasterCommand(config, version, Array("arg1", "arg2", "arg3"))
+
+    // scalastyle:off line.size.limit
+    val expected = "$JAVA_HOME/bin/java -Xmx512m -cp conf:pack/gearpump-0.1/conf:pack/gearpump-0.1/dashboard:pack/gearpump-0.1/lib/*:pack/gearpump-0.1/lib/daemon/*:pack/gearpump-0.1/lib/services/*:pack/gearpump-0.1/lib/yarn/*:$CLASSPATH -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.1 -Dgearpump.binary-version-with-scala-version=gearpump-0.1 -Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> -Dgearpump.hostname={{NM_HOST}} org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster arg1 arg2 arg3 2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr"
+    // scalastyle:on line.size.limit
+    assert(appmaster.get == expected)
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/UIServiceSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/UIServiceSpec.scala b/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/UIServiceSpec.scala
new file mode 100644
index 0000000..34af145
--- /dev/null
+++ b/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/UIServiceSpec.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.appmaster
+
+import akka.actor.{ActorRef, ActorSystem, Props}
+import akka.testkit.TestProbe
+import com.typesafe.config.ConfigFactory
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.experiments.yarn.appmaster.UIServiceSpec.{Info, MockUI}
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.Constants
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+/**
+ * Starts a UIService whose `launch` is intercepted and checks the arguments it was given:
+ * the first master's address and a config file carrying the requested host and port.
+ */
+class UIServiceSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+  implicit var system: ActorSystem = null
+
+  override def beforeAll(): Unit = {
+    system = ActorSystem(getClass.getSimpleName, TestUtil.DEFAULT_CONFIG)
+  }
+
+  override def afterAll(): Unit = {
+    // terminate() hands back the termination future, so wait on it directly.
+    Await.result(system.terminate(), Duration.Inf)
+  }
+
+  it should "start UI server correctly" in {
+    val host = "local"
+    val port = 8091
+    val masters = List(3000, 3001, 3002).map(masterPort => HostPort("127.0.0.1", masterPort))
+
+    val probe = TestProbe()
+    val ui = system.actorOf(Props(new MockUI(masters, host, port, probe.ref)))
+
+    // MockUI forwards the launch arguments to the probe as an Info message.
+    probe.expectMsgPF() {
+      case Info(_, masterHost, masterPort, configFile) =>
+        assert(masterHost == "127.0.0.1")
+        assert(masterPort == 3000)
+
+        val uiConfig = ConfigFactory.parseFile(new java.io.File(configFile))
+        assert(uiConfig.getString(Constants.GEARPUMP_SERVICE_HOST) == host)
+        assert(uiConfig.getString(Constants.GEARPUMP_SERVICE_HTTP) == "8091")
+        assert(uiConfig.getString(Constants.NETTY_TCP_HOSTNAME) == host)
+    }
+
+    system.stop(ui)
+  }
+}
+
+object UIServiceSpec {
+
+  /** Snapshot of the arguments passed to `launch`. */
+  case class Info(supervisor: String, masterHost: String, masterPort: Int, configFile: String)
+
+  /** UIService whose `launch` only reports its arguments to `probe`. */
+  class MockUI(masters: List[HostPort], host: String, port: Int, probe: ActorRef)
+    extends UIService(masters, host, port) {
+
+    override def launch(
+        supervisor: String, masterHost: String, masterPort: Int, configFile: String): Unit = {
+      val launched = Info(supervisor, masterHost, masterPort, configFile)
+      probe ! launched
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMasterSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMasterSpec.scala b/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMasterSpec.scala
new file mode 100644
index 0000000..055e822
--- /dev/null
+++ b/experiments/yarn/src/test/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMasterSpec.scala
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.experiments.yarn.appmaster
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.testkit.{TestActorRef, TestProbe}
+import com.typesafe.config.ConfigFactory
+import org.apache.gearpump.cluster.ClientToMaster.{AddWorker, CommandResult, RemoveWorker}
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.experiments.yarn.Constants
+import org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, AppMasterRegistered, ClusterInfo, ContainerStarted, ContainersAllocated, GetActiveConfig, Kill, QueryClusterInfo, QueryVersion, ResourceManagerException, Version}
+import org.apache.gearpump.experiments.yarn.appmaster.YarnAppMasterSpec.UI
+import org.apache.gearpump.experiments.yarn.glue.Records.{Container, Resource, _}
+import org.apache.gearpump.experiments.yarn.glue.{NMClient, RMClient}
+import org.apache.gearpump.transport.HostPort
+import org.mockito.ArgumentCaptor
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+/**
+ * Drives a YarnAppMaster against mocked RMClient / NMClient / UIFactory instances and checks
+ * the calls it makes while starting masters and workers, serving client requests, and
+ * shutting down.
+ */
+class YarnAppMasterSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+
+  val config = ConfigFactory.parseString(
+
+    """
+      |gearpump {
+      |  yarn {
+      |    client {
+      |      package-path = "/user/gearpump/gearpump.zip"
+      |    }
+      |
+      |    applicationmaster {
+      |      ## Memory of YarnAppMaster
+      |      command = "$JAVA_HOME/bin/java -Xmx512m"
+      |      memory = "512"
+      |      vcores = "1"
+      |      queue = "default"
+      |    }
+      |
+      |    master {
+      |      ## Memory of master daemon
+      |      command = "$JAVA_HOME/bin/java -Xmx512m"
+      |      memory = "512"
+      |      vcores = "1"
+      |    }
+      |
+      |    worker {
+      |      ## memory of worker daemon
+      |      command = "$JAVA_HOME/bin/java -Xmx512m"
+      |      containers = "4"
+      |      ## This also contains all memory for child executors.
+      |      memory = "4096"
+      |      vcores = "1"
+      |    }
+      |    services {
+      |      enabled = true
+      |    }
+      |  }
+      |}
+    """.stripMargin).withFallback(TestUtil.DEFAULT_CONFIG)
+
+  val masterCount = 1
+  val workerCount = config.getString(Constants.WORKER_CONTAINERS).toInt
+
+  implicit var system: ActorSystem = null
+  val packagePath = "/user/gearpump/gearpump.zip"
+  val configPath = "/user/my/conf"
+
+  override def beforeAll(): Unit = {
+    system = ActorSystem("test", config)
+  }
+
+  override def afterAll(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+
+  /**
+   * Creates a YarnAppMaster on mocked clients and walks it through registration, master and
+   * worker launch, UI start-up, the client queries, AddWorker and RemoveWorker, verifying
+   * the mock interactions after each step.
+   *
+   * The statements are order-sensitive: every `times(n)` count is cumulative over the calls
+   * made so far.
+   *
+   * @return the app master, the probe acting as client, and the NM / RM client mocks
+   */
+  private def startAppMaster(): (TestActorRef[YarnAppMaster], TestProbe, NMClient, RMClient) = {
+    val rmClient = mock(classOf[RMClient])
+    val nmClient = mock(classOf[NMClient])
+    val ui = mock(classOf[UIFactory])
+    when(ui.props(any[List[HostPort]], anyString, anyInt)).thenReturn(Props(new UI))
+
+    val appMaster = TestActorRef[YarnAppMaster](Props(
+      new YarnAppMaster(rmClient, nmClient, packagePath, configPath, ui)))
+
+    verify(rmClient).start(appMaster)
+    verify(nmClient).start(appMaster)
+    verify(rmClient).registerAppMaster(anyString, anyInt, anyString)
+
+    appMaster ! AppMasterRegistered
+
+    val masterResources = ArgumentCaptor.forClass(classOf[List[Resource]])
+    verify(rmClient).requestContainers(masterResources.capture())
+    assert(masterResources.getValue.size == masterCount)
+
+    val masterContainer = mock(classOf[Container])
+    val mockNode = mock(classOf[NodeId])
+    val mockId = mock(classOf[ContainerId])
+    when(masterContainer.getNodeId).thenReturn(mockNode)
+    when(masterContainer.getId).thenReturn(mockId)
+
+    // Launches masters
+    appMaster ! ContainersAllocated(List.fill(masterCount)(masterContainer))
+    verify(nmClient,
+      times(masterCount)).launchCommand(any[Container], anyString, anyString, anyString)
+
+    // Master containers started
+    (0 until masterCount).foreach(_ => appMaster ! ContainerStarted(mockId))
+
+    // Transition to start workers
+    val workerResources = ArgumentCaptor.forClass(classOf[List[Resource]])
+    verify(rmClient, times(2)).requestContainers(workerResources.capture())
+    assert(workerResources.getValue.size == workerCount)
+
+    // Launches workers
+    val workerContainer = mock(classOf[Container])
+    when(workerContainer.getNodeId).thenReturn(mockNode)
+    val workerContainerId = ContainerId.fromString("container_1449802454214_0034_01_000006")
+    when(workerContainer.getId).thenReturn(workerContainerId)
+    appMaster ! ContainersAllocated(List.fill(workerCount)(workerContainer))
+    verify(nmClient, times(workerCount + masterCount))
+      .launchCommand(any[Container], anyString, anyString, anyString)
+
+    // Worker containers started
+    (0 until workerCount).foreach(_ => appMaster ! ContainerStarted(mockId))
+
+    // Starts UI server
+    verify(ui, times(1)).props(any[List[HostPort]], anyString, anyInt)
+
+    // Application Ready...
+    val client = TestProbe()
+
+    // Gets active config
+    appMaster.tell(GetActiveConfig("client"), client.ref)
+    client.expectMsgType[ActiveConfig]
+
+    // Queries version
+    appMaster.tell(QueryVersion, client.ref)
+    client.expectMsgType[Version]
+
+    // Queries cluster info
+    appMaster.tell(QueryClusterInfo, client.ref)
+    client.expectMsgType[ClusterInfo]
+
+    // Adds worker
+    val newWorkerCount = 2
+    appMaster.tell(AddWorker(newWorkerCount), client.ref)
+    client.expectMsgType[CommandResult]
+    val newWorkerResources = ArgumentCaptor.forClass(classOf[List[Resource]])
+    verify(rmClient, times(3)).requestContainers(newWorkerResources.capture())
+    assert(newWorkerResources.getValue.size == newWorkerCount)
+
+    // New container allocated
+    appMaster ! ContainersAllocated(List.fill(newWorkerCount)(workerContainer))
+    verify(nmClient, times(workerCount + masterCount + newWorkerCount)).
+      launchCommand(any[Container], anyString, anyString, anyString)
+
+    // New worker containers started
+    (0 until newWorkerCount).foreach(_ => appMaster ! ContainerStarted(mockId))
+
+    // Same UI server: props must still have been requested exactly once
+    verify(ui, times(1)).props(any[List[HostPort]], anyString, anyInt)
+
+    // Removes worker
+    appMaster.tell(RemoveWorker(workerContainerId.toString), client.ref)
+    client.expectMsgType[CommandResult]
+    verify(nmClient).stopContainer(any[ContainerId], any[NodeId])
+
+    (appMaster, client, nmClient, rmClient)
+  }
+
+  it should "start master, worker and UI on YARN" in {
+    val (appMaster, client, nmClient, rmClient) = startAppMaster()
+
+    // Kills the app
+    appMaster.tell(Kill, client.ref)
+    client.expectMsgType[CommandResult]
+    verify(nmClient, times(1)).stop()
+    verify(rmClient, times(1)).shutdownApplication()
+  }
+
+  it should "handle resource manager errors" in {
+    val (appMaster, client, nmClient, rmClient) = startAppMaster()
+
+    // on error
+    val ex = new Exception("expected resource manager exception")
+    appMaster.tell(ResourceManagerException(ex), client.ref)
+    verify(nmClient, times(1)).stop()
+    verify(rmClient, times(1)).failApplication(ex)
+  }
+}
+
+object YarnAppMasterSpec {
+
+  /**
+   * Placeholder UI actor returned by the mocked UIFactory. It handles no messages;
+   * Actor.emptyBehavior is used rather than null so that a message delivered to it is
+   * treated as unhandled instead of raising a NullPointerException.
+   */
+  class UI extends Actor {
+    def receive: Receive = Actor.emptyBehavior
+  }
+}
\ No newline at end of file
