http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/ManageCluster.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/ManageCluster.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/ManageCluster.scala deleted file mode 100644 index 3e50abe..0000000 --- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/client/ManageCluster.scala +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.experiments.yarn.client - -import java.io.{File, IOException} -import java.net.InetAddress -import scala.concurrent.{Await, Future} - -import akka.actor.{ActorRef, ActorSystem} - -import io.gearpump.cluster.ClientToMaster.{AddWorker, CommandResult, RemoveWorker} -import io.gearpump.cluster.ClusterConfig -import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult} -import io.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, ClusterInfo, GetActiveConfig, Kill, QueryClusterInfo, QueryVersion, Version} -import io.gearpump.experiments.yarn.glue.Records.ApplicationId -import io.gearpump.experiments.yarn.glue.{YarnClient, YarnConfig} -import io.gearpump.util.ActorUtil.askActor -import io.gearpump.util.{AkkaApp, LogUtil} - -/** Manage current Gearpump cluster on YARN */ -class ManageCluster(appId: ApplicationId, appMaster: ActorRef, system: ActorSystem) { - import io.gearpump.experiments.yarn.client.ManageCluster._ - - private val host = InetAddress.getLocalHost.getHostName - implicit val dispatcher = system.dispatcher - - def getConfig: Future[ActiveConfig] = askActor[ActiveConfig](appMaster, GetActiveConfig(host)) - def version: Future[Version] = askActor[Version](appMaster, QueryVersion) - def addWorker(count: Int): Future[CommandResult] = { - askActor[CommandResult](appMaster, AddWorker(count)) - } - - def removeWorker(worker: String): Future[CommandResult] = { - askActor[CommandResult](appMaster, RemoveWorker(worker)) - } - - def shutdown: Future[CommandResult] = askActor[CommandResult](appMaster, Kill) - def queryClusterInfo: Future[ClusterInfo] = askActor[ClusterInfo](appMaster, QueryClusterInfo) - - def command(command: String, parsed: ParseResult): Future[AnyRef] = { - command match { - case GET_CONFIG => - if (parsed.exists(OUTPUT)) { - getConfig.map { conf => - ClusterConfig.saveConfig(conf.config, new File(parsed.getString(OUTPUT))) - conf - } - } else { - throw new IOException(s"Please specify -$OUTPUT 
option") - } - case ADD_WORKER => - val count = parsed.getString(COUNT).toInt - addWorker(count) - case REMOVE_WORKER => - val containerId = parsed.getString(CONTAINER) - if (containerId == null || containerId.isEmpty) { - throw new IOException(s"Please specify -$CONTAINER option") - } else { - removeWorker(containerId) - } - case KILL => - shutdown - case QUERY => - queryClusterInfo - case VERSION => - version - } - } -} - -object ManageCluster extends AkkaApp with ArgumentsParser { - val GET_CONFIG = "getconfig" - val ADD_WORKER = "addworker" - val REMOVE_WORKER = "removeworker" - val KILL = "kill" - val VERSION = "version" - val QUERY = "query" - val COMMAND = "command" - val CONTAINER = "container" - val OUTPUT = "output" - val COUNT = "count" - val APPID = "appid" - val VERBOSE = "verbose" - - val commands = List(GET_CONFIG, ADD_WORKER, REMOVE_WORKER, KILL, VERSION, QUERY) - - import scala.concurrent.duration._ - val TIME_OUT_SECONDS = 30.seconds - - override val options: Array[(String, CLIOption[Any])] = Array( - COMMAND -> CLIOption[String](s"<${commands.mkString("|")}>", required = true), - APPID -> CLIOption[String]("<Application id, format: application_timestamp_id>", - required = true), - COUNT -> CLIOption("<how many instance to add or remove>", required = false, - defaultValue = Some(1)), - VERBOSE -> CLIOption("<print verbose log on console>", required = false, - defaultValue = Some(false)), - OUTPUT -> CLIOption("<output path for configuration file>", required = false, - defaultValue = Some("")), - CONTAINER -> CLIOption("<container id for master or worker>", required = false, - defaultValue = Some("")) - ) - - override def main(akkaConf: Config, args: Array[String]): Unit = { - - val yarnConfig = new YarnConfig() - val yarnClient = new YarnClient(yarnConfig) - - val parsed = parse(args) - - if (parsed.getBoolean(VERBOSE)) { - LogUtil.verboseLogToConsole() - } - - val appId = parseAppId(parsed.getString(APPID)) - val system = 
ActorSystem("manageCluster", akkaConf) - - val appMasterResolver = new AppMasterResolver(yarnClient, system) - val appMaster = appMasterResolver.resolve(appId) - - implicit val dispatcher = system.dispatcher - val manager = new ManageCluster(appId, appMaster, system) - - val command = parsed.getString(COMMAND) - val result = manager.command(command, parsed) - - // scalastyle:off println - Console.println(Await.result(result, TIME_OUT_SECONDS)) - // scalastyle:on println - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } - - def parseAppId(str: String): ApplicationId = { - val parts = str.split("_") - val timestamp = parts(1).toLong - val id = parts(2).toInt - ApplicationId.newInstance(timestamp, id) - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala deleted file mode 100644 index 6b3385f..0000000 --- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/ContainerLaunchContext.scala +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.experiments.yarn.glue - -import java.io.File -import java.nio.ByteBuffer -import scala.collection.JavaConverters._ - -import org.apache.hadoop.fs.{FileSystem => YarnFileSystem, Path} -import org.apache.hadoop.io.DataOutputBuffer -import org.apache.hadoop.mapreduce.security.TokenCache -import org.apache.hadoop.security.UserGroupInformation -import org.apache.hadoop.yarn.api.ApplicationConstants.Environment -import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.util.ConverterUtils -import org.slf4j.Logger - -import io.gearpump.util.LogUtil - -private[glue] -object ContainerLaunchContext { - private val LOG: Logger = LogUtil.getLogger(getClass) - - def apply(yarnConf: YarnConfiguration, command: String, packagePath: String, configPath: String) - : ContainerLaunchContext = { - val context = Records.newRecord(classOf[ContainerLaunchContext]) - context.setCommands(Seq(command).asJava) - context.setEnvironment(getAppEnv(yarnConf).asJava) - context.setTokens(getToken(yarnConf, packagePath, configPath)) - context.setLocalResources(getAMLocalResourcesMap(yarnConf, packagePath, configPath).asJava) - context - } - - private def getFs(yarnConf: YarnConfiguration) = YarnFileSystem.get(yarnConf) - - private def getAppEnv(yarnConf: YarnConfiguration): Map[String, String] = { - val classPaths = yarnConf.getStrings( - YarnConfiguration.YARN_APPLICATION_CLASSPATH, - YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH.mkString(File.pathSeparator)) - val allPaths = Option(classPaths).getOrElse(Array("")) - - allPaths :+ Environment.PWD.$() + File.separator + "*" + File.pathSeparator - - Map(Environment.CLASSPATH.name -> allPaths.map(_.trim).mkString(File.pathSeparator)) - } - - private def getAMLocalResourcesMap( - yarnConf: YarnConfiguration, packagePath: String, configPath: String) - : Map[String, LocalResource] = { - val fs = getFs(yarnConf) - - Map( - "pack" -> 
newYarnAppResource(fs, new Path(packagePath), - LocalResourceType.ARCHIVE, LocalResourceVisibility.APPLICATION), - "conf" -> newYarnAppResource(fs, new Path(configPath), - LocalResourceType.FILE, LocalResourceVisibility.APPLICATION)) - } - - private def newYarnAppResource( - fs: YarnFileSystem, path: Path, - resourceType: LocalResourceType, vis: LocalResourceVisibility): LocalResource = { - val qualified = fs.makeQualified(path) - val status = fs.getFileStatus(qualified) - val resource = Records.newRecord(classOf[LocalResource]) - resource.setType(resourceType) - resource.setVisibility(vis) - resource.setResource(ConverterUtils.getYarnUrlFromPath(qualified)) - resource.setTimestamp(status.getModificationTime) - resource.setSize(status.getLen) - resource - } - - private def getToken(yc: YarnConfiguration, packagePath: String, configPath: String) - : ByteBuffer = { - val credentials = UserGroupInformation.getCurrentUser.getCredentials - val dob = new DataOutputBuffer - val dirs = Array(new Path(packagePath), new Path(configPath)) - TokenCache.obtainTokensForNamenodes(credentials, dirs, yc) - credentials.writeTokenStorageToStream(dob) - ByteBuffer.wrap(dob.getData) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/FileSystem.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/FileSystem.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/FileSystem.scala deleted file mode 100644 index acf09ac..0000000 --- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/FileSystem.scala +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.experiments.yarn.glue - -import java.io.{InputStream, OutputStream} -import java.net.ConnectException -import scala.util.{Failure, Success, Try} - -import org.apache.hadoop.fs.Path - -import io.gearpump.util.LogUtil - -class FileSystem(yarnConfig: YarnConfig) { - - private val conf = yarnConfig.conf - private val fs = org.apache.hadoop.fs.FileSystem.get(conf) - - private def LOG = LogUtil.getLogger(getClass) - - def open(file: String): InputStream = exceptionHandler { - val path = new Path(file) - fs.open(path) - } - - def create(file: String): OutputStream = exceptionHandler { - val path = new Path(file) - fs.create(path) - } - - def exists(file: String): Boolean = exceptionHandler { - val path = new Path(file) - fs.exists(path) - } - - def name: String = { - fs.getUri.toString - } - - def getHomeDirectory: String = { - fs.getHomeDirectory.toString - } - - private def exceptionHandler[T](call: => T): T = { - val callTry = Try(call) - callTry match { - case Success(v) => v - case Failure(ex) => - if (ex.isInstanceOf[ConnectException]) { - LOG.error("Please check whether we connect to the right HDFS file system, " + - "current file system is $name." + "\n. 
Please copy all configs under " + - "$HADOOP_HOME/etc/hadoop into conf/yarnconf directory of Gearpump package, " + - "so that we can use the right File system.", ex) - } - throw ex - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/NMClient.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/NMClient.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/NMClient.scala deleted file mode 100644 index 3e7e668..0000000 --- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/NMClient.scala +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.experiments.yarn.glue - -import java.nio.ByteBuffer - -import akka.actor.ActorRef -import com.typesafe.config.Config -import org.apache.hadoop.yarn.api.records.{ApplicationId => YarnApplicationId, ApplicationReport => YarnApplicationReport, Container => YarnContainer, ContainerId => YarnContainerId, ContainerStatus => YarnContainerStatus, NodeId => YarnNodeId, Resource => YarnResource} -import org.apache.hadoop.yarn.client.api.async.NMClientAsync -import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl - -import io.gearpump.experiments.yarn.appmaster.YarnAppMaster.ContainerStarted -import io.gearpump.experiments.yarn.glue.Records._ -import io.gearpump.util.LogUtil -/** - * Adapter for node manager client - */ -class NMClient(yarnConf: YarnConfig, config: Config) extends NMClientAsync.CallbackHandler { - - private val LOG = LogUtil.getLogger(getClass) - - private var reportTo: ActorRef = null - private var client: NMClientAsyncImpl = null - - def start(reportTo: ActorRef): Unit = { - LOG.info("Starting Node Manager Client NMClient...") - this.reportTo = reportTo - client = new NMClientAsyncImpl(this) - client.init(yarnConf.conf) - client.start() - } - - private[glue] - override def onContainerStarted( - containerId: YarnContainerId, allServiceResponse: java.util.Map[String, ByteBuffer]) { - LOG.info(s"Container started : $containerId, " + allServiceResponse) - reportTo ! 
ContainerStarted(containerId) - } - - private[glue] - override def onContainerStatusReceived( - containerId: YarnContainerId, containerStatus: YarnContainerStatus) { - LOG.info(s"Container status received : $containerId, status $containerStatus") - } - - private[glue] - override def onContainerStopped(containerId: YarnContainerId) { - LOG.error(s"Container stopped : $containerId") - } - - private[glue] - override def onGetContainerStatusError(containerId: YarnContainerId, throwable: Throwable) { - LOG.error(s"Container exception : $containerId", throwable) - } - - private[glue] - override def onStartContainerError(containerId: YarnContainerId, throwable: Throwable) { - LOG.error(s"Container exception : $containerId", throwable) - } - - private[glue] - override def onStopContainerError(containerId: YarnContainerId, throwable: Throwable) { - LOG.error(s"Container exception : $containerId", throwable) - } - - def launchCommand( - container: Container, command: String, packagePath: String, configPath: String): Unit = { - LOG.info(s"Launching command : $command on container" + - s": ${container.getId}, host ip : ${container.getNodeId.getHost}") - val context = ContainerLaunchContext(yarnConf.conf, command, packagePath, configPath) - client.startContainerAsync(container, context) - } - - def stopContainer(containerId: ContainerId, nodeId: NodeId): Unit = { - LOG.info(s"Stop container ${containerId.toString} on node: ${nodeId.toString} ") - client.stopContainerAsync(containerId, nodeId) - } - - def stop(): Unit = { - LOG.info(s"Shutdown NMClient") - client.stop() - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/RMClient.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/RMClient.scala 
b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/RMClient.scala deleted file mode 100644 index 7b9d83c..0000000 --- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/RMClient.scala +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.experiments.yarn.glue - -import scala.collection.JavaConverters._ - -import akka.actor.ActorRef -import org.apache.hadoop.yarn.api.records.{ApplicationId => YarnApplicationId, ApplicationReport => YarnApplicationReport, Container => YarnContainer, ContainerId => YarnContainerId, ContainerStatus => YarnContainerStatus, FinalApplicationStatus, NodeId => YarnNodeId, NodeReport, Priority, Resource => YarnResource} -import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest -import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync -import org.slf4j.Logger - -import io.gearpump.experiments.yarn.appmaster.YarnAppMaster.{AppMasterRegistered, ContainersAllocated, ContainersCompleted, ResourceManagerException, ShutdownApplication} -import io.gearpump.experiments.yarn.glue.Records._ -import io.gearpump.util.LogUtil - -/** - * Adapter for resource manager client - */ -class RMClient(yarnConf: YarnConfig) extends AMRMClientAsync.CallbackHandler { - - private val LOG: Logger = LogUtil.getLogger(getClass) - - private var reportTo: ActorRef = null - private var client: AMRMClientAsync[ContainerRequest] = null - - def start(reportTo: ActorRef): Unit = { - LOG.info("Starting Resource Manager Client RMClient...") - this.reportTo = reportTo - client = startAMRMClient - } - - private def startAMRMClient: AMRMClientAsync[ContainerRequest] = { - val timeIntervalMs = 1000 // ms - val amrmClient = AMRMClientAsync.createAMRMClientAsync[ContainerRequest](timeIntervalMs, this) - amrmClient.init(yarnConf.conf) - amrmClient.start() - amrmClient - } - - override def getProgress: Float = 0.5F - - private var allocatedContainers = Set.empty[YarnContainerId] - - private[glue] - override def onContainersAllocated(containers: java.util.List[YarnContainer]) { - val newContainers = containers.asScala.toList.filterNot(container => - allocatedContainers.contains(container.getId)) - allocatedContainers ++= newContainers.map(_.getId) - LOG.info(s"New allocated 
${newContainers.size} containers") - reportTo ! ContainersAllocated(newContainers.map(yarnContainerToContainer(_))) - } - - private[glue] - override def onContainersCompleted(completedContainers: java.util.List[YarnContainerStatus]) - : Unit = { - LOG.info(s"Got response from RM. Completed containers=${completedContainers.size()}") - reportTo ! ContainersCompleted( - completedContainers.asScala.toList.map(yarnContainerStatusToContainerStatus(_))) - } - - private[glue] - override def onError(ex: Throwable) { - LOG.info("Error occurred") - reportTo ! ResourceManagerException(ex) - } - - private[glue] - override def onNodesUpdated(updatedNodes: java.util.List[NodeReport]): Unit = { - LOG.info("onNodesUpdates") - } - - private[glue] - override def onShutdownRequest() { - LOG.info("Shutdown requested") - reportTo ! ShutdownApplication - } - - def requestContainers(containers: List[Resource]): Unit = { - LOG.info(s"request Resource, slots: ${containers.length}, ${containers.mkString("\n")}") - containers.foreach(resource => { - client.addContainerRequest(createContainerRequest(resource)) - }) - } - - private def createContainerRequest(capability: Resource): ContainerRequest = { - LOG.info("creating ContainerRequest") - val priorityRecord = org.apache.hadoop.yarn.util.Records.newRecord(classOf[Priority]) - priorityRecord.setPriority(0) - val priority = Priority.newInstance(0) - new ContainerRequest(capability, null, null, priority) - } - - def registerAppMaster(host: String, port: Int, url: String): Unit = { - LOG.info(s"Received RegisterAMMessage! $host:$port $url") - val response = client.registerApplicationMaster(host, port, url) - LOG.info("Received RegisterAppMasterResponse ") - reportTo ! 
AppMasterRegistered - } - - def shutdownApplication(): Unit = { - LOG.info(s"Shutdown application") - client.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "success", null) - client.stop() - } - - def failApplication(ex: Throwable): Unit = { - LOG.error(s"Application failed! ", ex) - client.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, ex.getMessage, null) - client.stop() - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/Records.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/Records.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/Records.scala deleted file mode 100644 index 7b863fa..0000000 --- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/Records.scala +++ /dev/null @@ -1,219 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.experiments.yarn.glue - -import scala.language.implicitConversions - -import org.apache.hadoop.yarn.api.records.{ApplicationId => YarnApplicationId, ApplicationReport => YarnApplicationReport, ApplicationSubmissionContext, Container => YarnContainer, ContainerId => YarnContainerId, ContainerStatus => YarnContainerStatus, NodeId => YarnNodeId, Resource => YarnResource, YarnApplicationState} -import org.apache.hadoop.yarn.util.{Records => YarnRecords} - -object Records { - def newRecord[T](clazz: Class[T]): T = YarnRecords.newRecord(clazz) - - def newAppSubmissionContext: ApplicationSubmissionContext = { - YarnRecords.newRecord(classOf[ApplicationSubmissionContext]) - } - - class ApplicationId(private[glue] val impl: YarnApplicationId) { - def getId: Int = impl.getId - - override def toString: String = impl.toString - - override def equals(other: Any): Boolean = { - if (other.isInstanceOf[ApplicationId]) { - impl.equals(other.asInstanceOf[ApplicationId].impl) - } else { - false - } - } - - override def hashCode(): Int = { - impl.hashCode() - } - } - - object ApplicationId { - def newInstance(timestamp: Long, id: Int): ApplicationId = { - YarnApplicationId.newInstance(timestamp, id) - } - } - - class ApplicationReport(private[glue] val impl: YarnApplicationReport) { - def getApplicationId: ApplicationId = impl.getApplicationId - - def getDiagnostics: String = impl.getDiagnostics - - def getFinishTime: Long = impl.getFinishTime - - def getOriginalTrackingUrl: String = impl.getOriginalTrackingUrl - - def getYarnApplicationState: YarnApplicationState = impl.getYarnApplicationState - - override def toString: String = impl.toString - } - class Resource(private[glue] val impl: YarnResource) { - override def toString: String = impl.toString - - override def equals(other: Any): Boolean = { - if (other.isInstanceOf[Resource]) { - impl.equals(other.asInstanceOf[Resource].impl) - } else { - false - } - } - - override def hashCode(): Int = { - 
impl.hashCode() - } - } - - object Resource { - def newInstance(memory: Int, vCores: Int): Resource = { - YarnResource.newInstance(memory, vCores) - } - } - - class Container(private[glue] val impl: YarnContainer) { - def getId: ContainerId = impl.getId - - def getNodeHttpAddress: String = impl.getNodeHttpAddress - - def getNodeId: NodeId = impl.getNodeId - - override def toString: String = impl.toString - - override def equals(other: Any): Boolean = { - if (other.isInstanceOf[Container]) { - impl.equals(other.asInstanceOf[Container].impl) - } else { - false - } - } - - override def hashCode(): Int = { - impl.hashCode() - } - } - - class ContainerId(private[glue] val impl: YarnContainerId) { - override def toString: String = impl.toString - - override def equals(other: Any): Boolean = { - if (other.isInstanceOf[ContainerId]) { - impl.equals(other.asInstanceOf[ContainerId].impl) - } else { - false - } - } - - override def hashCode(): Int = { - impl.hashCode() - } - } - - object ContainerId { - def fromString(worker: String): ContainerId = YarnContainerId.fromString(worker) - } - - class NodeId(private[glue] val impl: YarnNodeId) { - def getHost: String = impl.getHost - - override def toString: String = impl.toString - - override def equals(other: Any): Boolean = { - if (other.isInstanceOf[NodeId]) { - impl.equals(other.asInstanceOf[NodeId].impl) - } else { - false - } - } - - override def hashCode(): Int = { - impl.hashCode() - } - } - - class ContainerStatus(private[glue] val impl: YarnContainerStatus) { - def getDiagnostics: String = impl.getDiagnostics - - def getContainerId: ContainerId = impl.getContainerId - - def getExitStatus: Int = impl.getExitStatus - - override def toString: String = impl.toString - } - - private[glue] implicit def yarnResourceToResource(res: YarnResource): Resource = { - new Resource(res) - } - - private[glue] implicit def resourceToYarnResource(res: Resource): YarnResource = { - res.impl - } - - private[glue] implicit def 
yarnAppIdToAppId(yarn: YarnApplicationId): ApplicationId = { - new ApplicationId(yarn) - } - - private[glue] implicit def appIdToYarnAppId(app: ApplicationId): YarnApplicationId = { - app.impl - } - - private[glue] implicit def yarnReportToReport(yarn: YarnApplicationReport): ApplicationReport = { - new ApplicationReport(yarn) - } - - private[glue] implicit def reportToYarnReport(app: ApplicationReport): YarnApplicationReport = { - app.impl - } - - private[glue] implicit def yarnContainerToContainer(yarn: YarnContainer): Container = { - new Container(yarn) - } - - private[glue] implicit def containerToYarnContainer(app: Container): YarnContainer = { - app.impl - } - - private[glue] implicit def yarnContainerStatusToContainerStatus(yarn: YarnContainerStatus) - : ContainerStatus = { - new ContainerStatus(yarn) - } - - private[glue] implicit def containerStatusToYarnContainerStatus(app: ContainerStatus) - : YarnContainerStatus = { - app.impl - } - - private[glue] implicit def containerIdToYarnContainerId(app: ContainerId): YarnContainerId = { - app.impl - } - - private[glue] implicit def yarnContainerIdToContainerId(yarn: YarnContainerId): ContainerId = { - new ContainerId(yarn) - } - - private[glue] implicit def nodeIdToYarnNodeId(app: NodeId): YarnNodeId = { - app.impl - } - - private[glue] implicit def yarnNodeIdToNodeId(yarn: YarnNodeId): NodeId = { - new NodeId(yarn) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnClient.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnClient.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnClient.scala deleted file mode 100644 index db7d5d7..0000000 --- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnClient.scala +++ /dev/null @@ -1,105 +0,0 
@@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.experiments.yarn.glue - -import org.apache.hadoop.yarn.api.records.YarnApplicationState -import org.apache.hadoop.yarn.client.api - -import io.gearpump.experiments.yarn.glue.Records._ -import io.gearpump.util.LogUtil - -/** - * Adapter for api.YarnClient - */ -class YarnClient(yarn: YarnConfig) { - - val LOG = LogUtil.getLogger(getClass) - - private val client: api.YarnClient = api.YarnClient.createYarnClient - client.init(yarn.conf) - client.start() - LOG.info("Starting YarnClient...") - - def createApplication: ApplicationId = { - val app = client.createApplication() - val response = app.getNewApplicationResponse() - LOG.info("Create application, appId: " + response.getApplicationId()) - response.getApplicationId() - } - - def getApplicationReport(appId: ApplicationId): ApplicationReport = { - client.getApplicationReport(appId) - } - - def submit( - name: String, appId: ApplicationId, command: String, resource: Resource, queue: String, - packagePath: String, configPath: String): ApplicationId = { - - val appContext = Records.newAppSubmissionContext - appContext.setApplicationName(name) - appContext.setApplicationId(appId) - - val 
containerContext = ContainerLaunchContext(yarn.conf, command, packagePath, configPath) - appContext.setAMContainerSpec(containerContext) - appContext.setResource(resource) - appContext.setQueue(queue) - - LOG.info(s"Submit Application $appId to YARN...") - client.submitApplication(appContext) - } - - def awaitApplication(appId: ApplicationId, timeoutMilliseconds: Long = Long.MaxValue) - : ApplicationReport = { - import org.apache.hadoop.yarn.api.records.YarnApplicationState._ - val terminated = Set(FINISHED, KILLED, FAILED, RUNNING) - var result: ApplicationReport = null - var done = false - - val start = System.currentTimeMillis() - def timeout: Boolean = { - val now = System.currentTimeMillis() - if (now - start > timeoutMilliseconds) { - true - } else { - false - } - } - - while (!done && !timeout) { - val report = client.getApplicationReport(appId) - val status = report.getYarnApplicationState - if (terminated.contains(status)) { - done = true - result = report - } else { - Console.print(".") - Thread.sleep(1000) - } - } - - if (timeout) { - throw new Exception(s"Launch Application $appId timeout...") - } - result - } - - def stop(): Unit = { - client.stop() - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnConfig.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnConfig.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnConfig.scala deleted file mode 100644 index 87f199a..0000000 --- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnConfig.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.experiments.yarn.glue - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.yarn.conf.YarnConfiguration - -/** - * wrapper for yarn configuration - */ -case class YarnConfig(conf: YarnConfiguration = new YarnConfiguration(new Configuration(true))) { - def writeXml(out: java.io.Writer): Unit = conf.writeXml(out) - - def resourceManager: String = { - conf.get("yarn.resourcemanager.hostname") - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/package.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/package.scala b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/package.scala deleted file mode 100644 index 24adbaa..0000000 --- a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/package.scala +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.experiments.yarn - - -/** - * YARN facade to decouple Gearpump with YARN. - */ -package object glue { -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/Constants.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/Constants.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/Constants.scala new file mode 100644 index 0000000..33c3e97 --- /dev/null +++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/Constants.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.experiments.yarn + +object Constants { + val APPMASTER_NAME = "gearpump.yarn.applicationmaster.name" + val APPMASTER_COMMAND = "gearpump.yarn.applicationmaster.command" + val APPMASTER_MEMORY = "gearpump.yarn.applicationmaster.memory" + val APPMASTER_VCORES = "gearpump.yarn.applicationmaster.vcores" + val APPMASTER_QUEUE = "gearpump.yarn.applicationmaster.queue" + + val PACKAGE_PATH = "gearpump.yarn.client.package-path" + val CONFIG_PATH = "gearpump.yarn.client.config-path" + + val MASTER_COMMAND = "gearpump.yarn.master.command" + val MASTER_MEMORY = "gearpump.yarn.master.memory" + val MASTER_VCORES = "gearpump.yarn.master.vcores" + + val WORKER_COMMAND = "gearpump.yarn.worker.command" + val WORKER_CONTAINERS = "gearpump.yarn.worker.containers" + val WORKER_MEMORY = "gearpump.yarn.worker.memory" + val WORKER_VCORES = "gearpump.yarn.worker.vcores" + + val SERVICES_ENABLED = "gearpump.yarn.services.enabled" + + val LOCAL_DIRS = org.apache.hadoop.yarn.api.ApplicationConstants.Environment.LOCAL_DIRS.$$() + val CONTAINER_ID = org.apache.hadoop.yarn.api.ApplicationConstants.Environment.CONTAINER_ID.$$() + val LOG_DIR_EXPANSION_VAR = org.apache.hadoop.yarn.api.ApplicationConstants.LOG_DIR_EXPANSION_VAR + val NODEMANAGER_HOST = org.apache.hadoop.yarn.api.ApplicationConstants.Environment.NM_HOST.$$() +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/Command.scala 
---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/Command.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/Command.scala new file mode 100644 index 0000000..711506a --- /dev/null +++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/Command.scala @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.experiments.yarn.appmaster + +import com.typesafe.config.Config + +import org.apache.gearpump.cluster.main.{Master, Worker} +import org.apache.gearpump.experiments.yarn.Constants._ +import org.apache.gearpump.transport.HostPort +import org.apache.gearpump.util.Constants + +/** Command to start a YARN container */ +trait Command { + def get: String + override def toString: String = get +} + +abstract class AbstractCommand extends Command { + protected def config: Config + def version: String + def classPath: Array[String] = { + Array( + s"conf", + s"pack/$version/conf", + s"pack/$version/lib/daemon/*", + s"pack/$version/lib/*" + ) + } + + protected def buildCommand( + java: String, properties: Array[String], mainClazz: String, cliOpts: Array[String]) + : String = { + val exe = config.getString(java) + + s"$exe -cp ${classPath.mkString(":")}:" + + "$CLASSPATH " + properties.mkString(" ") + + s" $mainClazz ${cliOpts.mkString(" ")} 2>&1 | /usr/bin/tee -a ${LOG_DIR_EXPANSION_VAR}/stderr" + } + + protected def clazz(any: AnyRef): String = { + val name = any.getClass.getName + if (name.endsWith("$")) { + name.dropRight(1) + } else { + name + } + } +} + +case class MasterCommand(config: Config, version: String, masterAddr: HostPort) + extends AbstractCommand { + + def get: String = { + val masterArguments = Array(s"-ip ${masterAddr.host}", s"-port ${masterAddr.port}") + + val properties = Array( + s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=${masterAddr.host}:${masterAddr.port}", + s"-D${Constants.GEARPUMP_HOSTNAME}=${masterAddr.host}", + s"-D${Constants.GEARPUMP_MASTER_RESOURCE_MANAGER_CONTAINER_ID}=${CONTAINER_ID}", + s"-D${Constants.GEARPUMP_HOME}=${LOCAL_DIRS}/${CONTAINER_ID}/pack/$version", + s"-D${Constants.GEARPUMP_LOG_DAEMON_DIR}=${LOG_DIR_EXPANSION_VAR}", + s"-D${Constants.GEARPUMP_LOG_APPLICATION_DIR}=${LOG_DIR_EXPANSION_VAR}") + + buildCommand(MASTER_COMMAND, properties, clazz(Master), masterArguments) + } +} + +case class 
WorkerCommand(config: Config, version: String, masterAddr: HostPort, workerHost: String) + extends AbstractCommand { + + def get: String = { + val properties = Array( + s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=${masterAddr.host}:${masterAddr.port}", + s"-D${Constants.GEARPUMP_LOG_DAEMON_DIR}=${LOG_DIR_EXPANSION_VAR}", + s"-D${Constants.GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID}=${CONTAINER_ID}", + s"-D${Constants.GEARPUMP_HOME}=${LOCAL_DIRS}/${CONTAINER_ID}/pack/$version", + s"-D${Constants.GEARPUMP_LOG_APPLICATION_DIR}=${LOG_DIR_EXPANSION_VAR}", + s"-D${Constants.GEARPUMP_HOSTNAME}=$workerHost") + + buildCommand(WORKER_COMMAND, properties, clazz(Worker), Array.empty[String]) + } +} + +case class AppMasterCommand(config: Config, version: String, args: Array[String]) + extends AbstractCommand { + + override val classPath = Array( + "conf", + s"pack/$version/conf", + s"pack/$version/dashboard", + s"pack/$version/lib/*", + s"pack/$version/lib/daemon/*", + s"pack/$version/lib/services/*", + s"pack/$version/lib/yarn/*" + ) + + def get: String = { + val properties = Array( + s"-D${Constants.GEARPUMP_HOME}=${LOCAL_DIRS}/${CONTAINER_ID}/pack/$version", + s"-D${Constants.GEARPUMP_FULL_SCALA_VERSION}=$version", + s"-D${Constants.GEARPUMP_LOG_DAEMON_DIR}=${LOG_DIR_EXPANSION_VAR}", + s"-D${Constants.GEARPUMP_LOG_APPLICATION_DIR}=${LOG_DIR_EXPANSION_VAR}", + s"-D${Constants.GEARPUMP_HOSTNAME}=${NODEMANAGER_HOST}") + + val arguments = Array(s"") ++ args + + buildCommand(APPMASTER_COMMAND, properties, clazz(YarnAppMaster), + arguments) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/UIService.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/UIService.scala 
b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/UIService.scala new file mode 100644 index 0000000..1194f0b --- /dev/null +++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/UIService.scala @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.experiments.yarn.appmaster + +import akka.actor._ +import com.typesafe.config.{ConfigFactory, ConfigValueFactory} +import org.apache.gearpump.cluster.ClusterConfig +import org.apache.gearpump.services.main.Services +import org.apache.gearpump.transport.HostPort +import org.apache.gearpump.util.{ActorUtil, Constants, LogUtil} + +import scala.concurrent.Future + +trait UIFactory { + def props(masters: List[HostPort], host: String, port: Int): Props +} + +/** Wrapper of UI server */ +class UIService(masters: List[HostPort], host: String, port: Int) extends Actor { + private val LOG = LogUtil.getLogger(getClass) + + private val supervisor = ActorUtil.getFullPath(context.system, context.parent.path) + private var configFile: java.io.File = null + + private implicit val dispatcher = context.dispatcher + + override def postStop(): Unit = { + if (configFile != null) { + configFile.delete() + configFile = null + } + + // TODO: fix this + // Hack around to Kill the UI server + Services.kill() + } + + override def preStart(): Unit = { + Future(start()) + } + + def start(): Unit = { + val mastersArg = masters.mkString(",") + LOG.info(s"Launching services -master $mastersArg") + + configFile = java.io.File.createTempFile("uiserver", ".conf") + + val config = context.system.settings.config. + withValue(Constants.GEARPUMP_SERVICE_HOST, ConfigValueFactory.fromAnyRef(host)). + withValue(Constants.GEARPUMP_SERVICE_HTTP, ConfigValueFactory.fromAnyRef(port.toString)). 
+ withValue(Constants.NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(host)) + + ClusterConfig.saveConfig(config, configFile) + + val master = masters.head + + ConfigFactory.invalidateCaches() + launch(supervisor, master.host, master.port, configFile.toString) + } + + // Launch the UI server + def launch(supervisor: String, masterHost: String, masterPort: Int, configFile: String): Unit = { + Services.main(Array("-supervisor", supervisor, "-master", s"$masterHost:$masterPort" + , "-conf", configFile)) + } + + override def receive: Receive = { + case _ => + LOG.error(s"Unknown message received") + } +} + +object UIService extends UIFactory { + override def props(masters: List[HostPort], host: String, port: Int): Props = { + Props(new UIService(masters, host, port)) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala new file mode 100644 index 0000000..97577fb --- /dev/null +++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.experiments.yarn.appmaster + +import java.io.IOException +import java.util.concurrent.TimeUnit + +import akka.actor._ +import akka.util.Timeout +import com.typesafe.config.ConfigValueFactory +import org.apache.commons.httpclient.HttpClient +import org.apache.commons.httpclient.methods.GetMethod +import org.apache.gearpump.cluster.ClientToMaster._ +import org.apache.gearpump.cluster.ClusterConfig +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} +import org.apache.gearpump.experiments.yarn.Constants._ +import org.apache.gearpump.experiments.yarn.glue.Records._ +import org.apache.gearpump.experiments.yarn.glue.{NMClient, RMClient, YarnConfig} +import org.apache.gearpump.transport.HostPort +import org.apache.gearpump.util._ +import org.slf4j.Logger + +import scala.collection.JavaConverters._ +import scala.concurrent.Await +import scala.concurrent.duration.Duration + +/** + * YARN AppMaster. YARN AppMaster is responsible to start Gearpump masters, workers, UI server as + * YARN containers. + * + * NOTE: It is different with Gearpump AppMaster. Gearpump AppMaster is a sub-process of worker. 
+ */ +class YarnAppMaster(rmClient: RMClient, nmClient: NMClient, + packagePath: String, hdfsConfDir: String, + uiFactory: UIFactory) + extends Actor { + + private val LOG: Logger = LogUtil.getLogger(getClass) + private val akkaConf = context.system.settings.config + private val servicesEnabled = akkaConf.getString(SERVICES_ENABLED).toBoolean + private var uiStarted = false + private val host = akkaConf.getString(Constants.GEARPUMP_HOSTNAME) + + private val port = Util.findFreePort().get + + private val trackingURL = "http://" + host + ":" + port + + // TODO: for now, only one master is supported. + private val masterCount = 1 + private val masterMemory = akkaConf.getString(MASTER_MEMORY).toInt + private val masterVCores = akkaConf.getString(MASTER_VCORES).toInt + + private var workerCount = akkaConf.getString(WORKER_CONTAINERS).toInt + private val workerMemory = akkaConf.getString(WORKER_MEMORY).toInt + private val workerVCores = akkaConf.getString(WORKER_VCORES).toInt + + val rootPath = System.getProperty(Constants.GEARPUMP_FULL_SCALA_VERSION) + + rmClient.start(self) + nmClient.start(self) + + def receive: Receive = null + + private def registerAppMaster(): Unit = { + val target = host + ":" + port + rmClient.registerAppMaster(host, port, trackingURL) + } + + registerAppMaster + context.become(waitForAppMasterRegistered) + + import org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster._ + + def waitForAppMasterRegistered: Receive = { + case AppMasterRegistered => + LOG.info("YarnAppMaster registration completed") + requestMasterContainers(masterCount) + context.become(startingMasters(remain = masterCount, List.empty[MasterInfo])) + } + + private def startingMasters(remain: Int, masters: List[MasterInfo]): Receive = box { + case ContainersAllocated(containers) => + LOG.info(s"ContainersAllocated: containers allocated for master(remain=$remain), count: " + + containers.size) + val count = Math.min(containers.length, remain) + val newMasters = (0 until 
count).toList.map { index => + val container = containers(index) + MasterInfo(container.getId, container.getNodeId, launchMaster(container)) + } + + // Stops un-used containers + containers.drop(count).map { container => + nmClient.stopContainer(container.getId, container.getNodeId) + } + + context.become(startingMasters(remain, newMasters ++ masters)) + case ContainerStarted(containerId) => + LOG.info(s"ContainerStarted: container ${containerId} started for master(remain=$remain) ") + if (remain > 1) { + context.become(startingMasters(remain - 1, masters)) + } else { + requestWorkerContainers(workerCount) + context.become(startingWorkers(workerCount, masters, List.empty[WorkerInfo])) + } + } + + private def box(receive: Receive): Receive = { + onError orElse receive orElse unHandled + } + + private def startingWorkers(remain: Int, masters: List[MasterInfo], workers: List[WorkerInfo]) + : Receive = { + box { + case ContainersAllocated(containers) => + LOG.info(s"ContainersAllocated: containers allocated for workers(remain=$remain), " + + s"count: " + containers.size) + + val count = Math.min(containers.length, remain) + val newWorkers = (0 until count).toList.map { index => + val container = containers(index) + launchWorker(container, masters) + WorkerInfo(container.getId, container.getNodeId) + } + + // Stops un-used containers + containers.drop(count).map { container => + nmClient.stopContainer(container.getId, container.getNodeId) + } + context.become(startingWorkers(remain, masters, workers ++ newWorkers)) + case ContainerStarted(containerId) => + LOG.info(s"ContainerStarted: container $containerId started for worker(remain=$remain)") + // The last one + if (remain > 1) { + context.become(startingWorkers(remain - 1, masters, workers)) + } else { + if (servicesEnabled && !uiStarted) { + context.actorOf(uiFactory.props(masters.map(_.host), host, port)) + uiStarted = true + } + context.become(service(effectiveConfig(masters.map(_.host)), masters, workers)) + } + } 
+ } + + private def effectiveConfig(masters: List[HostPort]): Config = { + val masterList = masters.map(pair => s"${pair.host}:${pair.port}") + val config = context.system.settings.config + config.withValue(Constants.GEARPUMP_CLUSTER_MASTERS, + ConfigValueFactory.fromIterable(masterList.asJava)) + } + + private def onError: Receive = { + case ContainersCompleted(containers) => + // TODO: we should recover the failed container from this... + containers.foreach { status => + if (status.getExitStatus != 0) { + LOG.error(s"ContainersCompleted: container ${status.getContainerId}" + + s" failed with exit code ${status.getExitStatus}, msg: ${status.getDiagnostics}") + } else { + LOG.info(s"ContainersCompleted: container ${status.getContainerId} completed") + } + } + case ShutdownApplication => + LOG.error("ShutdownApplication") + nmClient.stop() + rmClient.shutdownApplication() + context.stop(self) + case ResourceManagerException(ex) => + LOG.error("ResourceManagerException: " + ex.getMessage, ex) + nmClient.stop() + rmClient.failApplication(ex) + context.stop(self) + case Kill => + LOG.info("Kill: User asked to shutdown the application") + sender ! CommandResult(success = true) + self ! ShutdownApplication + } + + private def service(config: Config, masters: List[MasterInfo], workers: List[WorkerInfo]) + : Receive = box { + case GetActiveConfig(clientHost) => + LOG.info("GetActiveConfig: Get active configuration for client: " + clientHost) + val filtered = ClusterConfig.filterOutDefaultConfig( + config.withValue(Constants.GEARPUMP_HOSTNAME, + ConfigValueFactory.fromAnyRef(clientHost))) + sender ! ActiveConfig(filtered) + case QueryVersion => + LOG.info("QueryVersion") + sender ! 
Version(Util.version) + case QueryClusterInfo => + LOG.info("QueryClusterInfo") + val masterContainers = masters.map { master => + master.id.toString + s"(${master.nodeId.toString})" + } + + val workerContainers = workers.map { worker => + worker.id.toString + s"(${worker.nodeId.toString})" + } + sender ! ClusterInfo(masterContainers, workerContainers) + case AddMaster => + sender ! CommandResult(success = false, "Not Implemented") + case RemoveMaster(masterId) => + sender ! CommandResult(success = false, "Not Implemented") + case AddWorker(count) => + if (count == 0) { + sender ! CommandResult(success = true) + } else { + LOG.info("AddWorker: Start to add new workers, count: " + count) + workerCount += count + requestWorkerContainers(count) + context.become(startingWorkers(count, masters, workers)) + sender ! CommandResult(success = true) + } + case RemoveWorker(worker) => + val workerId = ContainerId.fromString(worker) + LOG.info(s"RemoveWorker: remove worker $workerId") + val info = workers.find(_.id.toString == workerId.toString) + if (info.isDefined) { + nmClient.stopContainer(info.get.id, info.get.nodeId) + sender ! CommandResult(success = true) + val remainWorkers = workers.filter(_.id != info.get.id) + context.become(service(config, masters, remainWorkers)) + } else { + sender ! 
CommandResult(success = false, "failed to find worker " + worker) + } + } + + private def unHandled: Receive = { + case other => + LOG.info(s"Received unknown message $other") + } + + private def requestMasterContainers(masters: Int) = { + LOG.info(s"Request resource for masters($masters)") + val containers = (1 to masters).map( + i => Resource.newInstance(masterMemory, masterVCores) + ).toList + rmClient.requestContainers(containers) + } + + private def launchMaster(container: Container): HostPort = { + LOG.info(s"Launch Master on container " + container.getNodeHttpAddress) + val host = container.getNodeId.getHost + + val port = Util.findFreePort().get + + LOG.info("=============PORT" + port) + val masterCommand = MasterCommand(akkaConf, rootPath, HostPort(host, port)) + nmClient.launchCommand(container, masterCommand.get, packagePath, hdfsConfDir) + HostPort(host, port) + } + + private def requestWorkerContainers(workers: Int): Unit = { + LOG.info(s"Request resource for workers($workers)") + val containers = (1 to workers).map( + i => Resource.newInstance(workerMemory, workerVCores) + ).toList + + rmClient.requestContainers(containers) + } + + private def launchWorker(container: Container, masters: List[MasterInfo]): Unit = { + LOG.info(s"Launch Worker on container " + container.getNodeHttpAddress) + val workerHost = container.getNodeId.getHost + val workerCommand = WorkerCommand(akkaConf, rootPath, masters.head.host, workerHost) + nmClient.launchCommand(container, workerCommand.get, packagePath, hdfsConfDir) + } +} + +object YarnAppMaster extends AkkaApp with ArgumentsParser { + val LOG: Logger = LogUtil.getLogger(getClass) + + override val options: Array[(String, CLIOption[Any])] = Array( + "conf" -> CLIOption[String]("<Gearpump configuration directory on HDFS>", required = true), + "package" -> CLIOption[String]("<Gearpump package path on HDFS>", required = true) + ) + + override def akkaConfig: Config = { + ClusterConfig.ui() + } + + override def 
main(akkaConf: Config, args: Array[String]): Unit = { + implicit val timeout = Timeout(5, TimeUnit.SECONDS) + implicit val system = ActorSystem("GearpumpAM", akkaConf) + + val yarnConf = new YarnConfig() + + val confDir = parse(args).getString("conf") + val packagePath = parse(args).getString("package") + + LOG.info("HADOOP_CONF_DIR: " + System.getenv("HADOOP_CONF_DIR")) + LOG.info("YARN Resource Manager: " + yarnConf.resourceManager) + + val rmClient = new RMClient(yarnConf) + val nmClient = new NMClient(yarnConf, akkaConf) + val appMaster = system.actorOf(Props(new YarnAppMaster(rmClient, + nmClient, packagePath, confDir, UIService))) + + val daemon = system.actorOf(Props(new Daemon(appMaster))) + Await.result(system.whenTerminated, Duration.Inf) + LOG.info("YarnAppMaster is shutdown") + } + + class Daemon(appMaster: ActorRef) extends Actor { + context.watch(appMaster) + + override def receive: Actor.Receive = { + case Terminated(actor) => + if (actor.compareTo(appMaster) == 0) { + LOG.info(s"YarnAppMaster ${appMaster.path.toString} is terminated, " + + s"shutting down current ActorSystem") + context.system.terminate() + context.stop(self) + } + } + } + + case class ResourceManagerException(throwable: Throwable) + case object ShutdownApplication + case class ContainersRequest(containers: List[Resource]) + case class ContainersAllocated(containers: List[Container]) + case class ContainersCompleted(containers: List[ContainerStatus]) + case class ContainerStarted(containerId: ContainerId) + case object AppMasterRegistered + + case class GetActiveConfig(clientHost: String) + + case object QueryClusterInfo + case class ClusterInfo(masters: List[String], workers: List[String]) { + override def toString: String = { + val separator = "\n" + val masterSection = "masters: " + separator + masters.mkString("\n") + "\n" + + val workerSection = "workers: " + separator + workers.mkString("\n") + "\n" + masterSection + workerSection + } + } + + case object Kill + case class 
ActiveConfig(config: Config) + + case object QueryVersion + + case class Version(version: String) + + case class MasterInfo(id: ContainerId, nodeId: NodeId, host: HostPort) + + case class WorkerInfo(id: ContainerId, nodeId: NodeId) + + def getAppMaster(report: ApplicationReport, system: ActorSystem): ActorRef = { + val client = new HttpClient() + val appMasterPath = s"${report.getOriginalTrackingUrl}/supervisor-actor-path" + val get = new GetMethod(appMasterPath) + var status = client.executeMethod(get) + + if (status != 200) { + // Sleeps a little bit, and try again + Thread.sleep(3000) + status = client.executeMethod(get) + } + + if (status == 200) { + AkkaHelper.actorFor(system, get.getResponseBodyAsString) + } else { + throw new IOException("Fail to resolve AppMaster address, please make sure " + + s"${report.getOriginalTrackingUrl} is accessible...") + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/AppMasterResolver.scala ---------------------------------------------------------------------- diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/AppMasterResolver.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/AppMasterResolver.scala new file mode 100644 index 0000000..9fb69b2 --- /dev/null +++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/AppMasterResolver.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.experiments.yarn.client

import java.io.IOException

import akka.actor.{ActorRef, ActorSystem}
import org.apache.commons.httpclient.HttpClient
import org.apache.commons.httpclient.methods.GetMethod
import org.apache.gearpump.experiments.yarn.glue.Records.ApplicationId
import org.apache.gearpump.experiments.yarn.glue.YarnClient
import org.apache.gearpump.util.{AkkaHelper, LogUtil}

import scala.util.{Failure, Success, Try}

/**
 * Resolves the ActorRef of a YARN application's AppMaster.
 *
 * Fetches `<original tracking url>/supervisor-actor-path` of the application over HTTP and
 * turns the response body into an ActorRef in `system`, retrying until it succeeds or the
 * attempt budget is exhausted.
 */
class AppMasterResolver(yarnClient: YarnClient, system: ActorSystem) {
  val LOG = LogUtil.getLogger(getClass)
  val RETRY_INTERVAL_MS = 3000 // ms

  /**
   * Resolves the AppMaster of `appId`.
   *
   * @param appId YARN application whose AppMaster should be resolved
   * @param timeoutSeconds time budget, converted into `1 + timeoutSeconds * 1000 /
   *                       RETRY_INTERVAL_MS` attempts spaced RETRY_INTERVAL_MS apart
   * @return the AppMaster ActorRef (never null)
   * @throws IOException if no attempt succeeded
   */
  def resolve(appId: ApplicationId, timeoutSeconds: Int = 30): ActorRef = {
    retry(connect(appId), 1 + timeoutSeconds * 1000 / RETRY_INTERVAL_MS)
  }

  /**
   * Makes a single attempt to resolve the AppMaster through its tracking URL.
   *
   * @throws IOException if the endpoint does not answer with HTTP 200
   */
  private def connect(appId: ApplicationId): ActorRef = {
    val report = yarnClient.getApplicationReport(appId)
    val trackingUrl = report.getOriginalTrackingUrl
    val appMasterPath = s"$trackingUrl/supervisor-actor-path"
    LOG.info(s"appMasterPath=$appMasterPath")
    val get = new GetMethod(appMasterPath)
    try {
      val status = new HttpClient().executeMethod(get)
      if (status == 200) {
        // The response body must be read before the connection is released below.
        val response = get.getResponseBodyAsString
        LOG.info("Successfully resolved AppMaster address: " + response)
        AkkaHelper.actorFor(system, response)
      } else {
        throw new IOException("Fail to resolve AppMaster address, please make sure " +
          s"$trackingUrl is accessible...")
      }
    } finally {
      // Release the HTTP connection on every path, including failures.
      get.releaseConnection()
    }
  }

  /**
   * Evaluates `fun` up to `times` times and returns the first non-null result.
   *
   * The first attempt runs immediately; later attempts are preceded by a RETRY_INTERVAL_MS
   * sleep. Failures are logged and retried.
   *
   * @throws IOException if every attempt failed or returned null
   */
  private def retry(fun: => ActorRef, times: Int): ActorRef = {
    var attempt = 0
    var result: Option[ActorRef] = None
    while (result.isEmpty && attempt < times) {
      if (attempt > 0) {
        Thread.sleep(RETRY_INTERVAL_MS)
      }
      attempt += 1
      Try(fun) match {
        case Success(ref) =>
          // Option(...) keeps the original behaviour of retrying on a null result.
          result = Option(ref)
        case Failure(e) =>
          LOG.error(s"Failed to connect YarnAppMaster(tried $attempt)... " + e.getMessage)
      }
    }
    result.getOrElse {
      throw new IOException(s"Failed to resolve YarnAppMaster after $attempt attempt(s)")
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/Client.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/Client.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/Client.scala
new file mode 100644
index 0000000..9ec2eae
--- /dev/null
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/Client.scala
@@ -0,0 +1,61 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.gearpump.experiments.yarn.client

import org.apache.gearpump.util.LogUtil
import org.slf4j.Logger

/** Command line tool to launch a Gearpump cluster on YARN, and also to manage Gearpump cluster */
object Client {

  private val LOG: Logger = LogUtil.getLogger(getClass)
  val LAUNCH = "launch"

  /** Sub-command name -> application implementing it. */
  val commands = Map(LAUNCH -> LaunchCluster) ++
    ManageCluster.commands.map(key => (key, ManageCluster)).toMap

  /** Prints the sorted list of supported sub-commands to stderr. */
  def usage(): Unit = {
    val keys = commands.keys.toList.sorted
    // scalastyle:off println
    Console.err.println("Usage: " + "<" + keys.mkString("|") + ">")
    // scalastyle:on println
  }

  /**
   * Dispatches `args(0)` to the matching sub-command.
   *
   * `launch` receives the remaining arguments unchanged. Every other command is forwarded to
   * ManageCluster as `-<ManageCluster.COMMAND> <name>` followed by the remaining arguments.
   * Prints usage when no argument is given or the command is unknown.
   */
  def main(args: Array[String]): Unit = {
    args.headOption match {
      case None =>
        usage()
      case Some(key) =>
        val remainArgs = args.drop(1)
        commands.get(key) match {
          case Some(app) if key == LAUNCH =>
            app.main(remainArgs)
          case Some(app) =>
            app.main(Array("-" + ManageCluster.COMMAND, key) ++ remainArgs)
          case None =>
            usage()
        }
    }
  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/LaunchCluster.scala
----------------------------------------------------------------------
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/LaunchCluster.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/LaunchCluster.scala
new file mode 100644
index 0000000..2475728
--- /dev/null
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/LaunchCluster.scala
@@ -0,0 +1,220 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.gearpump.experiments.yarn.client

import java.io.{File, IOException, OutputStreamWriter}
import java.net.InetAddress
import java.util.zip.ZipInputStream

import akka.actor.ActorSystem
import com.typesafe.config.{Config, ConfigValueFactory}
import org.apache.gearpump.cluster.ClusterConfig
import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
import org.apache.gearpump.experiments.yarn.Constants
import org.apache.gearpump.experiments.yarn.appmaster.AppMasterCommand
import org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, GetActiveConfig}
import org.apache.gearpump.experiments.yarn.glue.Records.{ApplicationId, Resource}
import org.apache.gearpump.experiments.yarn.glue.{FileSystem, YarnClient, YarnConfig}
import org.apache.gearpump.util.ActorUtil.askActor
import org.apache.gearpump.util.{AkkaApp, LogUtil, Util}
import org.slf4j.Logger

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

/**
 * Launches Gearpump on YARN.
 *
 * Queue, memory and vcores for the AppMaster container are read from `akka`
 * (APPMASTER_QUEUE / APPMASTER_MEMORY / APPMASTER_VCORES).
 */
class LaunchCluster(
    akka: Config,
    yarnConf: YarnConfig,
    yarnClient: YarnClient,
    fs: FileSystem,
    actorSystem: ActorSystem,
    appMasterResolver: AppMasterResolver,
    version: String = Util.version) {

  import org.apache.gearpump.experiments.yarn.Constants._
  private implicit val dispatcher = actorSystem.dispatcher

  private val LOG: Logger = LogUtil.getLogger(getClass)
  private val host = InetAddress.getLocalHost.getHostName
  private val queue = akka.getString(APPMASTER_QUEUE)
  private val memory = akka.getString(APPMASTER_MEMORY).toInt
  private val vcore = akka.getString(APPMASTER_VCORES).toInt

  /**
   * Submits a YARN application that runs the Gearpump AppMaster from `packagePath`.
   *
   * Steps:
   *  - Checks that the package is a `.zip` that exists on `fs`.
   *  - Checks that the package's root directory name contains `version`.
   *  - Uploads the client configuration.
   *  - Submits the application.
   *  - Waits via `yarnClient.awaitApplication` (timeout LaunchCluster.TIMEOUT_MILLISECONDS).
   *
   * @param appName application name shown in YARN
   * @param packagePath path of the Gearpump zip distribution on `fs`
   * @return the id of the submitted application
   * @throws IOException if the package is not a zip, is missing, or its version mismatches
   */
  def submit(appName: String, packagePath: String): ApplicationId = {
    LOG.info("Starting AM")

    // First step, check the package, to make sure local version matches remote version
    if (!packagePath.endsWith(".zip")) {
      throw new IOException(s"YarnClient only support .zip distribution package," +
        s" now it is ${packagePath}. Please download the zip " +
        "package from website or use sbt assembly packArchiveZip to build one.")
    }

    if (!fs.exists(packagePath)) {
      throw new IOException(s"Cannot find package ${packagePath} on HDFS ${fs.name}. ")
    }

    val rootEntry = rootEntryPath(zip = packagePath)

    if (!rootEntry.contains(version)) {
      throw new IOException(s"Check version failed! Local gearpump binary" +
        s" version $version doesn't match with remote path $packagePath")
    }

    val resource = Resource.newInstance(memory, vcore)
    val appId = yarnClient.createApplication

    // uploads the configs to HDFS home directory of current user.
    val configPath = uploadConfigToHDFS(appId)

    val command = AppMasterCommand(akka, rootEntry, Array(s"-conf $configPath",
      s"-package $packagePath"))

    yarnClient.submit(appName, appId, command.get, resource, queue, packagePath, configPath)

    LOG.info("Waiting application to finish...")
    val report = yarnClient.awaitApplication(appId, LaunchCluster.TIMEOUT_MILLISECONDS)
    LOG.info(s"Application $appId finished with state ${report.getYarnApplicationState} " +
      s"at ${report.getFinishTime}, info: ${report.getDiagnostics}")

    // scalastyle:off println
    Console.println("================================================")
    Console.println("==Application Id: " + appId)
    // scalastyle:on println
    appId
  }

  /**
   * Asks the AppMaster of `appId` for its active configuration and writes it to `output`.
   *
   * Resolving the AppMaster happens synchronously; the ask and the file write happen in the
   * returned Future.
   *
   * @return a Future completed with the written file
   */
  def saveConfig(appId: ApplicationId, output: String): Future[File] = {
    LOG.info(s"Trying to download active configuration to output path: " + output)
    LOG.info(s"Resolving YarnAppMaster ActorRef for application " + appId)
    val appMaster = appMasterResolver.resolve(appId)
    LOG.info(s"appMaster=${appMaster.path} host=$host")
    val future = askActor[ActiveConfig](appMaster, GetActiveConfig(host)).map(_.config)
    future.map { config =>
      val out = new File(output)
      ClusterConfig.saveConfig(config, out)
      out
    }
  }

  /**
   * Writes these files into `<home>/.gearpump_<appId>/conf/` on `fs`:
   *  - gear.conf: `akka` filtered by ClusterConfig.filterOutDefaultConfig
   *  - yarn-site.xml: `yarnConf`
   *  - log4j.properties: from LogUtil.loadConfiguration
   *
   * @return the remote configuration directory, ending with '/'
   */
  private def uploadConfigToHDFS(appId: ApplicationId): String = {
    // Uses personal home directory so that it will not conflict with other users
    // conf path pattern: /user/<userid>/.gearpump_application_<timestamp>_<id>/conf
    val confDir = s"${fs.getHomeDirectory}/.gearpump_${appId}/conf/"
    LOG.info(s"Uploading configuration files to remote HDFS(under $confDir)...")

    // confDir already ends with '/', so file names are appended directly (no "//").
    val cleanedConfig = ClusterConfig.filterOutDefaultConfig(akka)
    writeRemote(s"${confDir}gear.conf")(writer => writer.write(cleanedConfig.root().render()))

    // Saves yarn-site.xml to remote
    writeRemote(s"${confDir}yarn-site.xml")(writer => yarnConf.writeXml(writer))

    // Saves log4j.properties to remote
    val log4j = LogUtil.loadConfiguration
    writeRemote(s"${confDir}log4j.properties")(writer => log4j.store(writer, "gearpump on yarn"))
    confDir
  }

  /** Creates `path` on `fs`, lets `write` fill it, and closes the writer even on failure. */
  private def writeRemote(path: String)(write: OutputStreamWriter => Unit): Unit = {
    val writer = new OutputStreamWriter(fs.create(path))
    try {
      write(writer)
    } finally {
      writer.close()
    }
  }

  /**
   * Returns the name of the top-level directory of the zip at `zip`.
   *
   * The name is taken from the text before the first '/' of the first entry.
   *
   * @throws IOException if the archive has no entries or the first entry has no directory part
   */
  private def rootEntryPath(zip: String): String = {
    val stream = new ZipInputStream(fs.open(zip))
    try {
      // getNextEntry returns null for an empty archive or a non-zip stream.
      val entry = Option(stream.getNextEntry()).getOrElse {
        throw new IOException(s"Package $zip is empty or is not a valid zip file")
      }
      val name = entry.getName
      val separator = name.indexOf("/")
      if (separator < 0) {
        throw new IOException(
          s"Cannot find the root directory of package $zip (first entry: $name)")
      }
      name.substring(0, separator)
    } finally {
      stream.close()
    }
  }
}

object LaunchCluster extends AkkaApp with ArgumentsParser {

  val PACKAGE = "package"
  val NAME = "name"
  val VERBOSE = "verbose"
  val OUTPUT = "output"

  override protected def akkaConfig: Config = {
    ClusterConfig.default()
  }

  override val options: Array[(String, CLIOption[Any])] = Array(
    PACKAGE -> CLIOption[String]("<Please specify the gearpump.zip package path on HDFS. " +
      "If not specified, we will use default value /user/gearpump/gearpump.zip>", required = false),
    NAME -> CLIOption[String]("<Application name showed in YARN>", required = false,
      defaultValue = Some("Gearpump")),
    VERBOSE -> CLIOption("<print verbose log on console>", required = false,
      defaultValue = Some(false)),
    OUTPUT -> CLIOption("<output path for configuration file>", required = false,
      defaultValue = None)
  )
  private val TIMEOUT_MILLISECONDS = 30 * 1000

  /**
   * Entry point: submits the cluster and, if `-output` is given, saves its active config.
   *
   * The YarnClient and ActorSystem are always released, including when submission fails.
   */
  override def main(inputAkkaConf: Config, args: Array[String]): Unit = {
    val parsed = parse(args)
    if (parsed.getBoolean(VERBOSE)) {
      LogUtil.verboseLogToConsole()
    }

    val yarnConfig = new YarnConfig()
    val fs = new FileSystem(yarnConfig)
    val yarnClient = new YarnClient(yarnConfig)
    val akkaConf = updateConf(inputAkkaConf, parsed)
    val actorSystem = ActorSystem("launchCluster", akkaConf)

    try {
      val appMasterResolver = new AppMasterResolver(yarnClient, actorSystem)
      val client = new LaunchCluster(akkaConf, yarnConfig, yarnClient, fs,
        actorSystem, appMasterResolver)

      val name = parsed.getString(NAME)
      val appId = client.submit(name, akkaConf.getString(Constants.PACKAGE_PATH))

      if (parsed.exists(OUTPUT)) {
        import scala.concurrent.duration._
        Await.result(client.saveConfig(appId, parsed.getString(OUTPUT)),
          TIMEOUT_MILLISECONDS.milliseconds)
      }
    } finally {
      // Release resources on every path; a running ActorSystem may otherwise keep the JVM alive.
      yarnClient.stop()
      actorSystem.terminate()
      Await.result(actorSystem.whenTerminated, Duration.Inf)
    }
  }

  /** Overrides Constants.PACKAGE_PATH in `akka` with `-package` when that option is present. */
  private def updateConf(akka: Config, parsed: ParseResult): Config = {
    if (parsed.exists(PACKAGE)) {
      akka.withValue(Constants.PACKAGE_PATH,
        ConfigValueFactory.fromAnyRef(parsed.getString(PACKAGE)))
    } else {
      akka
    }
  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/ManageCluster.scala
----------------------------------------------------------------------
diff --git
a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/ManageCluster.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/ManageCluster.scala
new file mode 100644
index 0000000..5b12346
--- /dev/null
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/ManageCluster.scala
@@ -0,0 +1,156 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.experiments.yarn.client

import java.io.{File, IOException}
import java.net.InetAddress
import scala.concurrent.{Await, Future}

import akka.actor.{ActorRef, ActorSystem}

import org.apache.gearpump.cluster.ClientToMaster.{AddWorker, CommandResult, RemoveWorker}
import org.apache.gearpump.cluster.ClusterConfig
import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
import org.apache.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, ClusterInfo, GetActiveConfig, Kill, QueryClusterInfo, QueryVersion, Version}
import org.apache.gearpump.experiments.yarn.glue.Records.ApplicationId
import org.apache.gearpump.experiments.yarn.glue.{YarnClient, YarnConfig}
import org.apache.gearpump.util.ActorUtil.askActor
import org.apache.gearpump.util.{AkkaApp, LogUtil}

/** Manage current Gearpump cluster on YARN by sending requests to its AppMaster actor. */
class ManageCluster(appId: ApplicationId, appMaster: ActorRef, system: ActorSystem) {
  import org.apache.gearpump.experiments.yarn.client.ManageCluster._

  private val host = InetAddress.getLocalHost.getHostName
  implicit val dispatcher = system.dispatcher

  /** Asks the AppMaster for the configuration active for this host. */
  def getConfig: Future[ActiveConfig] = askActor[ActiveConfig](appMaster, GetActiveConfig(host))

  /** Asks the AppMaster for its version. */
  def version: Future[Version] = askActor[Version](appMaster, QueryVersion)

  /** Requests `count` additional workers. */
  def addWorker(count: Int): Future[CommandResult] = {
    askActor[CommandResult](appMaster, AddWorker(count))
  }

  /** Requests removal of the worker running in container `worker`. */
  def removeWorker(worker: String): Future[CommandResult] = {
    askActor[CommandResult](appMaster, RemoveWorker(worker))
  }

  /** Sends Kill to the AppMaster. */
  def shutdown: Future[CommandResult] = askActor[CommandResult](appMaster, Kill)

  /** Asks the AppMaster for cluster information. */
  def queryClusterInfo: Future[ClusterInfo] = askActor[ClusterInfo](appMaster, QueryClusterInfo)

  /**
   * Executes one of the companion's `commands`.
   *
   * @param command command name, e.g. GET_CONFIG or ADD_WORKER
   * @param parsed parsed CLI options supplying OUTPUT / COUNT / CONTAINER
   * @return the AppMaster's reply
   * @throws IOException synchronously when a required option is missing or the command is
   *                     unknown
   */
  def command(command: String, parsed: ParseResult): Future[AnyRef] = {
    command match {
      case GET_CONFIG =>
        // OUTPUT is declared with a default of "" in the companion's options, so an empty
        // value is treated like a missing option instead of writing to File("").
        val output = if (parsed.exists(OUTPUT)) Option(parsed.getString(OUTPUT)) else None
        output.filter(_.nonEmpty) match {
          case Some(path) =>
            getConfig.map { conf =>
              ClusterConfig.saveConfig(conf.config, new File(path))
              conf
            }
          case None =>
            throw new IOException(s"Please specify -$OUTPUT option")
        }
      case ADD_WORKER =>
        val count = parsed.getString(COUNT).toInt
        addWorker(count)
      case REMOVE_WORKER =>
        val containerId = parsed.getString(CONTAINER)
        if (containerId == null || containerId.isEmpty) {
          throw new IOException(s"Please specify -$CONTAINER option")
        } else {
          removeWorker(containerId)
        }
      case KILL =>
        shutdown
      case QUERY =>
        queryClusterInfo
      case VERSION =>
        version
      case unknown =>
        // Previously surfaced as an opaque scala.MatchError.
        throw new IOException(
          s"Unknown command: $unknown, expected one of ${commands.mkString("|")}")
    }
  }
}

object ManageCluster extends AkkaApp with ArgumentsParser {
  val GET_CONFIG = "getconfig"
  val ADD_WORKER = "addworker"
  val REMOVE_WORKER = "removeworker"
  val KILL = "kill"
  val VERSION = "version"
  val QUERY = "query"
  val COMMAND = "command"
  val CONTAINER = "container"
  val OUTPUT = "output"
  val COUNT = "count"
  val APPID = "appid"
  val VERBOSE = "verbose"

  val commands = List(GET_CONFIG, ADD_WORKER, REMOVE_WORKER, KILL, VERSION, QUERY)

  import scala.concurrent.duration._
  val TIME_OUT_SECONDS = 30.seconds

  override val options: Array[(String, CLIOption[Any])] = Array(
    COMMAND -> CLIOption[String](s"<${commands.mkString("|")}>", required = true),
    APPID -> CLIOption[String]("<Application id, format: application_timestamp_id>",
      required = true),
    COUNT -> CLIOption("<how many instance to add or remove>", required = false,
      defaultValue = Some(1)),
    VERBOSE -> CLIOption("<print verbose log on console>", required = false,
      defaultValue = Some(false)),
    OUTPUT -> CLIOption("<output path for configuration file>", required = false,
      defaultValue = Some("")),
    CONTAINER -> CLIOption("<container id for master or worker>", required = false,
      defaultValue = Some(""))
  )

  /**
   * Entry point: resolves the AppMaster, runs the requested command and prints its reply.
   *
   * The ActorSystem is terminated and the YarnClient stopped on every path, including when
   * resolution or the command fails.
   */
  override def main(akkaConf: Config, args: Array[String]): Unit = {

    val yarnConfig = new YarnConfig()
    val yarnClient = new YarnClient(yarnConfig)

    val parsed = parse(args)

    if (parsed.getBoolean(VERBOSE)) {
      LogUtil.verboseLogToConsole()
    }

    val appId = parseAppId(parsed.getString(APPID))
    val system = ActorSystem("manageCluster", akkaConf)

    try {
      val appMasterResolver = new AppMasterResolver(yarnClient, system)
      val appMaster = appMasterResolver.resolve(appId)

      val manager = new ManageCluster(appId, appMaster, system)

      val command = parsed.getString(COMMAND)
      val result = manager.command(command, parsed)

      // scalastyle:off println
      Console.println(Await.result(result, TIME_OUT_SECONDS))
      // scalastyle:on println
    } finally {
      yarnClient.stop()
      system.terminate()
      Await.result(system.whenTerminated, Duration.Inf)
    }
  }

  /**
   * Parses an application id of the form `application_<timestamp>_<id>`.
   *
   * Only the second (timestamp) and third (id) underscore-separated fields are used; the prefix
   * and any further fields are ignored.
   *
   * @throws IllegalArgumentException if `str` has fewer than three fields or they are not numeric
   */
  def parseAppId(str: String): ApplicationId = {
    val invalid = s"Invalid application id: $str, expected format: application_timestamp_id"
    val parts = Option(str).map(_.split("_")).getOrElse(Array.empty[String])
    if (parts.length < 3) {
      throw new IllegalArgumentException(invalid)
    }
    val (timestamp, id) =
      try {
        (parts(1).toLong, parts(2).toInt)
      } catch {
        case e: NumberFormatException => throw new IllegalArgumentException(invalid, e)
      }
    ApplicationId.newInstance(timestamp, id)
  }
}
\ No newline at end of file
