http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala b/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala deleted file mode 100644 index 68f778e..0000000 --- a/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.gearpump.cluster.main - -import java.io.File -import java.net.{URL, URLClassLoader} -import java.util.jar.JarFile -import scala.util.Try - -import org.slf4j.Logger - -import io.gearpump.util.{AkkaApp, Constants, LogUtil, Util} - -/** Tool to submit an application jar to cluster */ -object AppSubmitter extends AkkaApp with ArgumentsParser { - val LOG: Logger = LogUtil.getLogger(getClass) - - override val ignoreUnknownArgument = true - - override val description = "Submit an application to Master by providing a jar" - - override val options: Array[(String, CLIOption[Any])] = Array( - "namePrefix" -> CLIOption[String]("<application name prefix>", required = false, - defaultValue = Some("")), - "jar" -> CLIOption("<application>.jar", required = true), - "executors" -> CLIOption[Int]("number of executor to launch", required = false, - defaultValue = Some(1)), - "verbose" -> CLIOption("<print verbose log on console>", required = false, - defaultValue = Some(false)), - // For document purpose only, OPTION_CONFIG option is not used here. - // OPTION_CONFIG is parsed by parent shell command "Gear" transparently. 
- Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, - defaultValue = None)) - - def main(akkaConf: Config, args: Array[String]): Unit = { - - val config = parse(args) - if (null != config) { - - val verbose = config.getBoolean("verbose") - if (verbose) { - LogUtil.verboseLogToConsole() - } - - val jar = config.getString("jar") - - // Set jar path to be submitted to cluster - System.setProperty(Constants.GEARPUMP_APP_JAR, jar) - System.setProperty(Constants.APPLICATION_EXECUTOR_NUMBER, config.getInt("executors").toString) - - val namePrefix = config.getString("namePrefix") - if (namePrefix.nonEmpty) { - if (!Util.validApplicationName(namePrefix)) { - throw new Exception(s"$namePrefix is not a valid prefix for an application name") - } - System.setProperty(Constants.GEARPUMP_APP_NAME_PREFIX, namePrefix) - } - - val jarFile = new java.io.File(jar) - - // Start main class - if (!jarFile.exists()) { - throw new Exception(s"jar $jar does not exist") - } - - val classLoader: URLClassLoader = new URLClassLoader(Array(new URL("file:" + - jarFile.getAbsolutePath)), Thread.currentThread().getContextClassLoader()) - val (main, arguments) = parseMain(jarFile, config.remainArgs, classLoader) - - // Set to context classloader. ActorSystem pick context classloader in preference - Thread.currentThread().setContextClassLoader(classLoader) - val clazz = classLoader.loadClass(main) - val mainMethod = clazz.getMethod("main", classOf[Array[String]]) - mainMethod.invoke(null, arguments) - } - } - - private def parseMain(jar: File, remainArgs: Array[String], classLoader: ClassLoader) - : (String, Array[String]) = { - val mainInManifest = Option(new JarFile(jar).getManifest.getMainAttributes. 
- getValue("Main-Class")).getOrElse("") - - if (remainArgs.length > 0 && Try(classLoader.loadClass(remainArgs(0))).isSuccess) { - (remainArgs(0), remainArgs.drop(1)) - } else if (mainInManifest.nonEmpty) { - (mainInManifest, remainArgs) - } else { - throw new Exception("No main class specified") - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/main/Gear.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Gear.scala b/daemon/src/main/scala/io/gearpump/cluster/main/Gear.scala deleted file mode 100644 index 4423727..0000000 --- a/daemon/src/main/scala/io/gearpump/cluster/main/Gear.scala +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.gearpump.cluster.main - -import org.slf4j.Logger - -import io.gearpump.util.{Constants, LogUtil} - -object Gear { - - val OPTION_CONFIG = "conf" - - private val LOG: Logger = LogUtil.getLogger(getClass) - - val commands = Map("app" -> AppSubmitter, "kill" -> Kill, - "info" -> Info, "replay" -> Replay, "main" -> MainRunner) - - def usage(): Unit = { - val keys = commands.keys.toList.sorted - // scalastyle:off println - Console.err.println("Usage: " + "<" + keys.mkString("|") + ">") - // scalastyle:on println - } - - private def executeCommand(command: String, commandArgs: Array[String]) = { - commands.get(command).map(_.main(commandArgs)) - if (!commands.contains(command)) { - val allArgs = (command +: commandArgs.toList).toArray - MainRunner.main(allArgs) - } - } - - def main(inputArgs: Array[String]): Unit = { - val (configFile, args) = extractConfig(inputArgs) - if (configFile != null) { - // Sets custom config file... - System.setProperty(Constants.GEARPUMP_CUSTOM_CONFIG_FILE, configFile) - } - - if (args.length == 0) { - usage() - } else { - val command = args(0) - val commandArgs = args.drop(1) - executeCommand(command, commandArgs) - } - } - - private def extractConfig(inputArgs: Array[String]): (String, Array[String]) = { - var index = 0 - - var result = List.empty[String] - var configFile: String = null - while (index < inputArgs.length) { - val item = inputArgs(index) - if (item == s"-$OPTION_CONFIG") { - index += 1 - configFile = inputArgs(index) - } else { - result = result :+ item - } - index += 1 - } - (configFile, result.toArray) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/main/Info.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Info.scala b/daemon/src/main/scala/io/gearpump/cluster/main/Info.scala deleted file mode 100644 index 
4922690..0000000 --- a/daemon/src/main/scala/io/gearpump/cluster/main/Info.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.cluster.main - -import org.slf4j.Logger - -import io.gearpump.cluster.MasterToAppMaster.AppMastersData -import io.gearpump.cluster.client.ClientContext -import io.gearpump.util.{AkkaApp, LogUtil} - -/** Tool to query master info */ -object Info extends AkkaApp with ArgumentsParser { - - private val LOG: Logger = LogUtil.getLogger(getClass) - - override val options: Array[(String, CLIOption[Any])] = Array( - // For document purpose only, OPTION_CONFIG option is not used here. - // OPTION_CONFIG is parsed by parent shell command "Gear" transparently. 
- Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, - defaultValue = None)) - - override val description = "Query the Application list" - - // scalastyle:off println - def main(akkaConf: Config, args: Array[String]): Unit = { - val client = ClientContext(akkaConf) - - val AppMastersData(appMasters) = client.listApps - Console.println("== Application Information ==") - Console.println("====================================") - appMasters.foreach { appData => - Console.println(s"application: ${appData.appId}, name: ${appData.appName}, " + - s"status: ${appData.status}, worker: ${appData.workerPath}") - } - client.close() - } - // scalastyle:on println -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/main/Kill.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Kill.scala b/daemon/src/main/scala/io/gearpump/cluster/main/Kill.scala deleted file mode 100644 index 3ce781f..0000000 --- a/daemon/src/main/scala/io/gearpump/cluster/main/Kill.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster.main - -import org.slf4j.Logger - -import io.gearpump.cluster.client.ClientContext -import io.gearpump.util.{AkkaApp, LogUtil} - -/** Tool to kill an App */ -object Kill extends AkkaApp with ArgumentsParser { - - private val LOG: Logger = LogUtil.getLogger(getClass) - - override val options: Array[(String, CLIOption[Any])] = Array( - "appid" -> CLIOption("<application id>", required = true), - // For document purpose only, OPTION_CONFIG option is not used here. - // OPTION_CONFIG is parsed by parent shell command "Gear" transparently. - Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, - defaultValue = None)) - - override val description = "Kill an application with application Id" - - def main(akkaConf: Config, args: Array[String]): Unit = { - val config = parse(args) - - if (null != config) { - val client = ClientContext(akkaConf) - LOG.info("Client ") - client.shutdown(config.getInt("appid")) - client.close() - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/main/Local.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Local.scala b/daemon/src/main/scala/io/gearpump/cluster/main/Local.scala deleted file mode 100644 index d5681df..0000000 --- a/daemon/src/main/scala/io/gearpump/cluster/main/Local.scala +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster.main - -import scala.collection.JavaConverters._ -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor.{ActorSystem, Props} -import com.typesafe.config.ConfigValueFactory -import org.slf4j.Logger - -import io.gearpump.cluster.ClusterConfig -import io.gearpump.cluster.master.{Master => MasterActor} -import io.gearpump.cluster.worker.{Worker => WorkerActor} -import io.gearpump.util.Constants._ -import io.gearpump.util.LogUtil.ProcessType -import io.gearpump.util.{ActorUtil, AkkaApp, Constants, LogUtil, Util} - -object Local extends AkkaApp with ArgumentsParser { - override def akkaConfig: Config = ClusterConfig.master() - - var LOG: Logger = LogUtil.getLogger(getClass) - - override val options: Array[(String, CLIOption[Any])] = - Array("sameprocess" -> CLIOption[Boolean]("", required = false, defaultValue = Some(false)), - "workernum" -> CLIOption[Int]("<how many workers to start>", required = false, - defaultValue = Some(2))) - - override val description = "Start a local cluster" - - def main(akkaConf: Config, args: Array[String]): Unit = { - - this.LOG = { - LogUtil.loadConfiguration(akkaConf, ProcessType.LOCAL) - LogUtil.getLogger(getClass) - } - - val config = parse(args) - if (null != config) { - local(config.getInt("workernum"), config.getBoolean("sameprocess"), akkaConf) - } - } - - def local(workerCount: Int, sameProcess: Boolean, akkaConf: Config): Unit = { - if (sameProcess) { - LOG.info("Starting local in same process") - System.setProperty("LOCAL", "true") - } - val 
masters = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS) - .asScala.flatMap(Util.parseHostList) - val local = akkaConf.getString(Constants.GEARPUMP_HOSTNAME) - - if (masters.size != 1 && masters.head.host != local) { - LOG.error(s"The ${Constants.GEARPUMP_CLUSTER_MASTERS} is not match " + - s"with ${Constants.GEARPUMP_HOSTNAME}") - } else { - - val hostPort = masters.head - implicit val system = ActorSystem(MASTER, akkaConf. - withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(hostPort.port)) - ) - - val master = system.actorOf(Props[MasterActor], MASTER) - val masterPath = ActorUtil.getSystemAddress(system).toString + s"/user/$MASTER" - - 0.until(workerCount).foreach { id => - system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id) - } - - Await.result(system.whenTerminated, Duration.Inf) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/main/MainRunner.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/MainRunner.scala b/daemon/src/main/scala/io/gearpump/cluster/main/MainRunner.scala deleted file mode 100644 index 923a646..0000000 --- a/daemon/src/main/scala/io/gearpump/cluster/main/MainRunner.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster.main - -import org.slf4j.Logger - -import io.gearpump.util.{AkkaApp, LogUtil} - -/** Tool to run any main class by providing a jar */ -object MainRunner extends AkkaApp with ArgumentsParser { - private val LOG: Logger = LogUtil.getLogger(getClass) - - override val options: Array[(String, CLIOption[Any])] = Array( - // For document purpose only, OPTION_CONFIG option is not used here. - // OPTION_CONFIG is parsed by parent shell command "Gear" transparently. - Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, - defaultValue = None)) - - def main(akkaConf: Config, args: Array[String]): Unit = { - val mainClazz = args(0) - val commandArgs = args.drop(1) - - val clazz = Thread.currentThread().getContextClassLoader().loadClass(mainClazz) - val mainMethod = clazz.getMethod("main", classOf[Array[String]]) - mainMethod.invoke(null, commandArgs) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala b/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala deleted file mode 100644 index eac1c54..0000000 --- a/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala +++ /dev/null @@ -1,236 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster.main - -import java.util.concurrent.TimeUnit -import scala.collection.JavaConverters._ -import scala.collection.immutable -import scala.concurrent.Await -import scala.concurrent.duration._ - -import akka.actor._ -import akka.cluster.ClusterEvent._ -import akka.cluster.ddata.DistributedData -import akka.cluster.singleton.{ClusterSingletonManager, ClusterSingletonManagerSettings, ClusterSingletonProxy, ClusterSingletonProxySettings} -import akka.cluster.{Cluster, Member, MemberStatus} -import com.typesafe.config.ConfigValueFactory -import org.slf4j.Logger - -import io.gearpump.cluster.ClusterConfig -import io.gearpump.cluster.master.{Master => MasterActor, MasterNode} -import io.gearpump.cluster.master.Master.MasterListUpdated -import io.gearpump.util.Constants._ -import io.gearpump.util.LogUtil.ProcessType -import io.gearpump.util.{AkkaApp, Constants, LogUtil} - -object Master extends AkkaApp with ArgumentsParser { - - private var LOG: Logger = LogUtil.getLogger(getClass) - - override def akkaConfig: Config = ClusterConfig.master() - - override val options: Array[(String, CLIOption[Any])] = - Array("ip" -> CLIOption[String]("<master ip address>", required = true), - "port" -> CLIOption("<master port>", required = true)) - - override val 
description = "Start Master daemon" - - def main(akkaConf: Config, args: Array[String]): Unit = { - - this.LOG = { - LogUtil.loadConfiguration(akkaConf, ProcessType.MASTER) - LogUtil.getLogger(getClass) - } - - val config = parse(args) - master(config.getString("ip"), config.getInt("port"), akkaConf) - } - - private def verifyMaster(master: String, port: Int, masters: Iterable[String]) = { - masters.exists { hostPort => - hostPort == s"$master:$port" - } - } - - private def master(ip: String, port: Int, akkaConf: Config): Unit = { - val masters = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala - - if (!verifyMaster(ip, port, masters)) { - LOG.error(s"The provided ip $ip and port $port doesn't conform with config at " + - s"gearpump.cluster.masters: ${masters.mkString(", ")}") - System.exit(-1) - } - - val masterList = masters.map(master => s"akka.tcp://${MASTER}@$master").toList.asJava - val quorum = masterList.size() / 2 + 1 - val masterConfig = akkaConf. - withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port)). - withValue(NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(ip)). - withValue("akka.cluster.seed-nodes", ConfigValueFactory.fromAnyRef(masterList)). 
- withValue(s"akka.cluster.role.${MASTER}.min-nr-of-members", - ConfigValueFactory.fromAnyRef(quorum)) - - LOG.info(s"Starting Master Actor system $ip:$port, master list: ${masters.mkString(";")}") - val system = ActorSystem(MASTER, masterConfig) - - val replicator = DistributedData(system).replicator - LOG.info(s"Replicator path: ${replicator.path}") - - // Starts singleton manager - val singletonManager = system.actorOf(ClusterSingletonManager.props( - singletonProps = Props(classOf[MasterWatcher], MASTER), - terminationMessage = PoisonPill, - settings = ClusterSingletonManagerSettings(system).withSingletonName(MASTER_WATCHER) - .withRole(MASTER)), - name = SINGLETON_MANAGER) - - // Start master proxy - val masterProxy = system.actorOf(ClusterSingletonProxy.props( - singletonManagerPath = s"/user/${SINGLETON_MANAGER}", - // The effective singleton is s"${MASTER_WATCHER}/$MASTER" instead of s"${MASTER_WATCHER}". - // Master is created when there is a majority of machines started. - settings = ClusterSingletonProxySettings(system) - .withSingletonName(s"${MASTER_WATCHER}/$MASTER").withRole(MASTER)), - name = MASTER - ) - - LOG.info(s"master proxy is started at ${masterProxy.path}") - - val mainThread = Thread.currentThread() - Runtime.getRuntime().addShutdownHook(new Thread() { - override def run(): Unit = { - if (!system.whenTerminated.isCompleted) { - LOG.info("Triggering shutdown hook....") - - system.stop(masterProxy) - val cluster = Cluster(system) - cluster.leave(cluster.selfAddress) - cluster.down(cluster.selfAddress) - try { - Await.result(system.whenTerminated, Duration(3, TimeUnit.SECONDS)) - } catch { - case ex: Exception => // Ignore - } - system.terminate() - mainThread.join() - } - } - }) - - Await.result(system.whenTerminated, Duration.Inf) - } -} - -class MasterWatcher(role: String) extends Actor with ActorLogging { - import context.dispatcher - - val cluster = Cluster(context.system) - - val config = context.system.settings.config - val masters = 
config.getList("akka.cluster.seed-nodes") - val quorum = masters.size() / 2 + 1 - - val system = context.system - - // Sorts by age, oldest first - val ageOrdering = Ordering.fromLessThan[Member] { (a, b) => a.isOlderThan(b) } - var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering) - - def receive: Receive = null - - // Subscribes to MemberEvent, re-subscribe when restart - override def preStart(): Unit = { - cluster.subscribe(self, classOf[MemberEvent]) - context.become(waitForInit) - } - override def postStop(): Unit = { - cluster.unsubscribe(self) - } - - def matchingRole(member: Member): Boolean = member.hasRole(role) - - def waitForInit: Receive = { - case state: CurrentClusterState => { - membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.filter(m => - m.status == MemberStatus.Up && matchingRole(m)) - - if (membersByAge.size < quorum) { - membersByAge.iterator.mkString(",") - log.info(s"We cannot get a quorum, $quorum, " + - s"shutting down...${membersByAge.iterator.mkString(",")}") - context.become(waitForShutdown) - self ! MasterWatcher.Shutdown - } else { - val master = context.actorOf(Props(classOf[MasterActor]), MASTER) - notifyMasterMembersChange(master) - context.become(waitForClusterEvent(master)) - } - } - } - - def waitForClusterEvent(master: ActorRef): Receive = { - case MemberUp(m) if matchingRole(m) => { - membersByAge += m - notifyMasterMembersChange(master) - } - case mEvent: MemberEvent if (mEvent.isInstanceOf[MemberExited] || - mEvent.isInstanceOf[MemberRemoved]) && matchingRole(mEvent.member) => { - log.info(s"member removed ${mEvent.member}") - val m = mEvent.member - membersByAge -= m - if (membersByAge.size < quorum) { - log.info(s"We cannot get a quorum, $quorum, " + - s"shutting down...${membersByAge.iterator.mkString(",")}") - context.become(waitForShutdown) - self ! 
MasterWatcher.Shutdown - } else { - notifyMasterMembersChange(master) - } - } - } - - private def notifyMasterMembersChange(master: ActorRef): Unit = { - val masters = membersByAge.toList.map{ member => - MasterNode(member.address.host.getOrElse("Unknown-Host"), - member.address.port.getOrElse(0)) - } - master ! MasterListUpdated(masters) - } - - def waitForShutdown: Receive = { - case MasterWatcher.Shutdown => { - cluster.unsubscribe(self) - cluster.leave(cluster.selfAddress) - context.stop(self) - system.scheduler.scheduleOnce(Duration.Zero) { - try { - Await.result(system.whenTerminated, Duration(3, TimeUnit.SECONDS)) - } catch { - case ex: Exception => // Ignore - } - system.terminate() - } - } - } -} - -object MasterWatcher { - object Shutdown -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/main/Replay.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Replay.scala b/daemon/src/main/scala/io/gearpump/cluster/main/Replay.scala deleted file mode 100644 index c9a6e9c..0000000 --- a/daemon/src/main/scala/io/gearpump/cluster/main/Replay.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.gearpump.cluster.main - -import org.slf4j.Logger - -import io.gearpump.cluster.client.ClientContext -import io.gearpump.util.{AkkaApp, LogUtil} - -// Internal tool to restart an application -object Replay extends AkkaApp with ArgumentsParser { - - private val LOG: Logger = LogUtil.getLogger(getClass) - - override val options: Array[(String, CLIOption[Any])] = Array( - "appid" -> CLIOption("<application id>", required = true), - // For document purpose only, OPTION_CONFIG option is not used here. - // OPTION_CONFIG is parsed by parent shell command "Gear" transparently. - Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, - defaultValue = None)) - - override val description = "Replay the application from current min clock(low watermark)" - - def main(akkaConf: Config, args: Array[String]): Unit = { - val config = parse(args) - - if (null != config) { - val client = ClientContext(akkaConf) - client.replayFromTimestampWindowTrailingEdge(config.getInt("appid")) - client.close() - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/main/Worker.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Worker.scala b/daemon/src/main/scala/io/gearpump/cluster/main/Worker.scala deleted file mode 100644 index 4818262..0000000 --- a/daemon/src/main/scala/io/gearpump/cluster/main/Worker.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster.main - -import scala.collection.JavaConverters._ -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor.{ActorSystem, Props} -import org.slf4j.Logger - -import io.gearpump.cluster.ClusterConfig -import io.gearpump.cluster.master.MasterProxy -import io.gearpump.cluster.worker.{Worker => WorkerActor} -import io.gearpump.transport.HostPort -import io.gearpump.util.Constants._ -import io.gearpump.util.LogUtil.ProcessType -import io.gearpump.util.{AkkaApp, LogUtil} - -/** Tool to start a worker daemon process */ -object Worker extends AkkaApp with ArgumentsParser { - protected override def akkaConfig = ClusterConfig.worker() - - override val description = "Start a worker daemon" - - var LOG: Logger = LogUtil.getLogger(getClass) - - private def uuid = java.util.UUID.randomUUID.toString - - def main(akkaConf: Config, args: Array[String]): Unit = { - val id = uuid - - this.LOG = { - LogUtil.loadConfiguration(akkaConf, ProcessType.WORKER) - // Delay creation of LOG instance to avoid creating an empty log file as we - // reset the log file name here - LogUtil.getLogger(getClass) - } - - val system = ActorSystem(id, akkaConf) - - val masterAddress = akkaConf.getStringList(GEARPUMP_CLUSTER_MASTERS).asScala.map { address => - val hostAndPort = address.split(":") - HostPort(hostAndPort(0), hostAndPort(1).toInt) - } - - 
LOG.info(s"Trying to connect to masters " + masterAddress.mkString(",") + "...") - val masterProxy = system.actorOf(MasterProxy.props(masterAddress), s"masterproxy${system.name}") - - system.actorOf(Props(classOf[WorkerActor], masterProxy), - classOf[WorkerActor].getSimpleName + id) - - Await.result(system.whenTerminated, Duration.Inf) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/master/AppManager.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/master/AppManager.scala b/daemon/src/main/scala/io/gearpump/cluster/master/AppManager.scala deleted file mode 100644 index 058533e..0000000 --- a/daemon/src/main/scala/io/gearpump/cluster/master/AppManager.scala +++ /dev/null @@ -1,341 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
*/

package io.gearpump.cluster.master

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success}

import akka.actor._
import akka.pattern.ask
import org.slf4j.Logger

import io.gearpump.cluster.AppMasterToMaster.{AppDataSaved, SaveAppDataFailed, _}
import io.gearpump.cluster.AppMasterToWorker._
import io.gearpump.cluster.ClientToMaster._
import io.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataRequest, AppMastersDataRequest, _}
import io.gearpump.cluster.MasterToClient._
import io.gearpump.cluster.WorkerToAppMaster.{ShutdownExecutorFailed, _}
import io.gearpump.cluster.appmaster.{AppMasterRuntimeInfo, ApplicationState}
import io.gearpump.cluster.master.AppManager._
import io.gearpump.cluster.master.InMemoryKVService.{GetKVResult, PutKVResult, PutKVSuccess, _}
import io.gearpump.cluster.master.Master._
import io.gearpump.util.Constants._
import io.gearpump.util.{ActorUtil, TimeOutScheduler, Util, _}

/**
 * AppManager is dedicated child of Master to manager all applications.
 *
 * On construction it asks the KV service for the persisted [[AppManager.MasterState]] and
 * stashes every other message until the answer arrives; afterwards it serves client requests,
 * AppMaster registrations, application data requests and AppMaster termination events.
 *
 * @param kvService actor used to persist and to read master / application state
 * @param launcher factory for the props of the actor that launches an AppMaster
 */
private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLauncherFactory)
  extends Actor with Stash with TimeOutScheduler {

  private val LOG: Logger = LogUtil.getLogger(getClass)

  private val executorId: Int = APPMASTER_DEFAULT_EXECUTOR_ID
  private val appMasterMaxRetries: Int = 5
  private val appMasterRetryTimeRange: Duration = 20.seconds

  implicit val timeout = FUTURE_TIMEOUT
  implicit val executionContext = context.dispatcher

  // Next available appId
  private var appId: Int = 1

  // From appid to appMaster data
  private var appMasterRegistry = Map.empty[Int, (ActorRef, AppMasterRuntimeInfo)]

  // Dead appmaster list
  private var deadAppMasters = Map.empty[Int, (ActorRef, AppMasterRuntimeInfo)]

  // Restart policy per appId. Only kept in memory; it is not part of MasterState.
  private var appMasterRestartPolicies = Map.empty[Int, RestartPolicy]

  // The real behavior is installed by the context.become call just below.
  def receive: Receive = null

  kvService ! GetKV(MASTER_GROUP, MASTER_STATE)
  context.become(waitForMasterState)

  /** Initial behavior: wait for the persisted master state and stash everything else. */
  def waitForMasterState: Receive = {
    case GetKVSuccess(_, result) =>
      // A null result means that no master state has been stored yet.
      Option(result.asInstanceOf[MasterState]).foreach { masterState =>
        this.appId = masterState.maxId + 1
        this.deadAppMasters = masterState.deadAppMasters
        this.appMasterRegistry = masterState.appMasterRegistry
      }
      context.become(receiveHandler)
      unstashAll()
    case GetKVFailed(ex) =>
      LOG.error("Failed to get master state, shutting down master to avoid data corruption...")
      context.parent ! PoisonPill
    case msg =>
      LOG.info(s"Get message ${msg.getClass.getSimpleName}")
      stash()
  }

  /** Normal behavior, composed from the individual handlers below. */
  def receiveHandler: Receive = {
    val msg = "Application Manager started. Ready for application submission..."
    LOG.info(msg)
    clientMsgHandler orElse appMasterMessage orElse selfMsgHandler orElse workerMessage orElse
      appDataStoreService orElse terminationWatch
  }

  /** Handles requests which are originated by clients (submit, restart, shutdown, queries). */
  def clientMsgHandler: Receive = {
    case SubmitApplication(app, jar, username) =>
      LOG.info(s"Submit Application ${app.name}($appId) by $username...")
      val client = sender
      if (applicationNameExist(app.name)) {
        client ! SubmitApplicationResult(Failure(
          new Exception(s"Application name ${app.name} already existed")))
      } else {
        context.actorOf(launcher.props(appId, executorId, app, jar, username, context.parent,
          Some(client)), s"launcher${appId}_${Util.randInt()}")

        val appState = new ApplicationState(appId, app.name, 0, app, jar, username, null)
        appMasterRestartPolicies += appId ->
          new RestartPolicy(appMasterMaxRetries, appMasterRetryTimeRange)
        kvService ! PutKV(appId.toString, APP_STATE, appState)
        appId += 1
      }

    case RestartApplication(appId) =>
      // Capture the sender now, the callback below runs after this message has been handled.
      val client = sender()
      (kvService ? GetKV(appId.toString, APP_STATE)).asInstanceOf[Future[GetKVResult]].map {
        case GetKVSuccess(_, result) =>
          val appState = result.asInstanceOf[ApplicationState]
          if (appState != null) {
            LOG.info(s"Shutting down the application (restart), $appId")
            self ! ShutdownApplication(appId)
            self.tell(SubmitApplication(appState.app, appState.jar, appState.username), client)
          } else {
            client ! SubmitApplicationResult(Failure(
              new Exception(s"Failed to restart, because the application $appId does not exist.")
            ))
          }
        case GetKVFailed(ex) =>
          client ! SubmitApplicationResult(Failure(
            new Exception(s"Unable to obtain the Master State. " +
              s"Application $appId will not be restarted.")
          ))
      }

    case ShutdownApplication(appId) =>
      LOG.info(s"App Manager Shutting down application $appId")
      appMasterRegistry.get(appId).flatMap(entry => Option(entry._2)) match {
        case Some(info) =>
          val worker = info.worker
          val workerPath = Option(worker).map(_.path).orNull
          LOG.info(s"Shutdown AppMaster at ${workerPath}, appId: $appId, executorId: $executorId")
          cleanApplicationData(appId)
          val shutdown = ShutdownExecutor(appId, executorId,
            s"AppMaster $appId shutdown requested by master...")
          // The worker reference may be null (see workerPath above); only send the shutdown
          // request when there is a worker to send it to.
          Option(worker) match {
            case Some(target) =>
              sendMsgWithTimeOutCallBack(target, shutdown, 30000, shutDownExecutorTimeOut())
            case None =>
              LOG.warn(s"Worker of AppMaster $appId is unknown, skip shutting down its executor")
          }
          sender ! ShutdownApplicationResult(Success(appId))
        case None =>
          val errorMsg = s"Failed to find regisration information for appId: $appId"
          LOG.error(errorMsg)
          sender ! ShutdownApplicationResult(Failure(new Exception(errorMsg)))
      }

    case ResolveAppId(appId) =>
      appMasterRegistry.get(appId).flatMap(entry => Option(entry._1)) match {
        case Some(appMaster) =>
          sender ! ResolveAppIdResult(Success(appMaster))
        case None =>
          sender ! ResolveAppIdResult(Failure(new Exception(s"Can not find Application: $appId")))
      }

    case AppMastersDataRequest =>
      // Active AppMasters first, then the dead ones.
      val active = appMasterRegistry.toList.map { case (id, (appMaster, info)) =>
        AppMasterData(
          AppMasterActive, id, info.appName, fullPath(appMaster), workerFullPath(info),
          info.submissionTime, info.startTime, info.finishTime, info.user)
      }
      val inactive = deadAppMasters.toList.map { case (id, (appMaster, info)) =>
        AppMasterData(
          AppMasterInActive, id, info.appName, fullPath(appMaster), workerFullPath(info),
          info.submissionTime, info.startTime, info.finishTime, info.user)
      }
      sender ! AppMastersData(active ++ inactive)

    case QueryAppMasterConfig(appId) =>
      // Replies with a null config when the application is neither active nor dead.
      val config =
        appMasterRegistry.get(appId).orElse(deadAppMasters.get(appId)).map(_._2.config).orNull
      sender ! AppMasterConfig(config)

    case appMasterDataRequest: AppMasterDataRequest =>
      val appId = appMasterDataRequest.appId
      val (appStatus, appMaster, info) =
        if (appMasterRegistry.contains(appId)) {
          val (appMaster, info) = appMasterRegistry(appId)
          (AppMasterActive, appMaster, info)
        } else if (deadAppMasters.contains(appId)) {
          val (appMaster, info) = deadAppMasters(appId)
          (AppMasterInActive, appMaster, info)
        } else {
          (AppMasterNonExist, null, null)
        }

      appStatus match {
        case AppMasterActive | AppMasterInActive =>
          sender ! AppMasterData(
            appStatus, appId, info.appName, fullPath(appMaster), workerFullPath(info),
            info.submissionTime, info.startTime, info.finishTime, info.user)

        case AppMasterNonExist =>
          sender ! AppMasterData(AppMasterNonExist)
      }
  }

  /** Handles the answers of a worker to a ShutdownExecutor request. */
  def workerMessage: Receive = {
    case ShutdownExecutorSucceed(appId, executorId) =>
      LOG.info(s"Shut down executor $executorId for application $appId successfully")
    case failed: ShutdownExecutorFailed =>
      LOG.error(failed.reason)
  }

  private def shutDownExecutorTimeOut(): Unit = {
    LOG.error(s"Shut down executor time out")
  }

  /** Registers an AppMaster, watches it and persists the updated master state. */
  def appMasterMessage: Receive = {
    case RegisterAppMaster(appMaster, registerBack: AppMasterRuntimeInfo) =>
      val startTime = System.currentTimeMillis()
      val register = registerBack.copy(startTime = startTime)

      LOG.info(s"Register AppMaster for app: ${register.appId} $register")
      context.watch(appMaster)
      appMasterRegistry += register.appId -> (appMaster, register)
      kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
        MasterState(this.appId, appMasterRegistry, deadAppMasters))
      sender ! AppMasterRegistered(register.appId)
  }

  /** Stores and loads application data through the KV service on behalf of the sender. */
  def appDataStoreService: Receive = {
    case SaveAppData(appId, key, value) =>
      val client = sender
      (kvService ? PutKV(appId.toString, key, value)).asInstanceOf[Future[PutKVResult]].map {
        case PutKVSuccess =>
          client ! AppDataSaved
        case PutKVFailed(k, ex) =>
          client ! SaveAppDataFailed
      }
    case GetAppData(appId, key) =>
      val client = sender
      (kvService ? GetKV(appId.toString, key)).asInstanceOf[Future[GetKVResult]].map {
        case GetKVSuccess(privateKey, value) =>
          client ! GetAppDataResult(key, value)
        case GetKVFailed(ex) =>
          // A failed read is reported to the client as a null value.
          client ! GetAppDataResult(key, null)
      }
  }

  /** When a registered AppMaster terminates, loads its state and triggers a recovery. */
  def terminationWatch: Receive = {
    case terminate: Terminated =>
      LOG.info(s"AppMaster(${terminate.actor.path}) is terminiated, " +
        s"network down: ${terminate.getAddressTerminated()}")

      // Now we assume that the only normal way to stop the application is submitting a
      // ShutdownApplication request
      val application = appMasterRegistry.find { case (_, (actorRef, _)) =>
        actorRef.compareTo(terminate.actor) == 0
      }
      application.foreach { case (appId, _) =>
        (kvService ? GetKV(appId.toString, APP_STATE)).asInstanceOf[Future[GetKVResult]].map {
          case GetKVSuccess(_, result) =>
            val appState = result.asInstanceOf[ApplicationState]
            if (appState != null) {
              LOG.info(s"Recovering application, $appId")
              self ! RecoverApplication(appState)
            } else {
              LOG.error(s"Cannot find application state for $appId")
            }
          case GetKVFailed(ex) =>
            LOG.error(s"Cannot find master state to recover")
        }
      }
  }

  /** Handles the RecoverApplication message which this actor sends to itself. */
  def selfMsgHandler: Receive = {
    case RecoverApplication(state) =>
      val appId = state.appId
      // Restart policies are only created on SubmitApplication and are not restored together
      // with the master state, so an application can be registered without a policy. Create
      // the policy on demand instead of failing with NoSuchElementException.
      val policy = appMasterRestartPolicies.getOrElse(appId, {
        val created = new RestartPolicy(appMasterMaxRetries, appMasterRetryTimeRange)
        appMasterRestartPolicies += appId -> created
        created
      })
      if (policy.allowRestart) {
        LOG.info(s"AppManager Recovering Application $appId...")
        context.actorOf(launcher.props(appId, executorId, state.app, state.jar, state.username,
          context.parent, None), s"launcher${appId}_${Util.randInt()}")
      } else {
        LOG.error(s"Application $appId failed too many times")
      }
  }

  case class RecoverApplication(applicationStatus: ApplicationState)

  /**
   * Moves the application from the registry to the dead AppMaster list, persists the new
   * master state and deletes the KV group of the application.
   */
  private def cleanApplicationData(appId: Int): Unit = {
    // Add the dead app to dead appMaster
    appMasterRegistry.get(appId).foreach { case (appMasterActor, info) =>
      deadAppMasters += appId -> (appMasterActor, info.copy(
        finishTime = System.currentTimeMillis()))
    }

    appMasterRegistry -= appId

    kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
      MasterState(this.appId, appMasterRegistry, deadAppMasters))
    kvService ! DeleteKVGroup(appId.toString)
  }

  /** Returns true when a registered (active) application already uses this name. */
  private def applicationNameExist(appName: String): Boolean = {
    appMasterRegistry.values.exists(_._2.appName == appName)
  }

  /** Full path of the actor within this actor system. */
  private def fullPath(actor: ActorRef) = ActorUtil.getFullPath(context.system, actor.path)

  /** Full path of the worker recorded in the runtime info, null when there is no worker. */
  private def workerFullPath(info: AppMasterRuntimeInfo) =
    Option(info.worker).map(fullPath).orNull
}

object AppManager {
  final val APP_STATE = "app_state"
  // The id is used in KVStore
  final val MASTER_STATE = "master_state"

  /**
   * State of the AppManager as it is stored in the KV service.
   *
   * @param maxId app id counter of the AppManager at the time the state was written
   * @param appMasterRegistry registered AppMasters by app id
   * @param deadAppMasters AppMasters of applications that have been shut down, by app id
   */
  case class MasterState(
      maxId: Int,
      appMasterRegistry: Map[Int, (ActorRef, AppMasterRuntimeInfo)],
      deadAppMasters: Map[Int, (ActorRef, AppMasterRuntimeInfo)])
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/master/InMemoryKVService.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/master/InMemoryKVService.scala b/daemon/src/main/scala/io/gearpump/cluster/master/InMemoryKVService.scala
deleted file mode 100644
index 616d3ee..0000000
--- a/daemon/src/main/scala/io/gearpump/cluster/master/InMemoryKVService.scala
+++ /dev/null
@@ -1,122 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
*/

package io.gearpump.cluster.master

import java.util.concurrent.TimeUnit
import scala.concurrent.TimeoutException
import scala.concurrent.duration.Duration

import akka.actor._
import akka.cluster.Cluster
import akka.cluster.ddata.Replicator._
import akka.cluster.ddata.{DistributedData, LWWMap, LWWMapKey}
import org.slf4j.Logger

import io.gearpump.util.LogUtil

/**
 * A replicated simple in-memory KV service. The replications are stored on all masters.
 *
 * Every group is kept as one LWWMap in the distributed data replicator; the original requester
 * and key travel with each replicator request so that the answer can be routed back.
 */
class InMemoryKVService extends Actor with Stash {
  import io.gearpump.cluster.master.InMemoryKVService._

  private val KV_SERVICE = "gearpump_kvservice"

  private val LOG: Logger = LogUtil.getLogger(getClass)
  private val replicator = DistributedData(context.system).replicator
  private implicit val cluster = Cluster(context.system)

  // Optimize write path, we can tolerate one master down for recovery.
  private val timeout = Duration(15, TimeUnit.SECONDS)
  private val readMajority = ReadMajority(timeout)
  private val writeMajority = WriteMajority(timeout)

  /** Replicator key under which the map of the given group is stored. */
  private def groupKey(group: String): LWWMapKey[Any] =
    LWWMapKey[Any](s"${KV_SERVICE}_$group")

  def receive: Receive = kvService

  /** Translates KV requests into replicator requests and replicator answers into KV results. */
  def kvService: Receive = {

    // ---- read path ----
    case GetKV(group: String, key: String) =>
      val context = Some(Request(sender(), key))
      replicator ! Get(groupKey(group), readMajority, context)

    case found@GetSuccess(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
      val entries = found.get(group)
      LOG.info(s"Successfully retrived group: ${group.id}")
      // A key which is missing in an existing group is answered with a null value.
      request.client ! GetKVSuccess(request.key, entries.get(request.key).orNull)

    case NotFound(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
      LOG.info(s"We cannot find group $group")
      // An unknown group is answered like a missing key: success with a null value.
      request.client ! GetKVSuccess(request.key, null)

    case GetFailure(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
      val error = s"Failed to get application data, the request key is ${request.key}"
      LOG.error(error)
      request.client ! GetKVFailed(new Exception(error))

    // ---- write path ----
    case PutKV(group: String, key: String, value: Any) =>
      val context = Some(Request(sender(), key))
      replicator ! Update(groupKey(group), LWWMap(), writeMajority, context) { current =>
        current + (key -> value)
      }

    case UpdateSuccess(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
      request.client ! PutKVSuccess

    case ModifyFailure(group: LWWMapKey[Any @unchecked], error, cause, Some(request: Request)) =>
      request.client ! PutKVFailed(request.key, new Exception(error, cause))

    case UpdateTimeout(group: LWWMapKey[Any @unchecked], Some(request: Request)) =>
      request.client ! PutKVFailed(request.key, new TimeoutException())

    // ---- delete path, the outcome is only logged ----
    case DeleteKVGroup(group: String) =>
      replicator ! Delete(groupKey(group), writeMajority)

    case DeleteSuccess(group) =>
      LOG.info(s"KV Group ${group.id} is deleted")

    case ReplicationDeleteFailure(group) =>
      LOG.error(s"Failed to delete KV Group ${group.id}...")

    case DataDeleted(group) =>
      LOG.error(s"Group ${group.id} is deleted, you can no longer put/get/delete this group...")
  }
}

object InMemoryKVService {
  /**
   * KV Service related
   */

  /** Request to read the value of `key` in `group`. */
  case class GetKV(group: String, key: String)

  /** Answer to a [[GetKV]]. */
  trait GetKVResult

  case class GetKVSuccess(key: String, value: Any) extends GetKVResult

  case class GetKVFailed(ex: Throwable) extends GetKVResult

  /** Request to store `value` under `key` in `group`. */
  case class PutKV(group: String, key: String, value: Any)

  /** Request to delete the whole `group`. */
  case class DeleteKVGroup(group: String)

  case class GroupDeleted(group: String) extends GetKVResult with PutKVResult

  /** Answer to a [[PutKV]]. */
  trait PutKVResult

  case object PutKVSuccess extends PutKVResult

  case class PutKVFailed(key: String, ex: Throwable) extends PutKVResult

  /** Requester and key, passed through the replicator as request context. */
  case class Request(client: ActorRef, key: String)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala b/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala deleted file mode 100644 index 0dfa381..0000000 --- a/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala +++ /dev/null @@ -1,313 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
*/

package io.gearpump.cluster.master

import java.lang.management.ManagementFactory
import scala.collection.JavaConverters._
import scala.collection.immutable

import akka.actor._
import akka.remote.DisassociatedEvent
import com.typesafe.config.Config
import org.apache.commons.lang.exception.ExceptionUtils
import org.slf4j.Logger

import io.gearpump.cluster.AppMasterToMaster._
import io.gearpump.cluster.ClientToMaster._
import io.gearpump.cluster.ClusterConfig
import io.gearpump.cluster.MasterToAppMaster._
import io.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, MasterConfig, ResolveWorkerIdResult}
import io.gearpump.cluster.MasterToWorker._
import io.gearpump.cluster.WorkerToMaster._
import io.gearpump.cluster.master.InMemoryKVService._
import io.gearpump.cluster.master.Master.{MasterInfo, WorkerTerminated, _}
import io.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
import io.gearpump.cluster.worker.WorkerId
import io.gearpump.jarstore.local.LocalJarStore
import io.gearpump.metrics.Metrics.ReportMetrics
import io.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
import io.gearpump.transport.HostPort
import io.gearpump.util.Constants._
import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
import io.gearpump.util._

/**
 * Master Actor who manages resources of the whole cluster.
 * It is like the resource manager of YARN.
 *
 * On construction it asks the KV service for the next worker id and stashes all other messages
 * until the answer arrives. Afterwards it registers workers and dispatches requests to the
 * scheduler, the AppManager, the jar store and the history metrics service.
 */
private[cluster] class Master extends Actor with Stash {
  private val LOG: Logger = LogUtil.getLogger(getClass)
  private val systemConfig: Config = context.system.settings.config
  private implicit val timeout = Constants.FUTURE_TIMEOUT
  private val kvService = context.actorOf(Props(new InMemoryKVService()), "kvService")
  // Resources and resourceRequests can be dynamically constructed by
  // heartbeat of worker and appmaster when master singleton is migrated.
  // We don't need to persist them in cluster

  // Both children are created in preStart().
  private var appManager: ActorRef = null

  private var scheduler: ActorRef = null

  // Registered workers, from the worker actor to its id.
  private var workers = new immutable.HashMap[ActorRef, WorkerId]

  private val birth = System.currentTimeMillis()

  private var nextWorkerId = 0

  // The real behavior is installed by the context.become call at the end of the constructor.
  def receive: Receive = null

  // Register jvm metrics
  Metrics(context.system).register(new JvmMetricsSet("master"))

  LOG.info(s"master is started at ${ActorUtil.getFullPath(context.system, self.path)}...")

  val jarStoreRootPath = systemConfig.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH)

  // A local jar store actor is only created when the root path is a local path.
  private val jarStore =
    if (Util.isLocalPath(jarStoreRootPath)) {
      Some(context.actorOf(Props(classOf[LocalJarStore], jarStoreRootPath)))
    } else {
      None
    }

  private val hostPort = HostPort(ActorUtil.getSystemAddress(context.system).hostPort)

  // Maintain the list of active masters.
  // Add myself into the list of initial masters.
  private var masters: List[MasterNode] = List(MasterNode(hostPort.host, hostPort.port))

  val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)

  val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig)
  val historyMetricsService = if (metricsEnabled) {
    val service =
      context.actorOf(Props(new HistoryMetricsService("master", getHistoryMetricsConfig)))

    val reporter = context.actorOf(
      Props(new MetricsReporterService(Metrics(context.system))))
    // Sent with the reporter as the sender.
    service.tell(ReportMetrics, reporter)
    Some(service)
  } else {
    None
  }

  kvService ! GetKV(MASTER_GROUP, WORKER_ID)
  context.become(waitForNextWorkerId)

  /** Initial behavior: wait for the persisted next worker id and stash everything else. */
  def waitForNextWorkerId: Receive = {
    case GetKVSuccess(_, result) =>
      if (result == null) {
        LOG.warn("Cannot find existing state in the distributed cluster...")
      } else {
        this.nextWorkerId = result.asInstanceOf[Int]
      }
      context.become(receiveHandler)
      unstashAll()
    case GetKVFailed(ex) =>
      LOG.error("Failed to get worker id, shutting down master to avoid data corruption...")
      context.parent ! PoisonPill
    case msg =>
      LOG.info(s"Get message ${msg.getClass.getSimpleName}")
      stash()
  }

  /** Normal behavior, composed from the individual handlers below. */
  def receiveHandler: Receive = workerMsgHandler orElse
    appMasterMsgHandler orElse
    onMasterListChange orElse
    clientMsgHandler orElse
    metricsService orElse
    jarStoreService orElse
    terminationWatch orElse
    disassociated orElse
    kvServiceMsgHandler orElse
    ActorUtil.defaultMsgHandler(self)

  /** Worker registration and resource updates. */
  def workerMsgHandler: Receive = {
    case RegisterNewWorker =>
      val workerId = WorkerId(nextWorkerId, System.currentTimeMillis())
      nextWorkerId += 1
      // Persist the counter so that it can be read back by waitForNextWorkerId.
      kvService ! PutKV(MASTER_GROUP, WORKER_ID, nextWorkerId)
      val workerHostname = ActorUtil.getHostname(sender())
      LOG.info(s"Register new from $workerHostname ....")
      self forward RegisterWorker(workerId)

    case RegisterWorker(id) =>
      val worker = sender()
      val registered = WorkerRegistered(id, MasterInfo(self, birth))
      context.watch(worker)
      worker ! registered
      scheduler forward registered
      workers += (worker -> id)
      val workerHostname = ActorUtil.getHostname(worker)
      LOG.info(s"Register Worker with id $id from $workerHostname ....")

    case resourceUpdate: ResourceUpdate =>
      scheduler forward resourceUpdate
  }

  /** Forwards the jar store lookup to the local jar store, when there is one. */
  def jarStoreService: Receive = {
    case GetJarStoreServer =>
      jarStore.foreach(_ forward GetJarStoreServer)
  }

  /** Answers of the KV service to the PutKV requests sent by this actor. */
  def kvServiceMsgHandler: Receive = {
    case PutKVSuccess =>
    // Skip
    case PutKVFailed(key, exception) =>
      LOG.error(s"Put KV of key $key to InMemoryKVService failed.\n" +
        ExceptionUtils.getStackTrace(exception))
  }

  /** History metrics queries. */
  def metricsService: Receive = {
    case query: QueryHistoryMetrics =>
      historyMetricsService match {
        case Some(service) =>
          service forward query
        case None =>
          // Returns empty metrics so that we don't hang the UI
          sender ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem])
      }
  }

  /** Requests which are sent by AppMasters. */
  def appMasterMsgHandler: Receive = {
    case request: RequestResource =>
      scheduler forward request
    case registerAppMaster: RegisterAppMaster =>
      // Forward to appManager
      appManager forward registerAppMaster
    case save: SaveAppData =>
      appManager forward save
    case get: GetAppData =>
      appManager forward get
    case GetAllWorkers =>
      sender ! WorkerList(workers.values.toList)
    case GetMasterData =>
      val aliveFor = System.currentTimeMillis() - birth
      val logFileDir = LogUtil.daemonLogDir(systemConfig).getAbsolutePath
      val userDir = System.getProperty("user.dir")

      val masterDescription =
        MasterSummary(
          MasterNode(hostPort.host, hostPort.port),
          masters,
          aliveFor,
          logFileDir,
          jarStoreRootPath,
          MasterStatus.Synced,
          userDir,
          List.empty[MasterActivity],
          jvmName = ManagementFactory.getRuntimeMXBean().getName(),
          historyMetricsConfig = getHistoryMetricsConfig
        )

      sender ! MasterData(masterDescription)

    case invalidAppMaster: InvalidAppMaster =>
      appManager forward invalidAppMaster
  }

  import scala.util.{Failure, Success}

  /** Replaces the list of active masters. */
  def onMasterListChange: Receive = {
    case MasterListUpdated(masters: List[MasterNode]) =>
      this.masters = masters
  }

  /** Requests which are sent by clients. */
  def clientMsgHandler: Receive = {
    case app: SubmitApplication =>
      LOG.debug(s"Receive from client, SubmitApplication $app")
      appManager.forward(app)
    case app: RestartApplication =>
      LOG.debug(s"Receive from client, RestartApplication $app")
      appManager.forward(app)
    case app: ShutdownApplication =>
      LOG.debug(s"Receive from client, Shutting down Application ${app.appId}")
      scheduler ! ApplicationFinished(app.appId)
      appManager.forward(app)
    case app: ResolveAppId =>
      LOG.debug(s"Receive from client, resolving appId ${app.appId} to ActorRef")
      appManager.forward(app)
    case resolve: ResolveWorkerId =>
      LOG.debug(s"Receive from client, resolving workerId ${resolve.workerId}")
      val resolved = workers.collectFirst {
        case (workerRef, id) if id == resolve.workerId => workerRef
      }
      resolved match {
        case Some(workerRef) =>
          sender ! ResolveWorkerIdResult(Success(workerRef))
        case None =>
          sender ! ResolveWorkerIdResult(Failure(
            new Exception(s"cannot find worker ${resolve.workerId}")))
      }
    case AppMastersDataRequest =>
      LOG.debug("Master received AppMastersDataRequest")
      appManager forward AppMastersDataRequest
    case appMasterDataRequest: AppMasterDataRequest =>
      LOG.debug("Master received AppMasterDataRequest")
      appManager forward appMasterDataRequest
    case query: QueryAppMasterConfig =>
      LOG.debug("Master received QueryAppMasterConfig")
      appManager forward query
    case QueryMasterConfig =>
      sender ! MasterConfig(ClusterConfig.filterOutDefaultConfig(systemConfig))
  }

  /** Logs remote disassociation events (subscribed in preStart). */
  def disassociated: Receive = {
    case disassociated: DisassociatedEvent =>
      LOG.info(s" disassociated ${disassociated.remoteAddress}")
  }

  /** Removes a terminated worker and tells the scheduler about it. */
  def terminationWatch: Receive = {
    case t: Terminated =>
      val actor = t.actor
      LOG.info(s"worker ${actor.path} get terminated, is it due to network reason?" +
        t.getAddressTerminated())

      LOG.info("Let's filter out dead resources...")
      // Filters out dead worker resource
      workers.get(actor).foreach { workerId =>
        scheduler ! WorkerTerminated(workerId)
        workers -= actor
      }
  }

  /** Creates the AppManager and the configured scheduler, subscribes to DisassociatedEvent. */
  override def preStart(): Unit = {
    val path = ActorUtil.getFullPath(context.system, self.path)
    LOG.info(s"master path is $path")
    val schedulerClass = Class.forName(
      systemConfig.getString(Constants.GEARPUMP_SCHEDULING_SCHEDULER))

    appManager = context.actorOf(Props(new AppManager(kvService, AppMasterLauncher)),
      classOf[AppManager].getSimpleName)
    scheduler = context.actorOf(Props(schedulerClass))
    context.system.eventStream.subscribe(self, classOf[DisassociatedEvent])
  }
}

object Master {
  final val MASTER_GROUP = "master_group"

  final val WORKER_ID = "next_worker_id"

  /** Sent to the scheduler when a registered worker terminated. */
  case class WorkerTerminated(workerId: WorkerId)

  /** The master actor together with its start time. */
  case class MasterInfo(master: ActorRef, startTime: Long = 0L)

  /** Notify the subscriber that master actor list has been updated */
  case class MasterListUpdated(masters: List[MasterNode])

  object MasterInfo {
    def empty: MasterInfo = MasterInfo(null)
  }

  case class SlotStatus(totalSlots: Int, availableSlots: Int)
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala
b/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala deleted file mode 100644 index 5df008e..0000000 --- a/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
*/

package io.gearpump.cluster.scheduler

import scala.collection.mutable

import akka.actor.ActorRef

import io.gearpump.cluster.AppMasterToMaster.RequestResource
import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated
import io.gearpump.cluster.scheduler.Relaxation._
import io.gearpump.cluster.scheduler.Scheduler.PendingRequest
import io.gearpump.cluster.worker.WorkerId

/** Assign resource to application based on the priority of the application */
class PriorityScheduler extends Scheduler {
  private var resourceRequests = new mutable.PriorityQueue[PendingRequest]()(requestOrdering)

  /**
   * Orders pending requests by the id of their priority; for equal priorities the request with
   * the smaller time stamp is the greater one, so it is dequeued first.
   */
  def requestOrdering: Ordering[PendingRequest] = new Ordering[PendingRequest] {
    override def compare(x: PendingRequest, y: PendingRequest): Int = {
      val byPriority = x.request.priority.id - y.request.priority.id
      if (byPriority != 0) byPriority else y.timeStamp.compareTo(x.timeStamp)
    }
  }

  override def receive: Receive = super.handleScheduleMessage orElse resourceRequestHandler

  /**
   * Serves the pending requests in queue order against a snapshot of the known resources, until
   * the queue is empty or the allocated amount is no longer less than the total of the
   * snapshot. Requests which are not (fully) satisfied are put back into the queue.
   */
  override def allocateResource(): Unit = {
    val postponed = mutable.ArrayBuffer.empty[PendingRequest]
    val resourcesSnapShot = resources.clone()
    var allocated = Resource.empty
    val totalResource = Resource(resourcesSnapShot.values.map(_._2.slots).sum)

    while (resourceRequests.nonEmpty && (allocated < totalResource)) {
      val PendingRequest(appId, appMaster, request, timeStamp) = resourceRequests.dequeue()
      request.relaxation match {
        case ANY =>
          val allocations = allocateFairly(resourcesSnapShot, request)
          val newAllocated = Resource(allocations.map(_.resource.slots).sum)
          if (allocations.nonEmpty) {
            appMaster ! ResourceAllocated(allocations.toArray)
          }
          if (newAllocated < request.resource) {
            // Only a part was allocated, keep the rest as a pending request.
            val remaining = request.copy(
              resource = request.resource - newAllocated,
              executorNum = request.executorNum - allocations.length)
            postponed += PendingRequest(appId, appMaster, remaining, timeStamp)
          }
          allocated = allocated + newAllocated

        case ONEWORKER =>
          val candidate = resourcesSnapShot.find { case (_, (_, resource)) =>
            resource > request.resource
          }
          candidate match {
            case Some((workerId, (worker, resource))) =>
              allocated = allocated + request.resource
              appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker,
                workerId)))
              resourcesSnapShot.update(workerId, (worker, resource - request.resource))
            case None =>
              postponed += PendingRequest(appId, appMaster, request, timeStamp)
          }

        case SPECIFICWORKER =>
          resourcesSnapShot.get(request.workerId) match {
            case Some((worker, availableResource)) if availableResource > request.resource =>
              appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker,
                request.workerId)))
              allocated = allocated + request.resource
              resourcesSnapShot.update(request.workerId, (worker,
                availableResource - request.resource))
            case _ =>
              postponed += PendingRequest(appId, appMaster, request, timeStamp)
          }
      }
    }
    postponed.foreach(pending => resourceRequests.enqueue(pending))
  }

  /** Queues a resource request of the sender and runs an allocation round. */
  def resourceRequestHandler: Receive = {
    case RequestResource(appId, request) =>
      LOG.info(s"Request resource: appId: $appId, slots: ${request.resource.slots}, " +
        s"relaxation: ${request.relaxation}, executor number: ${request.executorNum}")
      val appMaster = sender()
      resourceRequests.enqueue(PendingRequest(appId, appMaster, request,
        System.currentTimeMillis()))
      allocateResource()
  }

  /** Drops all pending requests of the given application. */
  override def doneApplication(appId: Int): Unit = {
    resourceRequests = resourceRequests.filter(_.appId != appId)
  }

  /**
   * Spreads the request over the workers which currently have the most slots.
   *
   * The passed resource map is updated in place with what is left on each worker.
   *
   * @param resources available resource per worker, modified by this method
   * @param request the request to serve
   * @return the allocations which were made, empty when nothing could be allocated
   */
  private def allocateFairly(
      resources: mutable.HashMap[WorkerId, (ActorRef, Resource)], request: ResourceRequest)
    : List[ResourceAllocation] = {
    val workerNum = resources.size
    val allocations = mutable.ListBuffer.empty[ResourceAllocation]
    var totalAvailable = Resource(resources.values.map(_._2.slots).sum)
    var remainingRequest = request.resource
    var remainingExecutors = math.min(request.executorNum, request.resource.slots)

    while (remainingExecutors > 0 && !totalAvailable.isEmpty) {
      val executorsThisRound = math.min(workerNum, remainingExecutors)
      val toRequest = Resource(remainingRequest.slots * executorsThisRound / remainingExecutors)

      // Workers with the most slots first.
      val pickedResources = resources.toArray
        .sortBy(_._2._2.slots)(Ordering[Int].reverse)
        .take(executorsThisRound)

      // One entry per slot, interleaved across the picked workers.
      val slotOwners = pickedResources.zipWithIndex.flatMap {
        case ((workerId, (worker, resource)), index) =>
          0.until(resource.slots).map(seq => ((workerId, worker), seq * workerNum + index))
      }.sortBy(_._2).map(_._1)

      if (slotOwners.length < toRequest.slots) {
        // Can not safisfy the user's requirements
        totalAvailable = Resource.empty
      } else {
        slotOwners.take(toRequest.slots).groupBy(actor => actor).mapValues(_.length).
          toArray.foreach { case ((workerId, worker), slots) =>
            resources.update(workerId, (worker, resources(workerId)._2 - Resource(slots)))
            allocations += ResourceAllocation(Resource(slots), worker, workerId)
          }
        totalAvailable = totalAvailable - toRequest
        remainingRequest = remainingRequest - toRequest
        remainingExecutors -= executorsThisRound
      }
    }
    allocations.toList
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/scheduler/Scheduler.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/scheduler/Scheduler.scala b/daemon/src/main/scala/io/gearpump/cluster/scheduler/Scheduler.scala
deleted file mode 100644
index ccd105f..0000000
--- a/daemon/src/main/scala/io/gearpump/cluster/scheduler/Scheduler.scala
+++ /dev/null
@@ -1,78 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
*/
package io.gearpump.cluster.scheduler

import scala.collection.mutable

import akka.actor.{Actor, ActorRef}
import org.slf4j.Logger

import io.gearpump.TimeStamp
import io.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, UpdateResourceSucceed, WorkerRegistered}
import io.gearpump.cluster.WorkerToMaster.ResourceUpdate
import io.gearpump.cluster.master.Master.WorkerTerminated
import io.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
import io.gearpump.cluster.worker.WorkerId
import io.gearpump.util.LogUtil

/**
 * Scheduler schedule resource for different applications.
 *
 * Keeps, per registered worker, the actor to talk to and the resource it last reported.
 * Subclasses decide how that resource is handed out by implementing [[allocateResource]] and
 * [[doneApplication]].
 */
abstract class Scheduler extends Actor {
  val LOG: Logger = LogUtil.getLogger(getClass)

  /** Registered workers: worker id -> (worker actor, resource last reported for it). */
  protected var resources = new mutable.HashMap[WorkerId, (ActorRef, Resource)]

  /**
   * Handles worker bookkeeping and application-finished messages; meant to be combined with
   * the subclass's own handlers.
   */
  def handleScheduleMessage: Receive = {
    case WorkerRegistered(id, _) =>
      // A newly registered worker starts with no resource until it sends a ResourceUpdate.
      if (!resources.contains(id)) {
        LOG.info(s"Worker $id added to the scheduler")
        resources.put(id, (sender(), Resource.empty))
      }
    case update @ ResourceUpdate(worker, workerId, resource) =>
      LOG.info(s"$update...")
      resources.get(workerId) match {
        case Some((_, previous)) =>
          // Only re-run allocation when the worker now reports more than it did before.
          val resourceReturned = resource > previous
          resources.update(workerId, (worker, resource))
          if (resourceReturned) {
            allocateResource()
          }
          sender() ! UpdateResourceSucceed
        case None =>
          sender() ! UpdateResourceFailed(
            s"ResourceUpdate failed! The worker $workerId has not been registered into master")
      }
    case WorkerTerminated(workerId) =>
      // Removing an id that is not present leaves the map untouched, so no guard is needed.
      resources -= workerId
    case ApplicationFinished(appId) =>
      doneApplication(appId)
  }

  /** Tries to serve pending requests from the currently known resources. */
  def allocateResource(): Unit

  /** Called when the application with the given id has finished. */
  def doneApplication(appId: Int): Unit
}

object Scheduler {
  /**
   * A resource request waiting to be served.
   *
   * @param appId id of the requesting application
   * @param appMaster actor that receives the allocation result
   * @param request the resource being asked for
   * @param timeStamp time at which the request was queued
   */
  case class PendingRequest(
      appId: Int, appMaster: ActorRef, request: ResourceRequest, timeStamp: TimeStamp)

  /** Tells the scheduler that the application with the given id has finished. */
  case class ApplicationFinished(appId: Int)
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/daemon/src/main/scala/io/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala b/daemon/src/main/scala/io/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
deleted file mode 100644
index f97a209..0000000
--- a/daemon/src/main/scala/io/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala
+++ /dev/null
@@ -1,41 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
*/
package io.gearpump.cluster.worker

import java.io.File

import com.typesafe.config.Config
import org.slf4j.Logger

import io.gearpump.cluster.scheduler.Resource
import io.gearpump.util.{LogUtil, RichProcess, Util}

/**
 * Launcher to start an executor process.
 *
 * Starting is delegated to `Util.startProcess`; this class only logs the class path first.
 */
class DefaultExecutorProcessLauncher(val config: Config) extends ExecutorProcessLauncher {
  private val LOG: Logger = LogUtil.getLogger(getClass)

  /**
   * Logs the class path and starts the executor process.
   *
   * Only `options`, `classPath`, `mainClass` and `arguments` are used; `appId`, `executorId`,
   * `resource` and the `config` parameter (which shadows the constructor value of the same
   * name) are not read by this implementation.
   *
   * @return the process handle returned by `Util.startProcess`
   */
  override def createProcess(
      appId: Int, executorId: Int, resource: Resource, config: Config, options: Array[String],
      classPath: Array[String], mainClass: String, arguments: Array[String]): RichProcess = {
    // Entries are joined with the platform's path separator for the log line only.
    val joinedClassPath = classPath.mkString(File.pathSeparator)
    LOG.info(s"Launch executor, classpath: $joinedClassPath")
    Util.startProcess(options, classPath, mainClass, arguments)
  }

  /** Nothing to clean up for this launcher. */
  override def cleanProcess(appId: Int, executorId: Int): Unit = ()
}
