http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala b/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala deleted file mode 100644 index 0568641..0000000 --- a/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala +++ /dev/null @@ -1,580 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.cluster.worker - -import java.io.File -import java.lang.management.ManagementFactory -import java.net.URL -import java.util.concurrent.{Executors, TimeUnit} -import scala.concurrent.duration._ -import scala.concurrent.{ExecutionContext, Future, Promise} -import scala.util.{Failure, Success, Try} - -import akka.actor.SupervisorStrategy.Stop -import akka.actor._ -import com.typesafe.config.{ConfigValueFactory, Config, ConfigFactory} -import org.slf4j.Logger - -import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData} -import io.gearpump.cluster.AppMasterToWorker._ -import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig} -import io.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, WorkerConfig} -import io.gearpump.cluster.MasterToWorker._ -import io.gearpump.cluster.WorkerToAppMaster._ -import io.gearpump.cluster.WorkerToMaster._ -import io.gearpump.cluster.master.Master.MasterInfo -import io.gearpump.cluster.scheduler.Resource -import io.gearpump.cluster.worker.Worker.ExecutorWatcher -import io.gearpump.cluster.{ClusterConfig, ExecutorJVMConfig} -import io.gearpump.jarstore.JarStoreService -import io.gearpump.metrics.Metrics.ReportMetrics -import io.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService} -import io.gearpump.util.ActorSystemBooter.Daemon -import io.gearpump.util.Constants._ -import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig -import io.gearpump.util.{TimeOutScheduler, _} - -/** - * Worker is used to track the resource on single machine, it is like - * the node manager of YARN. 
- * - * @param masterProxy masterProxy is used to resolve the master - */ -private[cluster] class Worker(masterProxy: ActorRef) extends Actor with TimeOutScheduler { - private val systemConfig: Config = context.system.settings.config - - private val address = ActorUtil.getFullPath(context.system, self.path) - private var resource = Resource.empty - private var allocatedResources = Map[ActorRef, Resource]() - private var executorsInfo = Map[ActorRef, ExecutorSlots]() - private var id: WorkerId = WorkerId.unspecified - private val createdTime = System.currentTimeMillis() - private var masterInfo: MasterInfo = null - private var executorNameToActor = Map.empty[String, ActorRef] - private val executorProcLauncher: ExecutorProcessLauncher = getExecutorProcLauncher() - private val jarStoreService = JarStoreService.get(systemConfig) - jarStoreService.init(systemConfig, context.system) - - private val ioPool = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool()) - private val resourceUpdateTimeoutMs = 30000 // Milliseconds - - private var totalSlots: Int = 0 - - val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED) - var historyMetricsService: Option[ActorRef] = None - - override def receive: Receive = null - var LOG: Logger = LogUtil.getLogger(getClass) - - def service: Receive = - appMasterMsgHandler orElse - clientMessageHandler orElse - metricsService orElse - terminationWatch(masterInfo.master) orElse - ActorUtil.defaultMsgHandler(self) - - def metricsService: Receive = { - case query: QueryHistoryMetrics => - if (historyMetricsService.isEmpty) { - // Returns empty metrics so that we don't hang the UI - sender ! 
HistoryMetrics(query.path, List.empty[HistoryMetricsItem]) - } else { - historyMetricsService.get forward query - } - } - - private var metricsInitialized = false - - val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig) - - private def initializeMetrics(): Unit = { - // Registers jvm metrics - val metricsSetName = "worker" + WorkerId.render(id) - Metrics(context.system).register(new JvmMetricsSet(metricsSetName)) - - historyMetricsService = if (metricsEnabled) { - val historyMetricsService = { - context.actorOf(Props(new HistoryMetricsService(metricsSetName, getHistoryMetricsConfig))) - } - - val metricsReportService = context.actorOf(Props( - new MetricsReporterService(Metrics(context.system)))) - historyMetricsService.tell(ReportMetrics, metricsReportService) - Some(historyMetricsService) - } else { - None - } - } - - def waitForMasterConfirm(timeoutTicker: Cancellable): Receive = { - - // If master get disconnected, the WorkerRegistered may be triggered multiple times. - case WorkerRegistered(id, masterInfo) => - this.id = id - - // Adds the flag check, so that we don't re-initialize the metrics when worker re-register - // itself. - if (!metricsInitialized) { - initializeMetrics() - metricsInitialized = true - } - - this.masterInfo = masterInfo - timeoutTicker.cancel() - context.watch(masterInfo.master) - this.LOG = LogUtil.getLogger(getClass, worker = id) - LOG.info(s"Worker is registered. 
" + - s"actor path: ${ActorUtil.getFullPath(context.system, self.path)} ....") - sendMsgWithTimeOutCallBack(masterInfo.master, ResourceUpdate(self, id, resource), - resourceUpdateTimeoutMs, updateResourceTimeOut()) - context.become(service) - } - - private def updateResourceTimeOut(): Unit = { - LOG.error(s"Update worker resource time out") - } - - def appMasterMsgHandler: Receive = { - case shutdown@ShutdownExecutor(appId, executorId, reason: String) => - val actorName = ActorUtil.actorNameForExecutor(appId, executorId) - val executorToStop = executorNameToActor.get(actorName) - if (executorToStop.isDefined) { - LOG.info(s"Shutdown executor ${actorName}(${executorToStop.get.path.toString}) " + - s"due to: $reason") - executorToStop.get.forward(shutdown) - } else { - LOG.error(s"Cannot find executor $actorName, ignore this message") - sender ! ShutdownExecutorFailed(s"Can not find executor $executorId for app $appId") - } - case launch: LaunchExecutor => - LOG.info(s"$launch") - if (resource < launch.resource) { - sender ! 
ExecutorLaunchRejected("There is no free resource on this machine") - } else { - val actorName = ActorUtil.actorNameForExecutor(launch.appId, launch.executorId) - - val executor = context.actorOf(Props(classOf[ExecutorWatcher], launch, masterInfo, ioPool, - jarStoreService, executorProcLauncher)) - executorNameToActor += actorName -> executor - - resource = resource - launch.resource - allocatedResources = allocatedResources + (executor -> launch.resource) - - reportResourceToMaster() - executorsInfo += executor -> - ExecutorSlots(launch.appId, launch.executorId, launch.resource.slots) - context.watch(executor) - } - case UpdateResourceFailed(reason, ex) => - LOG.error(reason) - context.stop(self) - case UpdateResourceSucceed => - LOG.info(s"Update resource succeed") - case GetWorkerData(workerId) => - val aliveFor = System.currentTimeMillis() - createdTime - val logDir = LogUtil.daemonLogDir(systemConfig).getAbsolutePath - val userDir = System.getProperty("user.dir") - sender ! WorkerData(WorkerSummary( - id, "active", - address, - aliveFor, - logDir, - executorsInfo.values.toArray, - totalSlots, - resource.slots, - userDir, - jvmName = ManagementFactory.getRuntimeMXBean().getName(), - resourceManagerContainerId = systemConfig.getString( - GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID), - historyMetricsConfig = getHistoryMetricsConfig) - ) - case ChangeExecutorResource(appId, executorId, usedResource) => - for (executor <- executorActorRef(appId, executorId); - allocatedResource <- allocatedResources.get(executor)) { - - allocatedResources += executor -> usedResource - resource = resource + allocatedResource - usedResource - reportResourceToMaster() - - if (usedResource == Resource(0)) { - allocatedResources -= executor - // stop executor if there is no resource binded to it. - LOG.info(s"Shutdown executor $executorId because the resource used is zero") - executor ! 
ShutdownExecutor(appId, executorId, - "Shutdown executor because the resource used is zero") - } - } - } - - private def reportResourceToMaster(): Unit = { - sendMsgWithTimeOutCallBack(masterInfo.master, - ResourceUpdate(self, id, resource), resourceUpdateTimeoutMs, updateResourceTimeOut()) - } - - private def executorActorRef(appId: Int, executorId: Int): Option[ActorRef] = { - val actorName = ActorUtil.actorNameForExecutor(appId, executorId) - executorNameToActor.get(actorName) - } - - def clientMessageHandler: Receive = { - case QueryWorkerConfig(workerId) => - if (this.id == workerId) { - sender ! WorkerConfig(ClusterConfig.filterOutDefaultConfig(systemConfig)) - } else { - sender ! WorkerConfig(ConfigFactory.empty) - } - } - - private def retryRegisterWorker(workerId: WorkerId, timeOutSeconds: Int): Cancellable = { - repeatActionUtil( - seconds = timeOutSeconds, - action = () => { - masterProxy ! RegisterWorker(workerId) - }, - onTimeout = () => { - LOG.error(s"Failed to register the worker $workerId after retrying for $timeOutSeconds " + - s"seconds, abort and kill the worker...") - self ! PoisonPill - }) - } - - def terminationWatch(master: ActorRef): Receive = { - case Terminated(actor) => - if (actor.compareTo(master) == 0) { - // Parent master is down, no point to keep worker anymore. 
Let's make suicide to free - // resources - LOG.info(s"Master cannot be contacted, find a new master ...") - context.become(waitForMasterConfirm(retryRegisterWorker(id, timeOutSeconds = 30))) - } else if (ActorUtil.isChildActorPath(self, actor)) { - // One executor is down, - LOG.info(s"Executor is down ${getExecutorName(actor)}") - - val allocated = allocatedResources.get(actor) - if (allocated.isDefined) { - resource = resource + allocated.get - executorsInfo -= actor - allocatedResources = allocatedResources - actor - sendMsgWithTimeOutCallBack(master, ResourceUpdate(self, id, resource), - resourceUpdateTimeoutMs, updateResourceTimeOut()) - } - } - } - - private def getExecutorName(actorRef: ActorRef): Option[String] = { - executorNameToActor.find(_._2 == actorRef).map(_._1) - } - - private def getExecutorProcLauncher(): ExecutorProcessLauncher = { - val launcherClazz = Class.forName( - systemConfig.getString(GEARPUMP_EXECUTOR_PROCESS_LAUNCHER)) - launcherClazz.getConstructor(classOf[Config]).newInstance(systemConfig) - .asInstanceOf[ExecutorProcessLauncher] - } - - import context.dispatcher - override def preStart(): Unit = { - LOG.info(s"RegisterNewWorker") - totalSlots = systemConfig.getInt(GEARPUMP_WORKER_SLOTS) - this.resource = Resource(totalSlots) - masterProxy ! RegisterNewWorker - context.become(waitForMasterConfirm(registerTimeoutTicker(seconds = 30))) - } - - private def registerTimeoutTicker(seconds: Int): Cancellable = { - repeatActionUtil(seconds, () => Unit, () => { - LOG.error(s"Failed to register new worker to Master after waiting for $seconds seconds, " + - s"abort and kill the worker...") - self ! 
PoisonPill - }) - } - - private def repeatActionUtil(seconds: Int, action: () => Unit, onTimeout: () => Unit) - : Cancellable = { - val cancelTimeout = context.system.scheduler.schedule(Duration.Zero, - Duration(2, TimeUnit.SECONDS))(action()) - val cancelSuicide = context.system.scheduler.scheduleOnce(seconds.seconds)(onTimeout()) - new Cancellable { - def cancel(): Boolean = { - val result1 = cancelTimeout.cancel() - val result2 = cancelSuicide.cancel() - result1 && result2 - } - - def isCancelled: Boolean = { - cancelTimeout.isCancelled && cancelSuicide.isCancelled - } - } - } - - override def postStop(): Unit = { - LOG.info(s"Worker is going down....") - ioPool.shutdown() - context.system.terminate() - } -} - -private[cluster] object Worker { - - case class ExecutorResult(result: Try[Int]) - - class ExecutorWatcher( - launch: LaunchExecutor, - masterInfo: MasterInfo, - ioPool: ExecutionContext, - jarStoreService: JarStoreService, - procLauncher: ExecutorProcessLauncher) extends Actor { - import launch.{appId, executorId, resource} - - private val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = executorId) - - val executorConfig: Config = { - val workerConfig = context.system.settings.config - - val submissionConfig = Option(launch.executorJvmConfig).flatMap { jvmConfig => - Option(jvmConfig.executorAkkaConfig) - }.getOrElse(ConfigFactory.empty()) - - resolveExecutorConfig(workerConfig, submissionConfig) - } - - // For some config, worker has priority, for others, user Application submission config - // have priorities. 
- private def resolveExecutorConfig(workerConfig: Config, submissionConfig: Config): Config = { - val config = submissionConfig.withoutPath(GEARPUMP_HOSTNAME) - .withoutPath(GEARPUMP_CLUSTER_MASTERS) - .withoutPath(GEARPUMP_HOME) - .withoutPath(GEARPUMP_LOG_DAEMON_DIR) - .withoutPath(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS) - // Falls back to workerConfig - .withFallback(workerConfig) - - // Minimum supported akka.scheduler.tick-duration on Windows is 10ms - val duration = config.getInt(AKKA_SCHEDULER_TICK_DURATION) - val updatedConf = if (akka.util.Helpers.isWindows && duration < 10) { - LOG.warn(s"$AKKA_SCHEDULER_TICK_DURATION on Windows must be larger than 10ms, set to 10ms") - config.withValue(AKKA_SCHEDULER_TICK_DURATION, ConfigValueFactory.fromAnyRef(10)) - } else { - config - } - - // Excludes reference.conf, and JVM properties.. - ClusterConfig.filterOutDefaultConfig(updatedConf) - } - - implicit val executorService = ioPool - - private val executorHandler = { - val ctx = launch.executorJvmConfig - - if (executorConfig.getBoolean(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS)) { - new ExecutorHandler { - val exitPromise = Promise[Int]() - val app = context.actorOf(Props(new InJvmExecutor(launch, exitPromise))) - - override def destroy(): Unit = { - context.stop(app) - } - override def exitValue: Future[Int] = { - exitPromise.future - } - } - } else { - createProcess(ctx) - } - } - - private def createProcess(ctx: ExecutorJVMConfig): ExecutorHandler = { - - val process = Future { - val jarPath = ctx.jar.map { appJar => - val tempFile = File.createTempFile(appJar.name, ".jar") - jarStoreService.copyToLocalFile(tempFile, appJar.filePath) - val file = new URL("file:" + tempFile) - file.getFile - } - - val configFile = { - val configFile = File.createTempFile("gearpump", ".conf") - ClusterConfig.saveConfig(executorConfig, configFile) - val file = new URL("file:" + configFile) - file.getFile - } - - val classPath = 
filterOutDaemonLib(Util.getCurrentClassPath) ++ - ctx.classPath.map(path => expandEnviroment(path)) ++ - jarPath.map(Array(_)).getOrElse(Array.empty[String]) - - val appLogDir = executorConfig.getString(GEARPUMP_LOG_APPLICATION_DIR) - val logArgs = List( - s"-D${GEARPUMP_APPLICATION_ID}=${launch.appId}", - s"-D${GEARPUMP_EXECUTOR_ID}=${launch.executorId}", - s"-D${GEARPUMP_MASTER_STARTTIME}=${getFormatedTime(masterInfo.startTime)}", - s"-D${GEARPUMP_LOG_APPLICATION_DIR}=${appLogDir}") - val configArgs = List(s"-D${GEARPUMP_CUSTOM_CONFIG_FILE}=$configFile") - - val username = List(s"-D${GEARPUMP_USERNAME}=${ctx.username}") - - // Remote debug executor process - val remoteDebugFlag = executorConfig.getBoolean(GEARPUMP_REMOTE_DEBUG_EXECUTOR_JVM) - val remoteDebugConfig = if (remoteDebugFlag) { - val availablePort = Util.findFreePort().get - List( - "-Xdebug", - s"-Xrunjdwp:server=y,transport=dt_socket,address=${availablePort},suspend=n", - s"-D${GEARPUMP_REMOTE_DEBUG_PORT}=$availablePort" - ) - } else { - List.empty[String] - } - - val verboseGCFlag = executorConfig.getBoolean(GEARPUMP_VERBOSE_GC) - val verboseGCConfig = if (verboseGCFlag) { - List( - s"-Xloggc:${appLogDir}/gc-app${launch.appId}-executor-${launch.executorId}.log", - "-verbose:gc", - "-XX:+PrintGCDetails", - "-XX:+PrintGCDateStamps", - "-XX:+PrintTenuringDistribution", - "-XX:+PrintGCApplicationConcurrentTime", - "-XX:+PrintGCApplicationStoppedTime" - ) - } else { - List.empty[String] - } - - val ipv4 = List(s"-D${PREFER_IPV4}=true") - - val options = ctx.jvmArguments ++ username ++ - logArgs ++ remoteDebugConfig ++ verboseGCConfig ++ ipv4 ++ configArgs - - LOG.info(s"Launch executor, classpath: ${classPath.mkString(File.pathSeparator)}") - val process = procLauncher.createProcess(appId, executorId, resource, executorConfig, - options, classPath, ctx.mainClass, ctx.arguments) - - ProcessInfo(process, jarPath, configFile) - } - - new ExecutorHandler { - - var destroyed = false - - override def 
destroy(): Unit = { - LOG.info(s"Destroy executor process ${ctx.mainClass}") - if (!destroyed) { - destroyed = true - process.foreach { info => - info.process.destroy() - info.jarPath.foreach(new File(_).delete()) - new File(info.configFile).delete() - } - } - } - - override def exitValue: Future[Int] = { - process.flatMap { info => - val exit = info.process.exitValue() - if (exit == 0) { - Future.successful(0) - } else { - Future.failed[Int](new Exception(s"Executor exit with failure, exit value: $exit, " + - s"error summary: ${info.process.logger.error}")) - } - } - } - } - } - - private def expandEnviroment(path: String): String = { - // TODO: extend this to support more environment. - path.replace(s"<${GEARPUMP_HOME}>", executorConfig.getString(GEARPUMP_HOME)) - } - - override def preStart(): Unit = { - executorHandler.exitValue.onComplete { value => - procLauncher.cleanProcess(appId, executorId) - val result = ExecutorResult(value) - self ! result - } - } - - override def postStop(): Unit = { - executorHandler.destroy() - } - - // The folders are under ${GEARPUMP_HOME} - val daemonPathPattern = List("lib" + File.separator + "daemon", "lib" + - File.separator + "yarn") - - override def receive: Receive = { - case ShutdownExecutor(appId, executorId, reason: String) => - executorHandler.destroy() - sender ! 
ShutdownExecutorSucceed(appId, executorId) - context.stop(self) - case ExecutorResult(executorResult) => - executorResult match { - case Success(exit) => LOG.info("Executor exit normally with exit value " + exit) - case Failure(e) => LOG.error("Executor exit with errors", e) - } - context.stop(self) - } - - private def getFormatedTime(timestamp: Long): String = { - val datePattern = "yyyy-MM-dd-HH-mm" - val format = new java.text.SimpleDateFormat(datePattern) - format.format(timestamp) - } - - private def filterOutDaemonLib(classPath: Array[String]): Array[String] = { - classPath.filterNot(matchDaemonPattern(_)) - } - - private def matchDaemonPattern(path: String): Boolean = { - daemonPathPattern.exists(path.contains(_)) - } - } - - trait ExecutorHandler { - def destroy(): Unit - def exitValue: Future[Int] - } - - case class ProcessInfo(process: RichProcess, jarPath: Option[String], configFile: String) - - /** - * Starts the executor in the same JVM as worker. - */ - class InJvmExecutor(launch: LaunchExecutor, exit: Promise[Int]) - extends Daemon(launch.executorJvmConfig.arguments(0), launch.executorJvmConfig.arguments(1)) { - private val exitCode = 0 - - override val supervisorStrategy = - OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) { - case ex: Throwable => - LOG.error(s"system $name stopped ", ex) - exit.failure(ex) - Stop - } - - override def postStop(): Unit = { - if (!exit.isCompleted) { - exit.success(exitCode) - } - } - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/jarstore/dfs/DFSJarStoreService.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/jarstore/dfs/DFSJarStoreService.scala b/daemon/src/main/scala/io/gearpump/jarstore/dfs/DFSJarStoreService.scala deleted file mode 100644 index 305bdc1..0000000 --- a/daemon/src/main/scala/io/gearpump/jarstore/dfs/DFSJarStoreService.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.gearpump.jarstore.dfs - -import java.io.File - -import akka.actor.ActorSystem -import com.typesafe.config.Config -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path -import org.apache.hadoop.fs.permission.{FsAction, FsPermission} -import org.slf4j.Logger - -import io.gearpump.jarstore.{FilePath, JarStoreService} -import io.gearpump.util.{Constants, LogUtil} - -/** - * DFSJarStoreService store the uploaded jar on HDFS - */ -class DFSJarStoreService extends JarStoreService { - private val LOG: Logger = LogUtil.getLogger(getClass) - private var rootPath: Path = null - - override val scheme: String = "hdfs" - - override def init(config: Config, actorRefFactory: ActorSystem): Unit = { - rootPath = new Path(config.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH)) - val fs = rootPath.getFileSystem(new Configuration()) - if (!fs.exists(rootPath)) { - fs.mkdirs(rootPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)) - } - } - - /** - * This function will copy the remote file to local file system, called from client side. - * - * @param localFile The destination of file path - * @param remotePath The remote file path from JarStore - */ - override def copyToLocalFile(localFile: File, remotePath: FilePath): Unit = { - LOG.info(s"Copying to local file: ${localFile.getAbsolutePath} from ${remotePath}") - val filePath = new Path(rootPath, remotePath.path) - val fs = filePath.getFileSystem(new Configuration()) - val target = new Path(localFile.toURI().toString) - fs.copyToLocalFile(filePath, target) - } - - /** - * This function will copy the local file to the remote JarStore, called from client side. 
- * - * @param localFile The local file - */ - override def copyFromLocal(localFile: File): FilePath = { - val remotePath = FilePath(Math.abs(new java.util.Random().nextLong()).toString) - LOG.info(s"Copying from local file: ${localFile.getAbsolutePath} to ${remotePath}") - val filePath = new Path(rootPath, remotePath.path) - val fs = filePath.getFileSystem(new Configuration()) - fs.copyFromLocalFile(new Path(localFile.toURI.toString), filePath) - remotePath - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStore.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStore.scala b/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStore.scala deleted file mode 100644 index fa1a240..0000000 --- a/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStore.scala +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.jarstore.local - -import java.io.File - -import akka.actor.{Actor, Stash} -import akka.pattern.pipe -import org.slf4j.Logger - -import io.gearpump.cluster.ClientToMaster.{GetJarStoreServer, JarStoreServerAddress} -import io.gearpump.util._ - -/** - * LocalJarStore store the uploaded jar on local disk. - */ -class LocalJarStore(rootDirPath: String) extends Actor with Stash { - private val LOG: Logger = LogUtil.getLogger(getClass) - - val host = context.system.settings.config.getString(Constants.GEARPUMP_HOSTNAME) - val rootDirectory = new File(rootDirPath) - - FileUtils.forceMkdir(rootDirectory) - - val server = new FileServer(context.system, host, 0, rootDirectory) - - implicit val timeout = Constants.FUTURE_TIMEOUT - implicit val executionContext = context.dispatcher - - server.start pipeTo self - - def receive: Receive = { - case FileServer.Port(port) => - context.become(listen(port)) - unstashAll() - case _ => - stash() - } - - def listen(port: Int): Receive = { - case GetJarStoreServer => - sender ! JarStoreServerAddress(s"http://$host:$port/") - } - - override def postStop(): Unit = { - server.stop - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStoreService.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStoreService.scala b/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStoreService.scala deleted file mode 100644 index 969ce90..0000000 --- a/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStoreService.scala +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.jarstore.local - -import java.io.File -import java.util.concurrent.TimeUnit -import scala.collection.JavaConverters._ -import scala.concurrent.duration.Duration -import scala.concurrent.{Await, ExecutionContext, Future} - -import akka.actor.{ActorRef, ActorSystem} -import akka.pattern.ask -import com.typesafe.config.Config -import org.slf4j.Logger - -import io.gearpump.cluster.ClientToMaster.{GetJarStoreServer, JarStoreServerAddress} -import io.gearpump.cluster.master.MasterProxy -import io.gearpump.jarstore.{FilePath, JarStoreService} -import io.gearpump.util._ - -/** - * LocalJarStoreService store the uploaded jar on local disk. - */ -class LocalJarStoreService extends JarStoreService { - private def LOG: Logger = LogUtil.getLogger(getClass) - private implicit val timeout = Constants.FUTURE_TIMEOUT - private var system: akka.actor.ActorSystem = null - private var master: ActorRef = null - private implicit def dispatcher: ExecutionContext = system.dispatcher - - override val scheme: String = "file" - - override def init(config: Config, system: ActorSystem): Unit = { - this.system = system - val masters = config.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS) - .asScala.flatMap(Util.parseHostList) - master = system.actorOf(MasterProxy.props(masters), s"masterproxy${Util.randInt()}") - } - - private lazy val client = (master ? 
GetJarStoreServer).asInstanceOf[Future[JarStoreServerAddress]] - .map { address => - val client = new FileServer.Client(system, address.url) - client - } - - /** - * This function will copy the remote file to local file system, called from client side. - * - * @param localFile The destination of file path - * @param remotePath The remote file path from JarStore - */ - override def copyToLocalFile(localFile: File, remotePath: FilePath): Unit = { - LOG.info(s"Copying to local file: ${localFile.getAbsolutePath} from $remotePath") - val future = client.flatMap(_.download(remotePath, localFile)) - Await.ready(future, Duration(60, TimeUnit.SECONDS)) - } - - /** - * This function will copy the local file to the remote JarStore, called from client side. - * @param localFile The local file - */ - override def copyFromLocal(localFile: File): FilePath = { - val future = client.flatMap(_.upload(localFile)) - Await.result(future, Duration(60, TimeUnit.SECONDS)) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/util/FileDirective.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/util/FileDirective.scala b/daemon/src/main/scala/io/gearpump/util/FileDirective.scala deleted file mode 100644 index 1824a22..0000000 --- a/daemon/src/main/scala/io/gearpump/util/FileDirective.scala +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.util - -import java.io.File -import scala.concurrent.{ExecutionContext, Future} - -import akka.http.scaladsl.model.{HttpEntity, MediaTypes, Multipart} -import akka.http.scaladsl.server.Directives._ -import akka.http.scaladsl.server._ -import akka.stream.Materializer -import akka.stream.scaladsl.FileIO -import akka.util.ByteString - -/** - * FileDirective is a set of Akka-http directive to upload/download - * huge binary files to/from Akka-Http server. - */ -object FileDirective { - - // Form field name - type Name = String - - val CHUNK_SIZE = 262144 - - /** - * File information after a file is uploaded to server. - * - * @param originFileName original file name when user upload it in browser. - * @param file file name after the file is saved to server. - * @param length the length of the file - */ - case class FileInfo(originFileName: String, file: File, length: Long) - - class Form(val fields: Map[Name, FormField]) { - def getFile(fieldName: String): Option[FileInfo] = { - fields.get(fieldName).flatMap { - case Left(file) => Option(file) - case Right(_) => None - } - } - - def getValue(fieldName: String): Option[String] = { - fields.get(fieldName).flatMap { - case Left(_) => None - case Right(value) => Option(value) - } - } - } - - type FormField = Either[FileInfo, String] - - /** - * directive to uploadFile, it store the uploaded files - * to temporary directory, and return a Map from form field name - * to FileInfo. 
- */ - def uploadFile: Directive1[Form] = { - uploadFileTo(null) - } - - /** - * Store the uploaded files to specific rootDirectory. - * - * @param rootDirectory directory to store the files. - * @return - */ - def uploadFileTo(rootDirectory: File): Directive1[Form] = { - Directive[Tuple1[Form]] { inner => - extractMaterializer {implicit mat => - extractExecutionContext {implicit ec => - uploadFileImpl(rootDirectory)(mat, ec) { filesFuture => - ctx => { - filesFuture.map(map => inner(Tuple1(map))).flatMap(route => route(ctx)) - } - } - } - } - } - } - - // Downloads file from server - def downloadFile(file: File): Route = { - val responseEntity = HttpEntity( - MediaTypes.`application/octet-stream`, - file.length, - FileIO.fromFile(file, CHUNK_SIZE)) - complete(responseEntity) - } - - private def uploadFileImpl(rootDirectory: File)(implicit mat: Materializer, ec: ExecutionContext) - : Directive1[Future[Form]] = { - Directive[Tuple1[Future[Form]]] { inner => - entity(as[Multipart.FormData]) { (formdata: Multipart.FormData) => - val form = formdata.parts.mapAsync(1) { p => - if (p.filename.isDefined) { - - // Reserve the suffix - val targetPath = File.createTempFile(s"userfile_${p.name}_", - s"${p.filename.getOrElse("")}", rootDirectory) - val written = p.entity.dataBytes.runWith(FileIO.toFile(targetPath)) - written.map(written => - if (written.count > 0) { - Map(p.name -> Left(FileInfo(p.filename.get, targetPath, written.count))) - } else { - Map.empty[Name, FormField] - }) - } else { - val valueFuture = p.entity.dataBytes.runFold(ByteString.empty) {(total, input) => - total ++ input - } - valueFuture.map{value => - Map(p.name -> Right(value.utf8String)) - } - } - }.runFold(new Form(Map.empty[Name, FormField])) {(set, value) => - new Form(set.fields ++ value) - } - - inner(Tuple1(form)) - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/util/FileServer.scala 
---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/util/FileServer.scala b/daemon/src/main/scala/io/gearpump/util/FileServer.scala deleted file mode 100644 index bf389f7..0000000 --- a/daemon/src/main/scala/io/gearpump/util/FileServer.scala +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.gearpump.util - -import java.io.File -import scala.concurrent.{ExecutionContext, Future} - -import akka.actor.ActorSystem -import akka.http.scaladsl.Http -import akka.http.scaladsl.Http.ServerBinding -import akka.http.scaladsl.marshalling.Marshal -import akka.http.scaladsl.model.Uri.{Path, Query} -import akka.http.scaladsl.model.{HttpEntity, HttpRequest, MediaTypes, Multipart, _} -import akka.http.scaladsl.server.Directives._ -import akka.http.scaladsl.server._ -import akka.http.scaladsl.unmarshalling.Unmarshal -import akka.stream.ActorMaterializer -import akka.stream.scaladsl.{FileIO, Sink, Source} -import spray.json.DefaultJsonProtocol._ -import spray.json.JsonFormat - -import io.gearpump.jarstore.FilePath -import io.gearpump.util.FileDirective._ -import io.gearpump.util.FileServer.Port - -/** - * A simple file server implemented with akka-http to store/fetch large - * binary files. - */ -class FileServer(system: ActorSystem, host: String, port: Int = 0, rootDirectory: File) { - import system.dispatcher - implicit val actorSystem = system - implicit val materializer = ActorMaterializer() - implicit def ec: ExecutionContext = system.dispatcher - - val route: Route = { - path("upload") { - uploadFileTo(rootDirectory) { form => - val fileName = form.fields.headOption.flatMap { pair => - val (_, fileInfo) = pair - fileInfo match { - case Left(file) => Option(file.file).map(_.getName) - case Right(_) => None - } - } - - if (fileName.isDefined) { - complete(fileName.get) - } else { - failWith(new Exception("File not found in the uploaded form")) - } - } - } ~ - path("download") { - parameters("file") { file: String => - downloadFile(new File(rootDirectory, file)) - } - } ~ - pathEndOrSingleSlash { - extractUri { uri => - val upload = uri.withPath(Uri.Path("/upload")).toString() - val entity = HttpEntity(ContentTypes.`text/html(UTF-8)`, - s""" - | - |<h2>Please specify a file to upload:</h2> - |<form action="$upload" enctype="multipart/form-data" 
method="post"> - |<input type="file" name="datafile" size="40"> - |</p> - |<div> - |<input type="submit" value="Submit"> - |</div> - |</form> - """.stripMargin) - complete(entity) - } - } - } - - private var connection: Future[ServerBinding] = null - - def start: Future[Port] = { - connection = Http().bindAndHandle(Route.handlerFlow(route), host, port) - connection.map(address => Port(address.localAddress.getPort)) - } - - def stop: Future[Unit] = { - connection.flatMap(_.unbind()) - } -} - -object FileServer { - - implicit def filePathFormat: JsonFormat[FilePath] = jsonFormat1(FilePath.apply) - - case class Port(port: Int) - - /** - * Client of [[io.gearpump.util.FileServer]] - */ - class Client(system: ActorSystem, host: String, port: Int) { - - def this(system: ActorSystem, url: String) = { - this(system, Uri(url).authority.host.address(), Uri(url).authority.port) - } - - private implicit val actorSystem = system - private implicit val materializer = ActorMaterializer() - private implicit val ec = system.dispatcher - - val server = Uri(s"http://$host:$port") - val httpClient = Http(system).outgoingConnection(server.authority.host.address(), - server.authority.port) - - def upload(file: File): Future[FilePath] = { - val target = server.withPath(Path("/upload")) - - val request = entity(file).map { entity => - HttpRequest(HttpMethods.POST, uri = target, entity = entity) - } - - val response = Source.fromFuture(request).via(httpClient).runWith(Sink.head) - response.flatMap { some => - Unmarshal(some).to[String] - }.map { path => - FilePath(path) - } - } - - def download(remoteFile: FilePath, saveAs: File): Future[Unit] = { - val download = server.withPath(Path("/download")).withQuery(Query("file" -> remoteFile.path)) - // Download file to local - val response = Source.single(HttpRequest(uri = download)).via(httpClient).runWith(Sink.head) - val downloaded = response.flatMap { response => - response.entity.dataBytes.runWith(FileIO.toFile(saveAs)) - } - 
downloaded.map(written => Unit) - } - - private def entity(file: File)(implicit ec: ExecutionContext): Future[RequestEntity] = { - val entity = HttpEntity(MediaTypes.`application/octet-stream`, file.length(), - FileIO.fromFile(file, chunkSize = 100000)) - val body = Source.single( - Multipart.FormData.BodyPart( - "uploadfile", - entity, - Map("filename" -> file.getName))) - val form = Multipart.FormData(body) - - Marshal(form).to[RequestEntity] - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala new file mode 100644 index 0000000..9e55be6 --- /dev/null +++ b/daemon/src/main/scala/org/apache/gearpump/cluster/DaemonMessage.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.cluster + +import akka.actor.ActorRef + +import org.apache.gearpump.cluster.master.Master.MasterInfo +import org.apache.gearpump.cluster.scheduler.Resource +import org.apache.gearpump.cluster.worker.WorkerId + +/** + * Cluster Bootup Flow + */ +object WorkerToMaster { + + /** When an worker is started, it sends RegisterNewWorker */ + case object RegisterNewWorker + + /** When worker lose connection with master, it tries to register itself again with old Id. */ + case class RegisterWorker(workerId: WorkerId) + + /** Worker is responsible to broadcast its current status to master */ + case class ResourceUpdate(worker: ActorRef, workerId: WorkerId, resource: Resource) +} + +object MasterToWorker { + + /** Master confirm the reception of RegisterNewWorker or RegisterWorker */ + case class WorkerRegistered(workerId: WorkerId, masterInfo: MasterInfo) + + /** Worker have not received reply from master */ + case class UpdateResourceFailed(reason: String = null, ex: Throwable = null) + + /** Master is synced with worker on resource slots managed by current worker */ + case object UpdateResourceSucceed +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala new file mode 100644 index 0000000..9bde4d1 --- /dev/null +++ b/daemon/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.cluster.embedded + +import scala.collection.JavaConverters._ +import scala.concurrent.Await +import scala.concurrent.duration.Duration + +import akka.actor.{ActorRef, ActorSystem, Props} +import com.typesafe.config.{Config, ConfigValueFactory} + +import org.apache.gearpump.cluster.ClusterConfig +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.cluster.master.{Master => MasterActor} +import org.apache.gearpump.cluster.worker.{Worker => WorkerActor} +import org.apache.gearpump.util.Constants.{GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, GEARPUMP_CLUSTER_MASTERS, GEARPUMP_METRIC_ENABLED, MASTER} +import org.apache.gearpump.util.{LogUtil, Util} + +/** + * Create a in-process cluster with single worker + */ +class EmbeddedCluster(inputConfig: Config) { + + private val workerCount: Int = 1 + private var _master: ActorRef = null + private var _system: ActorSystem = null + private var _config: Config = null + + private val LOG = LogUtil.getLogger(getClass) + + def start(): Unit = { + val port = Util.findFreePort().get + val akkaConf = getConfig(inputConfig, port) + _config = akkaConf + val system = ActorSystem(MASTER, akkaConf) + + val master = system.actorOf(Props[MasterActor], MASTER) + + 0.until(workerCount).foreach { id => + system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id) 
+ } + this._master = master + this._system = system + + LOG.info("=================================") + LOG.info("Local Cluster is started at: ") + LOG.info(s" 127.0.0.1:$port") + LOG.info(s"To see UI, run command: services -master 127.0.0.1:$port") + } + + private def getConfig(inputConfig: Config, port: Int): Config = { + val config = inputConfig. + withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port)). + withValue(GEARPUMP_CLUSTER_MASTERS, + ConfigValueFactory.fromIterable(List(s"127.0.0.1:$port").asJava)). + withValue(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, + ConfigValueFactory.fromAnyRef(true)). + withValue(GEARPUMP_METRIC_ENABLED, ConfigValueFactory.fromAnyRef(true)). + withValue("akka.actor.provider", + ConfigValueFactory.fromAnyRef("akka.cluster.ClusterActorRefProvider")) + config + } + + def newClientContext: ClientContext = { + ClientContext(_config, _system, _master) + } + + def stop(): Unit = { + _system.stop(_master) + _system.terminate() + Await.result(_system.whenTerminated, Duration.Inf) + } +} + +object EmbeddedCluster { + def apply(): EmbeddedCluster = { + new EmbeddedCluster(ClusterConfig.master()) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala new file mode 100644 index 0000000..98ec707 --- /dev/null +++ b/daemon/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.cluster.main + +import java.io.File +import java.net.{URL, URLClassLoader} +import java.util.jar.JarFile +import scala.util.Try + +import org.slf4j.Logger + +import org.apache.gearpump.util.{AkkaApp, Constants, LogUtil, Util} + +/** Tool to submit an application jar to cluster */ +object AppSubmitter extends AkkaApp with ArgumentsParser { + val LOG: Logger = LogUtil.getLogger(getClass) + + override val ignoreUnknownArgument = true + + override val description = "Submit an application to Master by providing a jar" + + override val options: Array[(String, CLIOption[Any])] = Array( + "namePrefix" -> CLIOption[String]("<application name prefix>", required = false, + defaultValue = Some("")), + "jar" -> CLIOption("<application>.jar", required = true), + "executors" -> CLIOption[Int]("number of executor to launch", required = false, + defaultValue = Some(1)), + "verbose" -> CLIOption("<print verbose log on console>", required = false, + defaultValue = Some(false)), + // For document purpose only, OPTION_CONFIG option is not used here. + // OPTION_CONFIG is parsed by parent shell command "Gear" transparently. 
+ Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, + defaultValue = None)) + + def main(akkaConf: Config, args: Array[String]): Unit = { + + val config = parse(args) + if (null != config) { + + val verbose = config.getBoolean("verbose") + if (verbose) { + LogUtil.verboseLogToConsole() + } + + val jar = config.getString("jar") + + // Set jar path to be submitted to cluster + System.setProperty(Constants.GEARPUMP_APP_JAR, jar) + System.setProperty(Constants.APPLICATION_EXECUTOR_NUMBER, config.getInt("executors").toString) + + val namePrefix = config.getString("namePrefix") + if (namePrefix.nonEmpty) { + if (!Util.validApplicationName(namePrefix)) { + throw new Exception(s"$namePrefix is not a valid prefix for an application name") + } + System.setProperty(Constants.GEARPUMP_APP_NAME_PREFIX, namePrefix) + } + + val jarFile = new java.io.File(jar) + + // Start main class + if (!jarFile.exists()) { + throw new Exception(s"jar $jar does not exist") + } + + val classLoader: URLClassLoader = new URLClassLoader(Array(new URL("file:" + + jarFile.getAbsolutePath)), Thread.currentThread().getContextClassLoader()) + val (main, arguments) = parseMain(jarFile, config.remainArgs, classLoader) + + // Set to context classloader. ActorSystem pick context classloader in preference + Thread.currentThread().setContextClassLoader(classLoader) + val clazz = classLoader.loadClass(main) + val mainMethod = clazz.getMethod("main", classOf[Array[String]]) + mainMethod.invoke(null, arguments) + } + } + + private def parseMain(jar: File, remainArgs: Array[String], classLoader: ClassLoader) + : (String, Array[String]) = { + val mainInManifest = Option(new JarFile(jar).getManifest.getMainAttributes. 
+ getValue("Main-Class")).getOrElse("") + + if (remainArgs.length > 0 && Try(classLoader.loadClass(remainArgs(0))).isSuccess) { + (remainArgs(0), remainArgs.drop(1)) + } else if (mainInManifest.nonEmpty) { + (mainInManifest, remainArgs) + } else { + throw new Exception("No main class specified") + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala new file mode 100644 index 0000000..672fee6 --- /dev/null +++ b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Gear.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.cluster.main + +import org.slf4j.Logger + +import org.apache.gearpump.util.{Constants, LogUtil} + +object Gear { + + val OPTION_CONFIG = "conf" + + private val LOG: Logger = LogUtil.getLogger(getClass) + + val commands = Map("app" -> AppSubmitter, "kill" -> Kill, + "info" -> Info, "replay" -> Replay, "main" -> MainRunner) + + def usage(): Unit = { + val keys = commands.keys.toList.sorted + // scalastyle:off println + Console.err.println("Usage: " + "<" + keys.mkString("|") + ">") + // scalastyle:on println + } + + private def executeCommand(command: String, commandArgs: Array[String]) = { + commands.get(command).map(_.main(commandArgs)) + if (!commands.contains(command)) { + val allArgs = (command +: commandArgs.toList).toArray + MainRunner.main(allArgs) + } + } + + def main(inputArgs: Array[String]): Unit = { + val (configFile, args) = extractConfig(inputArgs) + if (configFile != null) { + // Sets custom config file... + System.setProperty(Constants.GEARPUMP_CUSTOM_CONFIG_FILE, configFile) + } + + if (args.length == 0) { + usage() + } else { + val command = args(0) + val commandArgs = args.drop(1) + executeCommand(command, commandArgs) + } + } + + private def extractConfig(inputArgs: Array[String]): (String, Array[String]) = { + var index = 0 + + var result = List.empty[String] + var configFile: String = null + while (index < inputArgs.length) { + val item = inputArgs(index) + if (item == s"-$OPTION_CONFIG") { + index += 1 + configFile = inputArgs(index) + } else { + result = result :+ item + } + index += 1 + } + (configFile, result.toArray) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/main/Info.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Info.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Info.scala new 
file mode 100644 index 0000000..bf444a3 --- /dev/null +++ b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Info.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.cluster.main + +import org.slf4j.Logger + +import org.apache.gearpump.cluster.MasterToAppMaster.AppMastersData +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.util.{AkkaApp, LogUtil} + +/** Tool to query master info */ +object Info extends AkkaApp with ArgumentsParser { + + private val LOG: Logger = LogUtil.getLogger(getClass) + + override val options: Array[(String, CLIOption[Any])] = Array( + // For document purpose only, OPTION_CONFIG option is not used here. + // OPTION_CONFIG is parsed by parent shell command "Gear" transparently. 
+ Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, + defaultValue = None)) + + override val description = "Query the Application list" + + // scalastyle:off println + def main(akkaConf: Config, args: Array[String]): Unit = { + val client = ClientContext(akkaConf) + + val AppMastersData(appMasters) = client.listApps + Console.println("== Application Information ==") + Console.println("====================================") + appMasters.foreach { appData => + Console.println(s"application: ${appData.appId}, name: ${appData.appName}, " + + s"status: ${appData.status}, worker: ${appData.workerPath}") + } + client.close() + } + // scalastyle:on println +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala new file mode 100644 index 0000000..17f6214 --- /dev/null +++ b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Kill.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.cluster.main + +import org.slf4j.Logger + +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.util.{AkkaApp, LogUtil} + +/** Tool to kill an App */ +object Kill extends AkkaApp with ArgumentsParser { + + private val LOG: Logger = LogUtil.getLogger(getClass) + + override val options: Array[(String, CLIOption[Any])] = Array( + "appid" -> CLIOption("<application id>", required = true), + // For document purpose only, OPTION_CONFIG option is not used here. + // OPTION_CONFIG is parsed by parent shell command "Gear" transparently. + Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, + defaultValue = None)) + + override val description = "Kill an application with application Id" + + def main(akkaConf: Config, args: Array[String]): Unit = { + val config = parse(args) + + if (null != config) { + val client = ClientContext(akkaConf) + LOG.info("Client ") + client.shutdown(config.getInt("appid")) + client.close() + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/main/Local.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Local.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Local.scala new file mode 100644 index 0000000..db71b7b --- /dev/null +++ b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Local.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.cluster.main + +import scala.collection.JavaConverters._ +import scala.concurrent.Await +import scala.concurrent.duration.Duration + +import akka.actor.{ActorSystem, Props} +import com.typesafe.config.ConfigValueFactory +import org.slf4j.Logger + +import org.apache.gearpump.cluster.ClusterConfig +import org.apache.gearpump.cluster.master.{Master => MasterActor} +import org.apache.gearpump.cluster.worker.{Worker => WorkerActor} +import org.apache.gearpump.util.Constants._ +import org.apache.gearpump.util.LogUtil.ProcessType +import org.apache.gearpump.util.{ActorUtil, AkkaApp, Constants, LogUtil, Util} + +object Local extends AkkaApp with ArgumentsParser { + override def akkaConfig: Config = ClusterConfig.master() + + var LOG: Logger = LogUtil.getLogger(getClass) + + override val options: Array[(String, CLIOption[Any])] = + Array("sameprocess" -> CLIOption[Boolean]("", required = false, defaultValue = Some(false)), + "workernum" -> CLIOption[Int]("<how many workers to start>", required = false, + defaultValue = Some(2))) + + override val description = "Start a local cluster" + + def main(akkaConf: Config, args: Array[String]): Unit = { + + this.LOG = { + LogUtil.loadConfiguration(akkaConf, ProcessType.LOCAL) + LogUtil.getLogger(getClass) + } + + val config = parse(args) + if (null != config) { + local(config.getInt("workernum"), 
config.getBoolean("sameprocess"), akkaConf) + } + } + + def local(workerCount: Int, sameProcess: Boolean, akkaConf: Config): Unit = { + if (sameProcess) { + LOG.info("Starting local in same process") + System.setProperty("LOCAL", "true") + } + val masters = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS) + .asScala.flatMap(Util.parseHostList) + val local = akkaConf.getString(Constants.GEARPUMP_HOSTNAME) + + if (masters.size != 1 && masters.head.host != local) { + LOG.error(s"The ${Constants.GEARPUMP_CLUSTER_MASTERS} is not match " + + s"with ${Constants.GEARPUMP_HOSTNAME}") + } else { + + val hostPort = masters.head + implicit val system = ActorSystem(MASTER, akkaConf. + withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(hostPort.port)) + ) + + val master = system.actorOf(Props[MasterActor], MASTER) + val masterPath = ActorUtil.getSystemAddress(system).toString + s"/user/$MASTER" + + 0.until(workerCount).foreach { id => + system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id) + } + + Await.result(system.whenTerminated, Duration.Inf) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala new file mode 100644 index 0000000..c6c9f10 --- /dev/null +++ b/daemon/src/main/scala/org/apache/gearpump/cluster/main/MainRunner.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.cluster.main + +import org.slf4j.Logger + +import org.apache.gearpump.util.{AkkaApp, LogUtil} + +/** Tool to run any main class by providing a jar */ +object MainRunner extends AkkaApp with ArgumentsParser { + private val LOG: Logger = LogUtil.getLogger(getClass) + + override val options: Array[(String, CLIOption[Any])] = Array( + // For document purpose only, OPTION_CONFIG option is not used here. + // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
+ Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, + defaultValue = None)) + + /** + * Loads the class named by `args(0)` through the current thread's context class loader and + * invokes its static `main(Array[String])` with the remaining arguments. + * + * @param akkaConf not used by this tool + * @param args the main class name, followed by the arguments forwarded to that class + * @throws IllegalArgumentException if `args` is empty + */ + def main(akkaConf: Config, args: Array[String]): Unit = { + // Fail with a clear message instead of an ArrayIndexOutOfBoundsException from args(0). + require(args.nonEmpty, "MainRunner requires the name of the main class to run") + val mainClazz = args(0) + val commandArgs = args.drop(1) + + val clazz = Thread.currentThread().getContextClassLoader().loadClass(mainClazz) + val mainMethod = clazz.getMethod("main", classOf[Array[String]]) + // The receiver is null because main must be static; commandArgs is passed as the single + // varargs element, i.e. as main's String[] parameter. + mainMethod.invoke(null, commandArgs) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/main/Master.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Master.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Master.scala new file mode 100644 index 0000000..f1b9bdf --- /dev/null +++ b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Master.scala @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.gearpump.cluster.main + +import java.util.concurrent.TimeUnit +import scala.collection.JavaConverters._ +import scala.collection.immutable +import scala.concurrent.Await +import scala.concurrent.duration._ + +import akka.actor._ +import akka.cluster.ClusterEvent._ +import akka.cluster.ddata.DistributedData +import akka.cluster.singleton.{ClusterSingletonManager, ClusterSingletonManagerSettings, ClusterSingletonProxy, ClusterSingletonProxySettings} +import akka.cluster.{Cluster, Member, MemberStatus} +import com.typesafe.config.ConfigValueFactory +import org.slf4j.Logger + +import org.apache.gearpump.cluster.ClusterConfig +import org.apache.gearpump.cluster.master.{Master => MasterActor, MasterNode} +import org.apache.gearpump.cluster.master.Master.MasterListUpdated +import org.apache.gearpump.util.Constants._ +import org.apache.gearpump.util.LogUtil.ProcessType +import org.apache.gearpump.util.{AkkaApp, Constants, LogUtil} + +/** + * Command-line tool that starts a master daemon: it joins the master Akka cluster, runs + * [[MasterWatcher]] as a cluster singleton and blocks until the actor system terminates. + */ +object Master extends AkkaApp with ArgumentsParser { + + private var LOG: Logger = LogUtil.getLogger(getClass) + + override def akkaConfig: Config = ClusterConfig.master() + + override val options: Array[(String, CLIOption[Any])] = + Array("ip" -> CLIOption[String]("<master ip address>", required = true), + "port" -> CLIOption("<master port>", required = true)) + + override val description = "Start Master daemon" + + /** + * Parses the required `ip` and `port` options and starts the master on that address. + * + * @param akkaConf master configuration + * @param args command-line arguments + */ + def main(akkaConf: Config, args: Array[String]): Unit = { + + this.LOG = { + LogUtil.loadConfiguration(akkaConf, ProcessType.MASTER) + LogUtil.getLogger(getClass) + } + + val config = parse(args) + // Guard against a null parse result (Replay performs the same check) instead of failing + // with a NullPointerException on config.getString. + if (null != config) { + master(config.getString("ip"), config.getInt("port"), akkaConf) + } + } + + /** Returns true if `masters` contains the entry formed by joining `master` and `port` with ":". */ + private def verifyMaster(master: String, port: Int, masters: Iterable[String]) = { + masters.exists { hostPort => + hostPort == s"$master:$port" + } + } + + /** + * Starts the master actor system on `ip`:`port` and blocks until it terminates. + * + * Exits the JVM with status -1 if that address is not listed in GEARPUMP_CLUSTER_MASTERS. All + * configured masters become seed nodes, and a majority of them (the quorum) is configured as + * `min-nr-of-members` for the master role. A JVM shutdown hook leaves and downs this node. + */ + private def master(ip: String, port: Int, akkaConf: Config): Unit = { + val masters = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala + + if (!verifyMaster(ip, port,
masters)) { + LOG.error(s"The provided ip $ip and port $port doesn't conform with config at " + + s"gearpump.cluster.masters: ${masters.mkString(", ")}") + System.exit(-1) + } + + val masterList = masters.map(master => s"akka.tcp://${MASTER}@$master").toList.asJava + val quorum = masterList.size() / 2 + 1 + val masterConfig = akkaConf. + withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port)). + withValue(NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(ip)). + withValue("akka.cluster.seed-nodes", ConfigValueFactory.fromAnyRef(masterList)). + withValue(s"akka.cluster.role.${MASTER}.min-nr-of-members", + ConfigValueFactory.fromAnyRef(quorum)) + + LOG.info(s"Starting Master Actor system $ip:$port, master list: ${masters.mkString(";")}") + val system = ActorSystem(MASTER, masterConfig) + + val replicator = DistributedData(system).replicator + LOG.info(s"Replicator path: ${replicator.path}") + + // Starts singleton manager + val singletonManager = system.actorOf(ClusterSingletonManager.props( + singletonProps = Props(classOf[MasterWatcher], MASTER), + terminationMessage = PoisonPill, + settings = ClusterSingletonManagerSettings(system).withSingletonName(MASTER_WATCHER) + .withRole(MASTER)), + name = SINGLETON_MANAGER) + + // Start master proxy + val masterProxy = system.actorOf(ClusterSingletonProxy.props( + singletonManagerPath = s"/user/${SINGLETON_MANAGER}", + // The effective singleton is s"${MASTER_WATCHER}/$MASTER" instead of s"${MASTER_WATCHER}". + // Master is created when there is a majority of machines started.
+ settings = ClusterSingletonProxySettings(system) + .withSingletonName(s"${MASTER_WATCHER}/$MASTER").withRole(MASTER)), + name = MASTER + ) + + LOG.info(s"master proxy is started at ${masterProxy.path}") + + val mainThread = Thread.currentThread() + // On JVM shutdown, if the actor system is still running: stop the proxy, leave and down this + // node, wait up to 3 seconds for termination, then terminate and wait for the main thread. + Runtime.getRuntime().addShutdownHook(new Thread() { + override def run(): Unit = { + if (!system.whenTerminated.isCompleted) { + LOG.info("Triggering shutdown hook....") + + system.stop(masterProxy) + val cluster = Cluster(system) + cluster.leave(cluster.selfAddress) + cluster.down(cluster.selfAddress) + try { + Await.result(system.whenTerminated, Duration(3, TimeUnit.SECONDS)) + } catch { + case ex: Exception => // Ignore + } + system.terminate() + mainThread.join() + } + } + }) + + Await.result(system.whenTerminated, Duration.Inf) + } +} + +/** + * Cluster-singleton actor that creates the Master actor only while at least `quorum` (a majority + * of the configured seed nodes) Up members with `role` exist, and shuts this node down once fewer + * remain. + * + * @param role cluster role whose members are counted towards the quorum + */ +class MasterWatcher(role: String) extends Actor with ActorLogging { + import context.dispatcher + + val cluster = Cluster(context.system) + + val config = context.system.settings.config + val masters = config.getList("akka.cluster.seed-nodes") + val quorum = masters.size() / 2 + 1 + + val system = context.system + + // Sorts by age, oldest first + val ageOrdering = Ordering.fromLessThan[Member] { (a, b) => a.isOlderThan(b) } + var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering) + + // Never the active behavior: preStart immediately replaces it with waitForInit. + def receive: Receive = Actor.emptyBehavior + + // Subscribes to MemberEvent, re-subscribe when restart + override def preStart(): Unit = { + cluster.subscribe(self, classOf[MemberEvent]) + context.become(waitForInit) + } + override def postStop(): Unit = { + cluster.unsubscribe(self) + } + + def matchingRole(member: Member): Boolean = member.hasRole(role) + + /** + * Initial behavior: on CurrentClusterState, records the Up members with `role`. With fewer than + * `quorum` of them it switches to waitForShutdown; otherwise it creates the Master actor. + */ + def waitForInit: Receive = { + case state: CurrentClusterState => { + membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.filter(m => + m.status == MemberStatus.Up && matchingRole(m)) + + if (membersByAge.size < quorum) { + log.info(s"We cannot get a quorum, $quorum, " +
s"shutting down...${membersByAge.iterator.mkString(",")}") + context.become(waitForShutdown) + self ! MasterWatcher.Shutdown + } else { + val master = context.actorOf(Props(classOf[MasterActor]), MASTER) + notifyMasterMembersChange(master) + context.become(waitForClusterEvent(master)) + } + } + } + + /** + * Running behavior: tracks Up, Exited and Removed members with `role`, sends the updated member + * list to `master`, and switches to waitForShutdown when fewer than `quorum` members remain. + */ + def waitForClusterEvent(master: ActorRef): Receive = { + case MemberUp(m) if matchingRole(m) => { + membersByAge += m + notifyMasterMembersChange(master) + } + case mEvent: MemberEvent if (mEvent.isInstanceOf[MemberExited] || + mEvent.isInstanceOf[MemberRemoved]) && matchingRole(mEvent.member) => { + log.info(s"member removed ${mEvent.member}") + val m = mEvent.member + membersByAge -= m + if (membersByAge.size < quorum) { + log.info(s"We cannot get a quorum, $quorum, " + + s"shutting down...${membersByAge.iterator.mkString(",")}") + context.become(waitForShutdown) + self ! MasterWatcher.Shutdown + } else { + notifyMasterMembersChange(master) + } + } + } + + /** Sends `master` the tracked members, oldest first, as MasterNode(host, port) entries. */ + private def notifyMasterMembersChange(master: ActorRef): Unit = { + val masters = membersByAge.toList.map{ member => + MasterNode(member.address.host.getOrElse("Unknown-Host"), + member.address.port.getOrElse(0)) + } + master !
MasterListUpdated(masters) + } + + /** + * Shutdown behavior: on MasterWatcher.Shutdown, unsubscribes, leaves the cluster and stops itself, + * then in a scheduled task waits up to 3 seconds for the actor system to terminate before calling + * terminate(). + */ + def waitForShutdown: Receive = { + case MasterWatcher.Shutdown => { + cluster.unsubscribe(self) + cluster.leave(cluster.selfAddress) + context.stop(self) + system.scheduler.scheduleOnce(Duration.Zero) { + try { + Await.result(system.whenTerminated, Duration(3, TimeUnit.SECONDS)) + } catch { + case ex: Exception => // Ignore + } + system.terminate() + } + } + } +} + +object MasterWatcher { + /** Self-sent message that makes a MasterWatcher in waitForShutdown leave the cluster and stop. */ + object Shutdown +} + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala new file mode 100644 index 0000000..d721832 --- /dev/null +++ b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Replay.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.gearpump.cluster.main + +import org.slf4j.Logger + +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.util.{AkkaApp, LogUtil} + +/** Internal tool to restart an application by replaying it from its current min clock. */ +object Replay extends AkkaApp with ArgumentsParser { + + private val LOG: Logger = LogUtil.getLogger(getClass) + + override val options: Array[(String, CLIOption[Any])] = Array( + "appid" -> CLIOption("<application id>", required = true), + // For document purpose only, OPTION_CONFIG option is not used here. + // OPTION_CONFIG is parsed by parent shell command "Gear" transparently. + Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, + defaultValue = None)) + + override val description = "Replay the application from current min clock(low watermark)" + + /** + * Parses `args` and, when that yields a result, asks the cluster to replay application `appid` + * through a [[ClientContext]] built from `akkaConf`. The client is always closed afterwards. + * + * @param akkaConf configuration used to build the client + * @param args command-line arguments; the `appid` option is required + */ + def main(akkaConf: Config, args: Array[String]): Unit = { + val config = parse(args) + + if (null != config) { + val client = ClientContext(akkaConf) + try { + client.replayFromTimestampWindowTrailingEdge(config.getInt("appid")) + } finally { + // Close the client even if the replay request throws, so its resources are released. + client.close() + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala new file mode 100644 index 0000000..58a9dec --- /dev/null +++ b/daemon/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.cluster.main + +import scala.collection.JavaConverters._ +import scala.concurrent.Await +import scala.concurrent.duration.Duration + +import akka.actor.{ActorSystem, Props} +import org.slf4j.Logger + +import org.apache.gearpump.cluster.ClusterConfig +import org.apache.gearpump.cluster.master.MasterProxy +import org.apache.gearpump.transport.HostPort +import org.apache.gearpump.util.Constants._ +import org.apache.gearpump.cluster.worker.{Worker => WorkerActor} +import org.apache.gearpump.util.LogUtil.ProcessType +import org.apache.gearpump.util.{AkkaApp, LogUtil} + +/** Command-line tool that starts a worker daemon process. */ +object Worker extends AkkaApp with ArgumentsParser { + protected override def akkaConfig = ClusterConfig.worker() + + override val description = "Start a worker daemon" + + var LOG: Logger = LogUtil.getLogger(getClass) + + private def uuid = java.util.UUID.randomUUID.toString + + /** + * Starts a worker daemon: creates an actor system named by a random UUID, a proxy to the masters + * listed in GEARPUMP_CLUSTER_MASTERS and a worker actor, then blocks until the system terminates. + * + * @param akkaConf worker configuration; each GEARPUMP_CLUSTER_MASTERS entry is read as "host:port" + * @param args not used by this tool + */ + def main(akkaConf: Config, args: Array[String]): Unit = { + val id = uuid + + // Load the worker logging configuration before re-creating LOG, so the log file name is + // reset first and no empty log file is created under the previous name. + LogUtil.loadConfiguration(akkaConf, ProcessType.WORKER) + this.LOG = LogUtil.getLogger(getClass) + + val system = ActorSystem(id, akkaConf) + + val masterHostPorts = akkaConf.getStringList(GEARPUMP_CLUSTER_MASTERS).asScala.map { entry => + val parts = entry.split(":") + HostPort(parts(0), parts(1).toInt) + } + + LOG.info(s"Trying to connect to masters ${masterHostPorts.mkString(",")}...") + val masterProxy =
system.actorOf(MasterProxy.props(masterHostPorts), s"masterproxy${system.name}") + + val workerName = s"${classOf[WorkerActor].getSimpleName}$id" + system.actorOf(Props(classOf[WorkerActor], masterProxy), workerName) + + // Block until this worker's actor system terminates. + Await.result(system.whenTerminated, Duration.Inf) + } +} \ No newline at end of file
