http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/cluster/main/Local.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/Local.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/Local.scala new file mode 100644 index 0000000..db2cd8a --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/main/Local.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.cluster.main + +import akka.actor.{ActorSystem, Props} +import com.typesafe.config.ConfigValueFactory +import org.apache.gearpump.cluster.ClusterConfig +import org.apache.gearpump.cluster.master.{Master => MasterActor} +import org.apache.gearpump.cluster.worker.{Worker => WorkerActor} +import org.apache.gearpump.util.Constants._ +import org.apache.gearpump.util.LogUtil.ProcessType +import org.apache.gearpump.util.{ActorUtil, AkkaApp, Constants, LogUtil, Util} +import org.slf4j.Logger + +import scala.collection.JavaConverters._ +import scala.concurrent.Await +import scala.concurrent.duration.Duration + +object Local extends AkkaApp with ArgumentsParser { + override def akkaConfig: Config = ClusterConfig.master() + + var LOG: Logger = LogUtil.getLogger(getClass) + + override val options: Array[(String, CLIOption[Any])] = + Array("sameprocess" -> CLIOption[Boolean]("", required = false, defaultValue = Some(false)), + "workernum" -> CLIOption[Int]("<how many workers to start>", required = false, + defaultValue = Some(2))) + + override val description = "Start a local cluster" + + def main(akkaConf: Config, args: Array[String]): Unit = { + + this.LOG = { + LogUtil.loadConfiguration(akkaConf, ProcessType.LOCAL) + LogUtil.getLogger(getClass) + } + + val config = parse(args) + if (null != config) { + local(config.getInt("workernum"), config.getBoolean("sameprocess"), akkaConf) + } + } + + def local(workerCount: Int, sameProcess: Boolean, akkaConf: Config): Unit = { + if (sameProcess) { + LOG.info("Starting local in same process") + System.setProperty("LOCAL", "true") + } + val masters = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS) + .asScala.flatMap(Util.parseHostList) + val local = akkaConf.getString(Constants.GEARPUMP_HOSTNAME) + + if (masters.size != 1 && masters.head.host != local) { + LOG.error(s"The ${Constants.GEARPUMP_CLUSTER_MASTERS} is not match " + + s"with ${Constants.GEARPUMP_HOSTNAME}") + } else { + + val 
hostPort = masters.head + implicit val system = ActorSystem(MASTER, akkaConf. + withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(hostPort.port)) + ) + + val master = system.actorOf(Props[MasterActor], MASTER) + val masterPath = ActorUtil.getSystemAddress(system).toString + s"/user/$MASTER" + + 0.until(workerCount).foreach { id => + system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id) + } + + Await.result(system.whenTerminated, Duration.Inf) + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/cluster/main/Master.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/Master.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/Master.scala new file mode 100644 index 0000000..f758720 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/main/Master.scala @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.cluster.main + +import java.util.concurrent.TimeUnit + +import akka.actor._ +import akka.cluster.ClusterEvent._ +import akka.cluster.{MemberStatus, Member, Cluster} +import akka.cluster.ddata.DistributedData +import akka.cluster.singleton.{ClusterSingletonProxySettings, ClusterSingletonProxy, ClusterSingletonManagerSettings, ClusterSingletonManager} +import com.typesafe.config.ConfigValueFactory +import org.apache.gearpump.cluster.ClusterConfig +import org.apache.gearpump.cluster.master.Master.MasterListUpdated +import org.apache.gearpump.cluster.master.{Master => MasterActor, MasterNode} +import org.apache.gearpump.util.Constants._ +import org.apache.gearpump.util.LogUtil.ProcessType +import org.apache.gearpump.util.{AkkaApp, Constants, LogUtil} +import org.slf4j.Logger + +import scala.collection.JavaConverters._ +import scala.collection.immutable +import scala.concurrent.Await +import scala.concurrent.duration._ + +object Master extends AkkaApp with ArgumentsParser { + + private var LOG: Logger = LogUtil.getLogger(getClass) + + override def akkaConfig: Config = ClusterConfig.master() + + override val options: Array[(String, CLIOption[Any])] = + Array("ip" -> CLIOption[String]("<master ip address>", required = true), + "port" -> CLIOption("<master port>", required = true)) + + override val description = "Start Master daemon" + + def main(akkaConf: Config, args: Array[String]): Unit = { + + this.LOG = { + LogUtil.loadConfiguration(akkaConf, ProcessType.MASTER) + LogUtil.getLogger(getClass) + } + + val config = parse(args) + master(config.getString("ip"), config.getInt("port"), akkaConf) + } + + private def verifyMaster(master: String, port: Int, masters: Iterable[String]) = { + masters.exists { hostPort => + hostPort == s"$master:$port" + } + } + + private def master(ip: String, port: Int, akkaConf: Config): Unit = { + val masters = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala + + if (!verifyMaster(ip, port, 
masters)) { + LOG.error(s"The provided ip $ip and port $port doesn't conform with config at " + + s"gearpump.cluster.masters: ${masters.mkString(", ")}") + System.exit(-1) + } + + val masterList = masters.map(master => s"akka.tcp://${MASTER}@$master").toList.asJava + val quorum = masterList.size() / 2 + 1 + val masterConfig = akkaConf. + withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port)). + withValue(NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(ip)). + withValue("akka.cluster.seed-nodes", ConfigValueFactory.fromAnyRef(masterList)). + withValue(s"akka.cluster.role.${MASTER}.min-nr-of-members", + ConfigValueFactory.fromAnyRef(quorum)) + + LOG.info(s"Starting Master Actor system $ip:$port, master list: ${masters.mkString(";")}") + val system = ActorSystem(MASTER, masterConfig) + + val replicator = DistributedData(system).replicator + LOG.info(s"Replicator path: ${replicator.path}") + + // Starts singleton manager + val singletonManager = system.actorOf(ClusterSingletonManager.props( + singletonProps = Props(classOf[MasterWatcher], MASTER), + terminationMessage = PoisonPill, + settings = ClusterSingletonManagerSettings(system).withSingletonName(MASTER_WATCHER) + .withRole(MASTER)), + name = SINGLETON_MANAGER) + + // Start master proxy + val masterProxy = system.actorOf(ClusterSingletonProxy.props( + singletonManagerPath = s"/user/${SINGLETON_MANAGER}", + // The effective singleton is s"${MASTER_WATCHER}/$MASTER" instead of s"${MASTER_WATCHER}". + // Master is created when there is a majority of machines started. 
+ settings = ClusterSingletonProxySettings(system) + .withSingletonName(s"${MASTER_WATCHER}/$MASTER").withRole(MASTER)), + name = MASTER + ) + + LOG.info(s"master proxy is started at ${masterProxy.path}") + + val mainThread = Thread.currentThread() + Runtime.getRuntime().addShutdownHook(new Thread() { + override def run(): Unit = { + if (!system.whenTerminated.isCompleted) { + LOG.info("Triggering shutdown hook....") + + system.stop(masterProxy) + val cluster = Cluster(system) + cluster.leave(cluster.selfAddress) + cluster.down(cluster.selfAddress) + try { + Await.result(system.whenTerminated, Duration(3, TimeUnit.SECONDS)) + } catch { + case ex: Exception => // Ignore + } + system.terminate() + mainThread.join() + } + } + }) + + Await.result(system.whenTerminated, Duration.Inf) + } +} + +class MasterWatcher(role: String) extends Actor with ActorLogging { + import context.dispatcher + + val cluster = Cluster(context.system) + + val config = context.system.settings.config + val masters = config.getList("akka.cluster.seed-nodes") + val quorum = masters.size() / 2 + 1 + + val system = context.system + + // Sorts by age, oldest first + val ageOrdering = Ordering.fromLessThan[Member] { (a, b) => a.isOlderThan(b) } + var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering) + + def receive: Receive = null + + // Subscribes to MemberEvent, re-subscribe when restart + override def preStart(): Unit = { + cluster.subscribe(self, classOf[MemberEvent]) + context.become(waitForInit) + } + override def postStop(): Unit = { + cluster.unsubscribe(self) + } + + def matchingRole(member: Member): Boolean = member.hasRole(role) + + def waitForInit: Receive = { + case state: CurrentClusterState => { + membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.filter(m => + m.status == MemberStatus.Up && matchingRole(m)) + + if (membersByAge.size < quorum) { + membersByAge.iterator.mkString(",") + log.info(s"We cannot get a quorum, $quorum, " + + 
s"shutting down...${membersByAge.iterator.mkString(",")}") + context.become(waitForShutdown) + self ! MasterWatcher.Shutdown + } else { + val master = context.actorOf(Props(classOf[MasterActor]), MASTER) + notifyMasterMembersChange(master) + context.become(waitForClusterEvent(master)) + } + } + } + + def waitForClusterEvent(master: ActorRef): Receive = { + case MemberUp(m) if matchingRole(m) => { + membersByAge += m + notifyMasterMembersChange(master) + } + case mEvent: MemberEvent if (mEvent.isInstanceOf[MemberExited] || + mEvent.isInstanceOf[MemberRemoved]) && matchingRole(mEvent.member) => { + log.info(s"member removed ${mEvent.member}") + val m = mEvent.member + membersByAge -= m + if (membersByAge.size < quorum) { + log.info(s"We cannot get a quorum, $quorum, " + + s"shutting down...${membersByAge.iterator.mkString(",")}") + context.become(waitForShutdown) + self ! MasterWatcher.Shutdown + } else { + notifyMasterMembersChange(master) + } + } + } + + private def notifyMasterMembersChange(master: ActorRef): Unit = { + val masters = membersByAge.toList.map{ member => + MasterNode(member.address.host.getOrElse("Unknown-Host"), + member.address.port.getOrElse(0)) + } + master ! 
MasterListUpdated(masters) + } + + def waitForShutdown: Receive = { + case MasterWatcher.Shutdown => { + cluster.unsubscribe(self) + cluster.leave(cluster.selfAddress) + context.stop(self) + system.scheduler.scheduleOnce(Duration.Zero) { + try { + Await.result(system.whenTerminated, Duration(3, TimeUnit.SECONDS)) + } catch { + case ex: Exception => // Ignore + } + system.terminate() + } + } + } +} + +object MasterWatcher { + object Shutdown +} + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala new file mode 100644 index 0000000..3d8d823 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/main/Worker.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.cluster.main + +import akka.actor.{ActorSystem, Props} +import org.apache.gearpump.cluster.ClusterConfig +import org.apache.gearpump.cluster.master.MasterProxy +import org.apache.gearpump.cluster.worker.{Worker => WorkerActor} +import org.apache.gearpump.transport.HostPort +import org.apache.gearpump.util.Constants._ +import org.apache.gearpump.util.LogUtil.ProcessType +import org.apache.gearpump.util.{AkkaApp, LogUtil} +import org.slf4j.Logger + +import scala.collection.JavaConverters._ +import scala.concurrent.Await +import scala.concurrent.duration.Duration + +/** Tool to start a worker daemon process */ +object Worker extends AkkaApp with ArgumentsParser { + protected override def akkaConfig = ClusterConfig.worker() + + override val description = "Start a worker daemon" + + var LOG: Logger = LogUtil.getLogger(getClass) + + private def uuid = java.util.UUID.randomUUID.toString + + def main(akkaConf: Config, args: Array[String]): Unit = { + val id = uuid + + this.LOG = { + LogUtil.loadConfiguration(akkaConf, ProcessType.WORKER) + // Delay creation of LOG instance to avoid creating an empty log file as we + // reset the log file name here + LogUtil.getLogger(getClass) + } + + val system = ActorSystem(id, akkaConf) + + val masterAddress = akkaConf.getStringList(GEARPUMP_CLUSTER_MASTERS).asScala.map { address => + val hostAndPort = address.split(":") + HostPort(hostAndPort(0), hostAndPort(1).toInt) + } + + LOG.info(s"Trying to connect to masters " + masterAddress.mkString(",") + "...") + val masterProxy = system.actorOf(MasterProxy.props(masterAddress), s"masterproxy${system.name}") + + system.actorOf(Props(classOf[WorkerActor], masterProxy), + classOf[WorkerActor].getSimpleName + id) + + Await.result(system.whenTerminated, Duration.Inf) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala 
---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala new file mode 100644 index 0000000..0ae7365 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.cluster.master + +import akka.actor._ +import akka.pattern.ask +import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, SaveAppDataFailed, _} +import org.apache.gearpump.cluster.AppMasterToWorker._ +import org.apache.gearpump.cluster.ClientToMaster._ +import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataRequest, AppMastersDataRequest, _} +import org.apache.gearpump.cluster.MasterToClient._ +import org.apache.gearpump.cluster.WorkerToAppMaster.{ShutdownExecutorFailed, _} +import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeInfo, ApplicationState} +import org.apache.gearpump.cluster.master.AppManager._ +import org.apache.gearpump.cluster.master.InMemoryKVService.{GetKVResult, PutKVResult, PutKVSuccess, _} +import org.apache.gearpump.cluster.master.Master._ +import org.apache.gearpump.util.Constants._ +import org.apache.gearpump.util.{ActorUtil, TimeOutScheduler, Util, _} +import org.slf4j.Logger + +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.util.{Failure, Success} + +/** + * AppManager is dedicated child of Master to manager all applications. 
+ */ +private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLauncherFactory) + extends Actor with Stash with TimeOutScheduler { + + private val LOG: Logger = LogUtil.getLogger(getClass) + + private val EXECUTOR_ID: Int = APPMASTER_DEFAULT_EXECUTOR_ID + private val appMasterMaxRetries: Int = 5 + private val appMasterRetryTimeRange: Duration = 20.seconds + + implicit val timeout = FUTURE_TIMEOUT + implicit val executionContext = context.dispatcher + + // Next available appId + private var nextAppId: Int = 1 + + // From appId to appMaster data + // Applications not in activeAppMasters or deadAppMasters are in pending status + private var appMasterRegistry = Map.empty[Int, (ActorRef, AppMasterRuntimeInfo)] + + // Active appMaster list where applications are in active status + private var activeAppMasters = Set.empty[Int] + + // Dead appMaster list where applications are in inactive status + private var deadAppMasters = Set.empty[Int] + + private var appMasterRestartPolicies = Map.empty[Int, RestartPolicy] + + def receive: Receive = null + + kvService ! GetKV(MASTER_GROUP, MASTER_STATE) + context.become(waitForMasterState) + + def waitForMasterState: Receive = { + case GetKVSuccess(_, result) => + val masterState = result.asInstanceOf[MasterState] + if (masterState != null) { + this.nextAppId = masterState.maxId + 1 + this.activeAppMasters = masterState.activeAppMasters + this.deadAppMasters = masterState.deadAppMasters + this.appMasterRegistry = masterState.appMasterRegistry + } + context.become(receiveHandler) + unstashAll() + case GetKVFailed(ex) => + LOG.error("Failed to get master state, shutting down master to avoid data corruption...") + context.parent ! PoisonPill + case msg => + LOG.info(s"Get message ${msg.getClass.getSimpleName}") + stash() + } + + def receiveHandler: Receive = { + val msg = "Application Manager started. Ready for application submission..." 
+ LOG.info(msg) + clientMsgHandler orElse appMasterMessage orElse selfMsgHandler orElse workerMessage orElse + appDataStoreService orElse terminationWatch + } + + def clientMsgHandler: Receive = { + case SubmitApplication(app, jar, username) => + LOG.info(s"Submit Application ${app.name}($nextAppId) by $username...") + val client = sender() + if (applicationNameExist(app.name)) { + client ! SubmitApplicationResult(Failure( + new Exception(s"Application name ${app.name} already existed"))) + } else { + context.actorOf(launcher.props(nextAppId, EXECUTOR_ID, app, jar, username, context.parent, + Some(client)), s"launcher${nextAppId}_${Util.randInt()}") + + val appState = new ApplicationState(nextAppId, app.name, 0, app, jar, username, null) + appMasterRestartPolicies += nextAppId -> + new RestartPolicy(appMasterMaxRetries, appMasterRetryTimeRange) + kvService ! PutKV(nextAppId.toString, APP_STATE, appState) + nextAppId += 1 + } + + case RestartApplication(appId) => + val client = sender() + (kvService ? GetKV(appId.toString, APP_STATE)).asInstanceOf[Future[GetKVResult]].map { + case GetKVSuccess(_, result) => + val appState = result.asInstanceOf[ApplicationState] + if (appState != null) { + LOG.info(s"Shutting down the application (restart), $appId") + self ! ShutdownApplication(appId) + self.tell(SubmitApplication(appState.app, appState.jar, appState.username), client) + } else { + client ! SubmitApplicationResult(Failure( + new Exception(s"Failed to restart, because the application $appId does not exist.") + )) + } + case GetKVFailed(ex) => + client ! SubmitApplicationResult(Failure( + new Exception(s"Unable to obtain the Master State. 
" + + s"Application $appId will not be restarted.") + )) + } + + case ShutdownApplication(appId) => + LOG.info(s"App Manager Shutting down application $appId") + val (_, appInfo) = appMasterRegistry.get(appId) + .filter { case (_, info) => !deadAppMasters.contains(info.appId)} + .getOrElse((null, null)) + Option(appInfo) match { + case Some(info) => + val worker = info.worker + val workerPath = Option(worker).map(_.path).orNull + LOG.info(s"Shutdown AppMaster at $workerPath, appId: $appId, executorId: $EXECUTOR_ID") + cleanApplicationData(appId) + val shutdown = ShutdownExecutor(appId, EXECUTOR_ID, + s"AppMaster $appId shutdown requested by master...") + sendMsgWithTimeOutCallBack(worker, shutdown, 30000, shutDownExecutorTimeOut()) + sender ! ShutdownApplicationResult(Success(appId)) + case None => + val errorMsg = s"Failed to find registration information for appId: $appId" + LOG.error(errorMsg) + sender ! ShutdownApplicationResult(Failure(new Exception(errorMsg))) + } + + case ResolveAppId(appId) => + val (appMaster, _) = appMasterRegistry.getOrElse(appId, (null, null)) + if (null != appMaster) { + sender ! ResolveAppIdResult(Success(appMaster)) + } else { + sender ! ResolveAppIdResult(Failure(new Exception(s"Can not find Application: $appId"))) + } + + case AppMastersDataRequest => + var appMastersData = collection.mutable.ListBuffer[AppMasterData]() + appMasterRegistry.foreach(pair => { + val (id, (appMaster: ActorRef, info: AppMasterRuntimeInfo)) = pair + val appMasterPath = ActorUtil.getFullPath(context.system, appMaster.path) + val workerPath = Option(info.worker).map(worker => + ActorUtil.getFullPath(context.system, worker.path)) + val status = getAppMasterStatus(id) + appMastersData += AppMasterData( + status, id, info.appName, appMasterPath, workerPath.orNull, + info.submissionTime, info.startTime, info.finishTime, info.user) + }) + + sender ! 
AppMastersData(appMastersData.toList) + + case QueryAppMasterConfig(appId) => + val config = + if (appMasterRegistry.contains(appId)) { + val (_, info) = appMasterRegistry(appId) + info.config + } else { + null + } + sender ! AppMasterConfig(config) + + case appMasterDataRequest: AppMasterDataRequest => + val appId = appMasterDataRequest.appId + val appStatus = getAppMasterStatus(appId) + + appStatus match { + case AppMasterNonExist => + sender ! AppMasterData(AppMasterNonExist) + case _ => + val (appMaster, info) = appMasterRegistry(appId) + val appMasterPath = ActorUtil.getFullPath(context.system, appMaster.path) + val workerPath = Option(info.worker).map( + worker => ActorUtil.getFullPath(context.system, worker.path)).orNull + sender ! AppMasterData( + appStatus, appId, info.appName, appMasterPath, workerPath, + info.submissionTime, info.startTime, info.finishTime, info.user) + } + } + + def workerMessage: Receive = { + case ShutdownExecutorSucceed(appId, executorId) => + LOG.info(s"Shut down executor $executorId for application $appId successfully") + case failed: ShutdownExecutorFailed => + LOG.error(failed.reason) + } + + private def getAppMasterStatus(appId: Int): AppMasterStatus = { + if (activeAppMasters.contains(appId)) { + AppMasterActive + } else if (deadAppMasters.contains(appId)) { + AppMasterInActive + } else if (appMasterRegistry.contains(appId)) { + AppMasterPending + } else { + AppMasterNonExist + } + } + + private def shutDownExecutorTimeOut(): Unit = { + LOG.error(s"Shut down executor time out") + } + + def appMasterMessage: Receive = { + case RegisterAppMaster(appMaster, registerBack: AppMasterRuntimeInfo) => + val startTime = System.currentTimeMillis() + val register = registerBack.copy(startTime = startTime) + + LOG.info(s"Register AppMaster for app: ${register.appId}, $register") + context.watch(appMaster) + appMasterRegistry += register.appId -> (appMaster, register) + kvService ! 
PutKV(MASTER_GROUP, MASTER_STATE, + MasterState(nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters)) + sender ! AppMasterRegistered(register.appId) + + case ActivateAppMaster(appId) => + LOG.info(s"Activate AppMaster for app $appId") + activeAppMasters += appId + kvService ! PutKV(MASTER_GROUP, MASTER_STATE, + MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters)) + sender ! AppMasterActivated(appId) + } + + def appDataStoreService: Receive = { + case SaveAppData(appId, key, value) => + val client = sender() + (kvService ? PutKV(appId.toString, key, value)).asInstanceOf[Future[PutKVResult]].map { + case PutKVSuccess => + client ! AppDataSaved + case PutKVFailed(k, ex) => + client ! SaveAppDataFailed + } + case GetAppData(appId, key) => + val client = sender() + (kvService ? GetKV(appId.toString, key)).asInstanceOf[Future[GetKVResult]].map { + case GetKVSuccess(privateKey, value) => + client ! GetAppDataResult(key, value) + case GetKVFailed(ex) => + client ! GetAppDataResult(key, null) + } + } + + def terminationWatch: Receive = { + case terminate: Terminated => + LOG.info(s"AppMaster(${terminate.actor.path}) is terminated, " + + s"network down: ${terminate.getAddressTerminated}") + + // Now we assume that the only normal way to stop the application is submitting a + // ShutdownApplication request + val application = appMasterRegistry.find { appInfo => + val (_, (actorRef, _)) = appInfo + actorRef.compareTo(terminate.actor) == 0 + } + if (application.nonEmpty) { + val appId = application.get._1 + (kvService ? GetKV(appId.toString, APP_STATE)).asInstanceOf[Future[GetKVResult]].map { + case GetKVSuccess(_, result) => + val appState = result.asInstanceOf[ApplicationState] + if (appState != null) { + LOG.info(s"Recovering application, $appId") + self ! 
RecoverApplication(appState) + } else { + LOG.error(s"Cannot find application state for $appId") + } + case GetKVFailed(ex) => + LOG.error(s"Cannot find master state to recover") + } + } + } + + def selfMsgHandler: Receive = { + case RecoverApplication(state) => + val appId = state.appId + if (appMasterRestartPolicies.get(appId).get.allowRestart) { + LOG.info(s"AppManager Recovering Application $appId...") + activeAppMasters -= appId + kvService ! PutKV(MASTER_GROUP, MASTER_STATE, + MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters)) + context.actorOf(launcher.props(appId, EXECUTOR_ID, state.app, state.jar, state.username, + context.parent, None), s"launcher${appId}_${Util.randInt()}") + } else { + LOG.error(s"Application $appId failed too many times") + } + } + + case class RecoverApplication(applicationStatus: ApplicationState) + + private def cleanApplicationData(appId: Int): Unit = { + if (appMasterRegistry.contains(appId)) { + // Add the dead app to dead appMasters + deadAppMasters += appId + // Remove the dead app from active appMasters + activeAppMasters -= appId + + appMasterRegistry += appId -> { + val (ref, info) = appMasterRegistry(appId) + (ref, info.copy(finishTime = System.currentTimeMillis())) + } + kvService ! PutKV(MASTER_GROUP, MASTER_STATE, + MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters)) + kvService ! 
DeleteKVGroup(appId.toString) + } + } + + private def applicationNameExist(appName: String): Boolean = { + appMasterRegistry.values.exists { case (_, info) => + info.appName == appName && !deadAppMasters.contains(info.appId) + } + } +} + +object AppManager { + final val APP_STATE = "app_state" + // The id is used in KVStore + final val MASTER_STATE = "master_state" + + case class MasterState( + maxId: Int, + appMasterRegistry: Map[Int, (ActorRef, AppMasterRuntimeInfo)], + activeAppMasters: Set[Int], + deadAppMasters: Set[Int]) +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala new file mode 100644 index 0000000..fd19bad --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.cluster.master + +import java.util.concurrent.TimeUnit + +import akka.actor._ +import akka.cluster.Cluster +import akka.cluster.ddata.{LWWMap, LWWMapKey, DistributedData} +import akka.cluster.ddata.Replicator._ +import org.apache.gearpump.util.LogUtil +import org.slf4j.Logger + +import scala.concurrent.TimeoutException +import scala.concurrent.duration.Duration + +/** + * A replicated simple in-memory KV service. The replications are stored on all masters. + */ +class InMemoryKVService extends Actor with Stash { + import org.apache.gearpump.cluster.master.InMemoryKVService._ + + private val KV_SERVICE = "gearpump_kvservice" + + private val LOG: Logger = LogUtil.getLogger(getClass) + private val replicator = DistributedData(context.system).replicator + private implicit val cluster = Cluster(context.system) + + // Optimize write path, we can tolerate one master down for recovery. + private val timeout = Duration(15, TimeUnit.SECONDS) + private val readMajority = ReadMajority(timeout) + private val writeMajority = WriteMajority(timeout) + + private def groupKey(group: String): LWWMapKey[Any] = { + LWWMapKey[Any](KV_SERVICE + "_" + group) + } + + def receive: Receive = kvService + + def kvService: Receive = { + + case GetKV(group: String, key: String) => + val request = Request(sender(), key) + replicator ! Get(groupKey(group), readMajority, Some(request)) + case success@GetSuccess(group: LWWMapKey[Any @unchecked], Some(request: Request)) => + val appData = success.get(group) + LOG.info(s"Successfully retrived group: ${group.id}") + request.client ! GetKVSuccess(request.key, appData.get(request.key).orNull) + case NotFound(group: LWWMapKey[Any @unchecked], Some(request: Request)) => + LOG.info(s"We cannot find group $group") + request.client ! 
GetKVSuccess(request.key, null) + case GetFailure(group: LWWMapKey[Any @unchecked], Some(request: Request)) => + val error = s"Failed to get application data, the request key is ${request.key}" + LOG.error(error) + request.client ! GetKVFailed(new Exception(error)) + + case PutKV(group: String, key: String, value: Any) => + val request = Request(sender(), key) + val update = Update(groupKey(group), LWWMap(), writeMajority, Some(request)) { map => + map + (key -> value) + } + replicator ! update + case UpdateSuccess(group: LWWMapKey[Any @unchecked], Some(request: Request)) => + request.client ! PutKVSuccess + case ModifyFailure(group: LWWMapKey[Any @unchecked], error, cause, Some(request: Request)) => + request.client ! PutKVFailed(request.key, new Exception(error, cause)) + case UpdateTimeout(group: LWWMapKey[Any @unchecked], Some(request: Request)) => + request.client ! PutKVFailed(request.key, new TimeoutException()) + + case delete@DeleteKVGroup(group: String) => + replicator ! Delete(groupKey(group), writeMajority) + case DeleteSuccess(group) => + LOG.info(s"KV Group ${group.id} is deleted") + case ReplicationDeleteFailure(group) => + LOG.error(s"Failed to delete KV Group ${group.id}...") + case DataDeleted(group) => + LOG.error(s"Group ${group.id} is deleted, you can no longer put/get/delete this group...") + } +} + +object InMemoryKVService { + /** + * KV Service related + */ + case class GetKV(group: String, key: String) + + trait GetKVResult + + case class GetKVSuccess(key: String, value: Any) extends GetKVResult + + case class GetKVFailed(ex: Throwable) extends GetKVResult + + case class PutKV(group: String, key: String, value: Any) + + case class DeleteKVGroup(group: String) + + case class GroupDeleted(group: String) extends GetKVResult with PutKVResult + + trait PutKVResult + + case object PutKVSuccess extends PutKVResult + + case class PutKVFailed(key: String, ex: Throwable) extends PutKVResult + + case class Request(client: ActorRef, key: String) +} 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala new file mode 100644 index 0000000..6b4df07 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.cluster.master + +import java.lang.management.ManagementFactory +import org.apache.gearpump.cluster.worker.WorkerId +import org.apache.gearpump.jarstore.JarStoreServer + +import scala.collection.JavaConverters._ +import scala.collection.immutable + +import akka.actor._ +import akka.remote.DisassociatedEvent +import com.typesafe.config.Config +import org.apache.commons.lang.exception.ExceptionUtils +import org.slf4j.Logger + +import org.apache.gearpump.cluster.AppMasterToMaster._ +import org.apache.gearpump.cluster.ClientToMaster._ +import org.apache.gearpump.cluster.ClusterConfig +import org.apache.gearpump.cluster.MasterToAppMaster._ +import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, MasterConfig, ResolveWorkerIdResult} +import org.apache.gearpump.cluster.MasterToWorker._ +import org.apache.gearpump.cluster.WorkerToMaster._ +import org.apache.gearpump.cluster.master.InMemoryKVService._ +import org.apache.gearpump.cluster.master.Master.{MasterInfo, WorkerTerminated, _} +import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished +import org.apache.gearpump.metrics.Metrics.ReportMetrics +import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService} +import org.apache.gearpump.transport.HostPort +import org.apache.gearpump.util.Constants._ +import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig +import org.apache.gearpump.util._ + +/** + * Master Actor who manages resources of the whole cluster. + * It is like the resource manager of YARN. 
+ */ +private[cluster] class Master extends Actor with Stash { + private val LOG: Logger = LogUtil.getLogger(getClass) + private val systemConfig: Config = context.system.settings.config + private implicit val timeout = Constants.FUTURE_TIMEOUT + private val kvService = context.actorOf(Props(new InMemoryKVService()), "kvService") + // Resources and resourceRequests can be dynamically constructed by + // heartbeat of worker and appmaster when master singleton is migrated. + // We don't need to persist them in cluster + private var appManager: ActorRef = null + + private var scheduler: ActorRef = null + + private var workers = new immutable.HashMap[ActorRef, WorkerId] + + private val birth = System.currentTimeMillis() + + private var nextWorkerId = 0 + + def receive: Receive = null + + // Register jvm metrics + Metrics(context.system).register(new JvmMetricsSet(s"master")) + + LOG.info("master is started at " + ActorUtil.getFullPath(context.system, self.path) + "...") + + val jarStoreRootPath = systemConfig.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH) + + private val jarStore = context.actorOf(Props(classOf[JarStoreServer], jarStoreRootPath)) + + private val hostPort = HostPort(ActorUtil.getSystemAddress(context.system).hostPort) + + // Maintain the list of active masters. + private var masters: List[MasterNode] = { + // Add myself into the list of initial masters. 
+ List(MasterNode(hostPort.host, hostPort.port)) + } + + val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED) + + val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig) + val historyMetricsService = if (metricsEnabled) { + val historyMetricsService = { + context.actorOf(Props(new HistoryMetricsService("master", getHistoryMetricsConfig))) + } + + val metricsReportService = context.actorOf( + Props(new MetricsReporterService(Metrics(context.system)))) + historyMetricsService.tell(ReportMetrics, metricsReportService) + Some(historyMetricsService) + } else { + None + } + + kvService ! GetKV(MASTER_GROUP, WORKER_ID) + context.become(waitForNextWorkerId) + + def waitForNextWorkerId: Receive = { + case GetKVSuccess(_, result) => + if (result != null) { + this.nextWorkerId = result.asInstanceOf[Int] + } else { + LOG.warn("Cannot find existing state in the distributed cluster...") + } + context.become(receiveHandler) + unstashAll() + case GetKVFailed(ex) => + LOG.error("Failed to get worker id, shutting down master to avoid data corruption...") + context.parent ! PoisonPill + case msg => + LOG.info(s"Get message ${msg.getClass.getSimpleName}") + stash() + } + + def receiveHandler: Receive = workerMsgHandler orElse + appMasterMsgHandler orElse + onMasterListChange orElse + clientMsgHandler orElse + metricsService orElse + jarStoreService orElse + terminationWatch orElse + disassociated orElse + kvServiceMsgHandler orElse + ActorUtil.defaultMsgHandler(self) + + def workerMsgHandler: Receive = { + case RegisterNewWorker => + val workerId = WorkerId(nextWorkerId, System.currentTimeMillis()) + nextWorkerId += 1 + kvService ! PutKV(MASTER_GROUP, WORKER_ID, nextWorkerId) + val workerHostname = ActorUtil.getHostname(sender()) + LOG.info(s"Register new from $workerHostname ....") + self forward RegisterWorker(workerId) + + case RegisterWorker(id) => + context.watch(sender()) + sender ! 
WorkerRegistered(id, MasterInfo(self, birth)) + scheduler forward WorkerRegistered(id, MasterInfo(self, birth)) + workers += (sender() -> id) + val workerHostname = ActorUtil.getHostname(sender()) + LOG.info(s"Register Worker with id $id from $workerHostname ....") + case resourceUpdate: ResourceUpdate => + scheduler forward resourceUpdate + } + + def jarStoreService: Receive = { + case GetJarStoreServer => + jarStore forward GetJarStoreServer + } + + def kvServiceMsgHandler: Receive = { + case PutKVSuccess => + // Skip + case PutKVFailed(key, exception) => + LOG.error(s"Put KV of key $key to InMemoryKVService failed.\n" + + ExceptionUtils.getStackTrace(exception)) + } + + def metricsService: Receive = { + case query: QueryHistoryMetrics => + if (historyMetricsService.isEmpty) { + // Returns empty metrics so that we don't hang the UI + sender ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem]) + } else { + historyMetricsService.get forward query + } + } + + def appMasterMsgHandler: Receive = { + case request: RequestResource => + scheduler forward request + case registerAppMaster: RegisterAppMaster => + appManager forward registerAppMaster + case activateAppMaster: ActivateAppMaster => + appManager forward activateAppMaster + case save: SaveAppData => + appManager forward save + case get: GetAppData => + appManager forward get + case GetAllWorkers => + sender ! WorkerList(workers.values.toList) + case GetMasterData => + val aliveFor = System.currentTimeMillis() - birth + val logFileDir = LogUtil.daemonLogDir(systemConfig).getAbsolutePath + val userDir = System.getProperty("user.dir") + + val masterDescription = + MasterSummary( + MasterNode(hostPort.host, hostPort.port), + masters, + aliveFor, + logFileDir, + jarStoreRootPath, + MasterStatus.Synced, + userDir, + List.empty[MasterActivity], + jvmName = ManagementFactory.getRuntimeMXBean().getName(), + historyMetricsConfig = getHistoryMetricsConfig + ) + + sender ! 
MasterData(masterDescription) + + case invalidAppMaster: InvalidAppMaster => + appManager forward invalidAppMaster + } + + import scala.util.{Failure, Success} + + def onMasterListChange: Receive = { + case MasterListUpdated(masters: List[MasterNode]) => + this.masters = masters + } + + def clientMsgHandler: Receive = { + case app: SubmitApplication => + LOG.debug(s"Receive from client, SubmitApplication $app") + appManager.forward(app) + case app: RestartApplication => + LOG.debug(s"Receive from client, RestartApplication $app") + appManager.forward(app) + case app: ShutdownApplication => + LOG.debug(s"Receive from client, Shutting down Application ${app.appId}") + scheduler ! ApplicationFinished(app.appId) + appManager.forward(app) + case app: ResolveAppId => + LOG.debug(s"Receive from client, resolving appId ${app.appId} to ActorRef") + appManager.forward(app) + case resolve: ResolveWorkerId => + LOG.debug(s"Receive from client, resolving workerId ${resolve.workerId}") + val worker = workers.find(_._2 == resolve.workerId) + worker match { + case Some(worker) => sender ! ResolveWorkerIdResult(Success(worker._1)) + case None => sender ! ResolveWorkerIdResult(Failure( + new Exception(s"cannot find worker ${resolve.workerId}"))) + } + case AppMastersDataRequest => + LOG.debug("Master received AppMastersDataRequest") + appManager forward AppMastersDataRequest + case appMasterDataRequest: AppMasterDataRequest => + LOG.debug("Master received AppMasterDataRequest") + appManager forward appMasterDataRequest + case query: QueryAppMasterConfig => + LOG.debug("Master received QueryAppMasterConfig") + appManager forward query + case QueryMasterConfig => + sender ! 
MasterConfig(ClusterConfig.filterOutDefaultConfig(systemConfig)) + } + + def disassociated: Receive = { + case disassociated: DisassociatedEvent => + LOG.info(s" disassociated ${disassociated.remoteAddress}") + } + + def terminationWatch: Receive = { + case t: Terminated => + val actor = t.actor + LOG.info(s"worker ${actor.path} get terminated, is it due to network reason?" + + t.getAddressTerminated()) + + LOG.info("Let's filter out dead resources...") + // Filters out dead worker resource + if (workers.keySet.contains(actor)) { + scheduler ! WorkerTerminated(workers.get(actor).get) + workers -= actor + } + } + + override def preStart(): Unit = { + val path = ActorUtil.getFullPath(context.system, self.path) + LOG.info(s"master path is $path") + val schedulerClass = Class.forName( + systemConfig.getString(Constants.GEARPUMP_SCHEDULING_SCHEDULER)) + + appManager = context.actorOf(Props(new AppManager(kvService, AppMasterLauncher)), + classOf[AppManager].getSimpleName) + scheduler = context.actorOf(Props(schedulerClass)) + context.system.eventStream.subscribe(self, classOf[DisassociatedEvent]) + } +} + +object Master { + final val MASTER_GROUP = "master_group" + + final val WORKER_ID = "next_worker_id" + + case class WorkerTerminated(workerId: WorkerId) + + case class MasterInfo(master: ActorRef, startTime: Long = 0L) + + /** Notify the subscriber that master actor list has been updated */ + case class MasterListUpdated(masters: List[MasterNode]) + + object MasterInfo { + def empty: MasterInfo = MasterInfo(null) + } + + case class SlotStatus(totalSlots: Int, availableSlots: Int) +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala 
b/core/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala
new file mode 100644
index 0000000..623e3ff
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

package org.apache.gearpump.cluster.scheduler

import akka.actor.ActorRef
import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource
import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated
import org.apache.gearpump.cluster.scheduler.Relaxation._
import org.apache.gearpump.cluster.scheduler.Scheduler.PendingRequest
import org.apache.gearpump.cluster.worker.WorkerId

import scala.collection.mutable

/**
 * Assign resource to application based on the priority of the application
 *
 * Pending requests live in a priority queue ordered by request priority and, for equal
 * priorities, by arrival time (earlier first).
 */
class PriorityScheduler extends Scheduler {
  private var resourceRequests = new mutable.PriorityQueue[PendingRequest]()(requestOrdering)

  /**
   * Ordering of the pending queue: a larger priority id wins; among equal priorities the
   * request with the smaller timestamp wins.
   */
  def requestOrdering: Ordering[PendingRequest] = new Ordering[PendingRequest] {
    override def compare(x: PendingRequest, y: PendingRequest): Int = {
      val byPriority = x.request.priority.id - y.request.priority.id
      if (byPriority != 0) byPriority else y.timeStamp.compareTo(x.timeStamp)
    }
  }

  override def receive: Receive = super.handleScheduleMessage orElse resourceRequestHandler

  /**
   * Serves pending requests in queue order against a snapshot of the known resources.
   *
   * The loop stops once the queue is empty or as many slots as the snapshot initially held
   * have been handed out. Requests (or the unserved part of them) that cannot be satisfied
   * are put back into the queue afterwards. Only the snapshot is decremented here; the
   * `resources` map itself is left untouched.
   */
  override def allocateResource(): Unit = {
    var scheduleLater = Array.empty[PendingRequest]
    val resourcesSnapShot = resources.clone()
    var allocated = Resource.empty
    val totalResource = Resource(resourcesSnapShot.values.map(_._2.slots).sum)

    while (resourceRequests.nonEmpty && (allocated < totalResource)) {
      val PendingRequest(appId, appMaster, request, timeStamp) = resourceRequests.dequeue()
      request.relaxation match {
        case ANY =>
          val allocations = allocateFairly(resourcesSnapShot, request)
          val newAllocated = Resource(allocations.map(_.resource.slots).sum)
          if (allocations.nonEmpty) {
            appMaster ! ResourceAllocated(allocations.toArray)
          }
          if (newAllocated < request.resource) {
            // Partially served: queue the remainder with the original timestamp.
            val remainingRequest = request.resource - newAllocated
            val remainingExecutors = request.executorNum - allocations.length
            val newResourceRequest = request.copy(resource = remainingRequest,
              executorNum = remainingExecutors)
            scheduleLater = scheduleLater :+
              PendingRequest(appId, appMaster, newResourceRequest, timeStamp)
          }
          allocated = allocated + newAllocated
        case ONEWORKER =>
          // An exact fit is sufficient: only a worker with fewer free slots than requested
          // is unsuitable. (A strict comparison would re-queue such a request forever.)
          val availableResource = resourcesSnapShot.find { params =>
            val (_, (_, resource)) = params
            !(resource < request.resource)
          }
          availableResource match {
            case Some((workerId, (worker, resource))) =>
              allocated = allocated + request.resource
              appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker,
                workerId)))
              resourcesSnapShot.update(workerId, (worker, resource - request.resource))
            case None =>
              scheduleLater = scheduleLater :+
                PendingRequest(appId, appMaster, request, timeStamp)
          }
        case SPECIFICWORKER =>
          // Same rule as ONEWORKER: the named worker qualifies when it has at least the
          // requested amount free.
          resourcesSnapShot.get(request.workerId) match {
            case Some((worker, availableResource))
              if !(availableResource < request.resource) =>
              appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker,
                request.workerId)))
              allocated = allocated + request.resource
              resourcesSnapShot.update(request.workerId, (worker,
                availableResource - request.resource))
            case _ =>
              scheduleLater = scheduleLater :+
                PendingRequest(appId, appMaster, request, timeStamp)
          }
      }
    }
    for (request <- scheduleLater)
      resourceRequests.enqueue(request)
  }

  /** Queues an incoming resource request and immediately tries to allocate. */
  def resourceRequestHandler: Receive = {
    case RequestResource(appId, request) =>
      LOG.info(s"Request resource: appId: $appId, slots: ${request.resource.slots}, " +
        s"relaxation: ${request.relaxation}, executor number: ${request.executorNum}")
      val appMaster = sender()
      resourceRequests.enqueue(PendingRequest(appId, appMaster, request,
        System.currentTimeMillis()))
      allocateResource()
  }

  /** Drops every pending request of the finished application. */
  override def doneApplication(appId: Int): Unit = {
    resourceRequests = resourceRequests.filter(_.appId != appId)
  }

  /**
   * Spreads a request with ANY relaxation over the workers holding the most free slots.
   *
   * `resources` is updated in place with what has been taken. The result may cover only a
   * part of the request, or be empty, when the picked workers cannot provide enough slots.
   *
   * @param resources free resources per worker; mutated by this method
   * @param request   the request to serve
   * @return the allocations made, one entry per worker and round
   */
  private def allocateFairly(
      resources: mutable.HashMap[WorkerId, (ActorRef, Resource)], request: ResourceRequest)
    : List[ResourceAllocation] = {
    val workerNum = resources.size
    var allocations = List.empty[ResourceAllocation]
    var totalAvailable = Resource(resources.values.map(_._2.slots).sum)
    var remainingRequest = request.resource
    var remainingExecutors = Math.min(request.executorNum, request.resource.slots)

    while (remainingExecutors > 0 && !totalAvailable.isEmpty) {
      // One round serves at most one executor per worker.
      val executorNum = Math.min(workerNum, remainingExecutors)
      val toRequest = Resource(remainingRequest.slots * executorNum / remainingExecutors)

      val sortedResources = resources.toArray.sortBy(_._2._2.slots)(Ordering[Int].reverse)
      val pickedResources = sortedResources.take(executorNum)

      // Interleave the free slots of the picked workers (slot 0 of each worker, then slot 1
      // of each worker, ...) so that taking a prefix spreads the load across them.
      val flattenResource = pickedResources.zipWithIndex.flatMap { workerWithIndex =>
        val ((workerId, (worker, resource)), index) = workerWithIndex
        0.until(resource.slots).map(seq => ((workerId, worker), seq * workerNum + index))
      }.sortBy(_._2).map(_._1)

      if (flattenResource.length < toRequest.slots) {
        // Cannot satisfy the user's requirements
        totalAvailable = Resource.empty
      } else {
        flattenResource.take(toRequest.slots).groupBy(actor => actor).mapValues(_.length).
          toArray.foreach { params =>
          val ((workerId, worker), slots) = params
          resources.update(workerId, (worker, resources(workerId)._2 - Resource(slots)))
          allocations :+= ResourceAllocation(Resource(slots), worker, workerId)
        }
        totalAvailable -= toRequest
        remainingRequest -= toRequest
        remainingExecutors -= executorNum
      }
    }
    allocations
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala b/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
new file mode 100644
index 0000000..ec9f1ba
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.gearpump.cluster.scheduler + +import akka.actor.{Actor, ActorRef} +import org.apache.gearpump.TimeStamp +import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, UpdateResourceSucceed, WorkerRegistered} +import org.apache.gearpump.cluster.WorkerToMaster.ResourceUpdate +import org.apache.gearpump.cluster.master.Master.WorkerTerminated +import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished +import org.apache.gearpump.cluster.worker.WorkerId +import org.apache.gearpump.util.LogUtil +import org.slf4j.Logger + +import scala.collection.mutable + +/** + * Scheduler schedule resource for different applications. + */ +abstract class Scheduler extends Actor { + val LOG: Logger = LogUtil.getLogger(getClass) + protected var resources = new mutable.HashMap[WorkerId, (ActorRef, Resource)] + + def handleScheduleMessage: Receive = { + case WorkerRegistered(id, _) => + if (!resources.contains(id)) { + LOG.info(s"Worker $id added to the scheduler") + resources.put(id, (sender, Resource.empty)) + } + case update@ResourceUpdate(worker, workerId, resource) => + LOG.info(s"$update...") + if (resources.contains(workerId)) { + val resourceReturned = resource > resources.get(workerId).get._2 + resources.update(workerId, (worker, resource)) + if (resourceReturned) { + allocateResource() + } + sender ! UpdateResourceSucceed + } + else { + sender ! UpdateResourceFailed( + s"ResourceUpdate failed! 
The worker $workerId has not been registered into master") + } + case WorkerTerminated(workerId) => + if (resources.contains(workerId)) { + resources -= workerId + } + case ApplicationFinished(appId) => + doneApplication(appId) + } + + def allocateResource(): Unit + + def doneApplication(appId: Int): Unit +} + +object Scheduler { + case class PendingRequest( + appId: Int, appMaster: ActorRef, request: ResourceRequest, timeStamp: TimeStamp) + + case class ApplicationFinished(appId: Int) +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala b/core/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala new file mode 100644 index 0000000..3d5b0af --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.cluster.worker + +import java.io.File + +import com.typesafe.config.Config +import org.apache.gearpump.cluster.scheduler.Resource +import org.apache.gearpump.util.{LogUtil, RichProcess, Util} +import org.slf4j.Logger + +/** Launcher to start an executor process */ +class DefaultExecutorProcessLauncher(val config: Config) extends ExecutorProcessLauncher { + private val LOG: Logger = LogUtil.getLogger(getClass) + + override def createProcess( + appId: Int, executorId: Int, resource: Resource, config: Config, options: Array[String], + classPath: Array[String], mainClass: String, arguments: Array[String]): RichProcess = { + + LOG.info(s"Launch executor $executorId, classpath: ${classPath.mkString(File.pathSeparator)}") + Util.startProcess(options, classPath, mainClass, arguments) + } + + override def cleanProcess(appId: Int, executorId: Int): Unit = {} +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala b/core/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala new file mode 100644 index 0000000..447b034 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala @@ -0,0 +1,579 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.cluster.worker + +import java.io.File +import java.lang.management.ManagementFactory +import java.net.URL +import java.util.concurrent.{Executors, TimeUnit} + +import akka.actor.SupervisorStrategy.Stop +import akka.actor._ +import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory} +import org.apache.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData} +import org.apache.gearpump.cluster.AppMasterToWorker._ +import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig} +import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, WorkerConfig} +import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceSucceed, UpdateResourceFailed, WorkerRegistered} +import org.apache.gearpump.cluster.WorkerToAppMaster._ +import org.apache.gearpump.cluster.WorkerToMaster.{RegisterNewWorker, RegisterWorker, ResourceUpdate} +import org.apache.gearpump.cluster.master.Master.MasterInfo +import org.apache.gearpump.cluster.scheduler.Resource +import org.apache.gearpump.cluster.worker.Worker.ExecutorWatcher +import org.apache.gearpump.cluster.{ClusterConfig, ExecutorJVMConfig} +import org.apache.gearpump.jarstore.JarStoreClient +import org.apache.gearpump.metrics.Metrics.ReportMetrics +import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService} +import org.apache.gearpump.util.ActorSystemBooter.Daemon +import org.apache.gearpump.util.Constants._ +import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig +import 
org.apache.gearpump.util.{TimeOutScheduler, _}
import org.slf4j.Logger

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success, Try}

/**
 * Worker is used to track the resource on single machine, it is like
 * the node manager of YARN.
 *
 * Behaviour implemented in this class:
 *  - `preStart` sends `RegisterNewWorker` to `masterProxy` and switches to
 *    [[waitForMasterConfirm]]; if no `WorkerRegistered` arrives within 30 seconds the worker
 *    sends itself a `PoisonPill`.
 *  - Once registered, the worker switches to [[service]], where it launches and shuts down
 *    executors, keeps the free/allocated resource bookkeeping and reports every change to the
 *    master with a `ResourceUpdate`.
 *  - When the watched master terminates, the worker goes back to [[waitForMasterConfirm]] and
 *    retries `RegisterWorker` every two seconds for up to 30 seconds.
 *  - `postStop` shuts down the IO thread pool and terminates the whole actor system.
 *
 * @param masterProxy masterProxy is used to resolve the master
 */
private[cluster] class Worker(masterProxy: ActorRef) extends Actor with TimeOutScheduler {
  private val systemConfig: Config = context.system.settings.config

  private val address = ActorUtil.getFullPath(context.system, self.path)
  // Free resource that is still available on this worker.
  private var resource = Resource.empty
  // Resource currently held by each executor watcher actor.
  private var allocatedResources = Map[ActorRef, Resource]()
  private var executorsInfo = Map[ActorRef, ExecutorSlots]()
  private var id: WorkerId = WorkerId.unspecified
  private val createdTime = System.currentTimeMillis()
  // Stays null until the first WorkerRegistered is received; `service` is only entered after that.
  private var masterInfo: MasterInfo = null
  // Executor actor name (see ActorUtil.actorNameForExecutor) -> executor watcher actor.
  private var executorNameToActor = Map.empty[String, ActorRef]
  private val executorProcLauncher: ExecutorProcessLauncher = getExecutorProcLauncher()
  private val jarStoreClient = new JarStoreClient(systemConfig, context.system)

  private val ioPool = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
  private val resourceUpdateTimeoutMs = 30000 // Milliseconds

  private var totalSlots: Int = 0

  val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)
  var historyMetricsService: Option[ActorRef] = None

  // The real behavior is always installed by preStart() through context.become, so this initial
  // behavior never handles a message. An empty behavior is used instead of null.
  override def receive: Receive = Actor.emptyBehavior
  var LOG: Logger = LogUtil.getLogger(getClass)

  /**
   * Behavior of a registered worker: the handlers are tried in the order listed.
   * Must only be called after `masterInfo` has been set by a `WorkerRegistered` message.
   */
  def service: Receive =
    appMasterMsgHandler orElse
      clientMessageHandler orElse
      metricsService orElse
      terminationWatch(masterInfo.master) orElse
      ActorUtil.defaultMsgHandler(self)

  /** Forwards history metrics queries to the metrics service, if one was started. */
  def metricsService: Receive = {
    case query: QueryHistoryMetrics =>
      historyMetricsService match {
        case Some(metrics) =>
          metrics forward query
        case None =>
          // Returns empty metrics so that we don't hang the UI
          sender ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem])
      }
  }

  private var metricsInitialized = false

  val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig)

  /**
   * Registers the JVM metrics set for this worker and, when metrics are enabled, starts the
   * history metrics service together with the reporter that feeds it.
   */
  private def initializeMetrics(): Unit = {
    // Registers jvm metrics
    val metricsSetName = "worker" + WorkerId.render(id)
    Metrics(context.system).register(new JvmMetricsSet(metricsSetName))

    historyMetricsService = if (metricsEnabled) {
      val historyService = context.actorOf(
        Props(new HistoryMetricsService(metricsSetName, getHistoryMetricsConfig)))

      val metricsReportService = context.actorOf(Props(
        new MetricsReporterService(Metrics(context.system))))
      historyService.tell(ReportMetrics, metricsReportService)
      Some(historyService)
    } else {
      None
    }
  }

  /**
   * Behavior while waiting for the master to confirm the registration.
   *
   * @param timeoutTicker cancelled as soon as `WorkerRegistered` is received
   */
  def waitForMasterConfirm(timeoutTicker: Cancellable): Receive = {

    // If master get disconnected, the WorkerRegistered may be triggered multiple times.
    case WorkerRegistered(assignedId, registeredMaster) =>
      this.id = assignedId

      // Adds the flag check, so that we don't re-initialize the metrics when worker re-register
      // itself.
      if (!metricsInitialized) {
        initializeMetrics()
        metricsInitialized = true
      }

      this.masterInfo = registeredMaster
      timeoutTicker.cancel()
      context.watch(registeredMaster.master)
      this.LOG = LogUtil.getLogger(getClass, worker = assignedId)
      LOG.info(s"Worker is registered. " +
        s"actor path: ${ActorUtil.getFullPath(context.system, self.path)} ....")
      sendMsgWithTimeOutCallBack(registeredMaster.master,
        ResourceUpdate(self, assignedId, resource),
        resourceUpdateTimeoutMs, updateResourceTimeOut())
      context.become(service)
  }

  private def updateResourceTimeOut(): Unit = {
    LOG.error(s"Update worker resource time out")
  }

  /** Handles executor launch/shutdown, resource changes and worker data queries. */
  def appMasterMsgHandler: Receive = {
    case shutdown@ShutdownExecutor(appId, executorId, reason: String) =>
      val actorName = ActorUtil.actorNameForExecutor(appId, executorId)
      executorNameToActor.get(actorName) match {
        case Some(executor) =>
          LOG.info(s"Shutdown executor ${actorName}(${executor.path.toString}) " +
            s"due to: $reason")
          executor.forward(shutdown)
        case None =>
          LOG.error(s"Cannot find executor $actorName, ignore this message")
          sender ! ShutdownExecutorFailed(s"Can not find executor $executorId for app $appId")
      }
    case launch: LaunchExecutor =>
      LOG.info(s"$launch")
      if (resource < launch.resource) {
        sender ! ExecutorLaunchRejected("There is no free resource on this machine")
      } else {
        val actorName = ActorUtil.actorNameForExecutor(launch.appId, launch.executorId)

        val executor = context.actorOf(Props(classOf[ExecutorWatcher], launch, masterInfo, ioPool,
          jarStoreClient, executorProcLauncher))
        executorNameToActor += actorName -> executor

        resource = resource - launch.resource
        allocatedResources = allocatedResources + (executor -> launch.resource)

        reportResourceToMaster()
        executorsInfo += executor ->
          ExecutorSlots(launch.appId, launch.executorId, launch.resource.slots)
        context.watch(executor)
      }
    case UpdateResourceFailed(reason, ex) =>
      LOG.error(reason)
      context.stop(self)
    case UpdateResourceSucceed =>
      LOG.info(s"Update resource succeed")
    case GetWorkerData(workerId) =>
      val aliveFor = System.currentTimeMillis() - createdTime
      val logDir = LogUtil.daemonLogDir(systemConfig).getAbsolutePath
      val userDir = System.getProperty("user.dir")
      sender ! WorkerData(WorkerSummary(
        id, "active",
        address,
        aliveFor,
        logDir,
        executorsInfo.values.toArray,
        totalSlots,
        resource.slots,
        userDir,
        jvmName = ManagementFactory.getRuntimeMXBean().getName(),
        resourceManagerContainerId = systemConfig.getString(
          GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID),
        historyMetricsConfig = getHistoryMetricsConfig)
      )
    case ChangeExecutorResource(appId, executorId, usedResource) =>
      for (executor <- executorActorRef(appId, executorId);
        allocatedResource <- allocatedResources.get(executor)) {

        allocatedResources += executor -> usedResource
        resource = resource + allocatedResource - usedResource
        reportResourceToMaster()

        if (usedResource == Resource(0)) {
          executorsInfo -= executor
          allocatedResources -= executor
          // stop executor if there is no resource binded to it.
          // Its name mapping is dropped by terminationWatch once the watcher has stopped.
          LOG.info(s"Shutdown executor $executorId because the resource used is zero")
          executor ! ShutdownExecutor(appId, executorId,
            "Shutdown executor because the resource used is zero")
        }
      }
  }

  /** Sends the current free resource to the master; logs an error if the update times out. */
  private def reportResourceToMaster(): Unit = {
    sendMsgWithTimeOutCallBack(masterInfo.master,
      ResourceUpdate(self, id, resource), resourceUpdateTimeoutMs, updateResourceTimeOut())
  }

  private def executorActorRef(appId: Int, executorId: Int): Option[ActorRef] = {
    val actorName = ActorUtil.actorNameForExecutor(appId, executorId)
    executorNameToActor.get(actorName)
  }

  /** Answers configuration queries; an empty config is returned for a foreign worker id. */
  def clientMessageHandler: Receive = {
    case QueryWorkerConfig(workerId) =>
      if (this.id == workerId) {
        sender ! WorkerConfig(ClusterConfig.filterOutDefaultConfig(systemConfig))
      } else {
        sender ! WorkerConfig(ConfigFactory.empty)
      }
  }

  /**
   * Keeps sending `RegisterWorker(workerId)` to the master proxy until cancelled; sends a
   * `PoisonPill` to this worker when `timeOutSeconds` elapse first.
   */
  private def retryRegisterWorker(workerId: WorkerId, timeOutSeconds: Int): Cancellable = {
    repeatActionUtil(
      seconds = timeOutSeconds,
      action = () => {
        masterProxy ! RegisterWorker(workerId)
      },
      onTimeout = () => {
        LOG.error(s"Failed to register the worker $workerId after retrying for $timeOutSeconds " +
          s"seconds, abort and kill the worker...")
        self ! PoisonPill
      })
  }

  /**
   * Reacts to the termination of the watched master or of an executor watcher child.
   *
   * @param master the master actor this worker is currently registered with
   */
  def terminationWatch(master: ActorRef): Receive = {
    case Terminated(actor) =>
      if (actor == master) {
        // The master is down. Go back to the registration state and keep retrying; the worker
        // is only killed if no master confirms the registration within the timeout.
        LOG.info(s"Master cannot be contacted, find a new master ...")
        context.become(waitForMasterConfirm(retryRegisterWorker(id, timeOutSeconds = 30)))
      } else if (ActorUtil.isChildActorPath(self, actor)) {
        // One executor is down,
        val executorName = getExecutorName(actor)
        LOG.info(s"Executor is down $executorName")

        // Forget the name -> actor mapping of the dead executor, so the map does not grow
        // forever and a late ShutdownExecutor is answered with ShutdownExecutorFailed instead
        // of being forwarded to a dead actor. The lookup is by actor reference, so a newer
        // executor that re-used the same name is left untouched.
        executorName.foreach(name => executorNameToActor -= name)

        // No allocation is left when the resource was already released by
        // ChangeExecutorResource; in that case there is nothing to give back.
        allocatedResources.get(actor).foreach { allocated =>
          resource = resource + allocated
          executorsInfo -= actor
          allocatedResources = allocatedResources - actor
          sendMsgWithTimeOutCallBack(master, ResourceUpdate(self, id, resource),
            resourceUpdateTimeoutMs, updateResourceTimeOut())
        }
      }
  }

  private def getExecutorName(actorRef: ActorRef): Option[String] = {
    executorNameToActor.find(_._2 == actorRef).map(_._1)
  }

  /**
   * Instantiates, by reflection, the launcher class named by
   * GEARPUMP_EXECUTOR_PROCESS_LAUNCHER using its single-`Config` constructor.
   */
  private def getExecutorProcLauncher(): ExecutorProcessLauncher = {
    val launcherClazz = Class.forName(
      systemConfig.getString(GEARPUMP_EXECUTOR_PROCESS_LAUNCHER))
    launcherClazz.getConstructor(classOf[Config]).newInstance(systemConfig)
      .asInstanceOf[ExecutorProcessLauncher]
  }

  import context.dispatcher
  override def preStart(): Unit = {
    LOG.info(s"RegisterNewWorker")
    totalSlots = systemConfig.getInt(GEARPUMP_WORKER_SLOTS)
    this.resource = Resource(totalSlots)
    masterProxy ! RegisterNewWorker
    context.become(waitForMasterConfirm(registerTimeoutTicker(seconds = 30)))
  }

  /** Timeout guard for the very first registration: no repeated action, only the kill switch. */
  private def registerTimeoutTicker(seconds: Int): Cancellable = {
    repeatActionUtil(seconds, () => (), () => {
      LOG.error(s"Failed to register new worker to Master after waiting for $seconds seconds, " +
        s"abort and kill the worker...")
      self ! PoisonPill
    })
  }

  /**
   * Runs `action` immediately and then every two seconds, and runs `onTimeout` once after
   * `seconds` seconds. Cancelling the returned handle cancels both schedules.
   */
  private def repeatActionUtil(seconds: Int, action: () => Unit, onTimeout: () => Unit)
    : Cancellable = {
    val repeatTask = context.system.scheduler.schedule(Duration.Zero,
      Duration(2, TimeUnit.SECONDS))(action())
    val timeoutTask = context.system.scheduler.scheduleOnce(seconds.seconds)(onTimeout())
    new Cancellable {
      def cancel(): Boolean = {
        // Both schedules are always cancelled, even if the first cancel returns false.
        val repeatCancelled = repeatTask.cancel()
        val timeoutCancelled = timeoutTask.cancel()
        repeatCancelled && timeoutCancelled
      }

      def isCancelled: Boolean = {
        repeatTask.isCancelled && timeoutTask.isCancelled
      }
    }
  }

  override def postStop(): Unit = {
    LOG.info(s"Worker is going down....")
    ioPool.shutdown()
    context.system.terminate()
  }
}

private[cluster] object Worker {

  /** Message an [[ExecutorWatcher]] sends to itself when its executor has exited. */
  case class ExecutorResult(result: Try[Int])

  /**
   * Starts one executor, either inside this JVM or as a separate process, and stops itself
   * when the executor exits or a `ShutdownExecutor` request arrives.
   */
  class ExecutorWatcher(
      launch: LaunchExecutor,
      masterInfo: MasterInfo,
      ioPool: ExecutionContext,
      jarStoreClient: JarStoreClient,
      procLauncher: ExecutorProcessLauncher) extends Actor {
    import launch.{appId, executorId, resource}

    private val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = executorId)

    val executorConfig: Config = {
      val workerConfig = context.system.settings.config

      // Both the JVM config and its akka config may be null.
      val submissionConfig = Option(launch.executorJvmConfig).flatMap { jvmConfig =>
        Option(jvmConfig.executorAkkaConfig)
      }.getOrElse(ConfigFactory.empty())

      resolveExecutorConfig(workerConfig, submissionConfig)
    }

    // For some config, worker has priority, for others, user Application submission config
    // have priorities.
    private def resolveExecutorConfig(workerConfig: Config, submissionConfig: Config): Config = {
      val config = submissionConfig.withoutPath(GEARPUMP_HOSTNAME)
        .withoutPath(GEARPUMP_CLUSTER_MASTERS)
        .withoutPath(GEARPUMP_HOME)
        .withoutPath(GEARPUMP_LOG_DAEMON_DIR)
        .withoutPath(GEARPUMP_LOG_APPLICATION_DIR)
        .withoutPath(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS)
        // Falls back to workerConfig
        .withFallback(workerConfig)

      // Minimum supported akka.scheduler.tick-duration on Windows is 10ms
      val duration = config.getInt(AKKA_SCHEDULER_TICK_DURATION)
      val updatedConf = if (akka.util.Helpers.isWindows && duration < 10) {
        LOG.warn(s"$AKKA_SCHEDULER_TICK_DURATION on Windows must be larger than 10ms, set to 10ms")
        config.withValue(AKKA_SCHEDULER_TICK_DURATION, ConfigValueFactory.fromAnyRef(10))
      } else {
        config
      }

      // Excludes reference.conf, and JVM properties..
      ClusterConfig.filterOutDefaultConfig(updatedConf)
    }

    implicit val executorService = ioPool

    // The folders are under ${GEARPUMP_HOME}
    // Declared before executorHandler on purpose: createProcess() starts a Future on ioPool
    // that reads this value, so it has to be initialized before that Future is submitted.
    val daemonPathPattern = List("lib" + File.separator + "yarn")

    private val executorHandler: ExecutorHandler = {
      val ctx = launch.executorJvmConfig

      if (executorConfig.getBoolean(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS)) {
        // Runs the executor as a child actor of this watcher.
        new ExecutorHandler {
          val exitPromise = Promise[Int]()
          val app = context.actorOf(Props(new InJvmExecutor(launch, exitPromise)))

          override def destroy(): Unit = {
            context.stop(app)
          }
          override def exitValue: Future[Int] = {
            exitPromise.future
          }
        }
      } else {
        createProcess(ctx)
      }
    }

    /**
     * Prepares the temp jar and config file, assembles class path and JVM options and launches
     * the executor process through `procLauncher`; all of this runs in a Future on `ioPool`.
     *
     * @param ctx JVM settings of the executor to launch
     * @return a handler to destroy the process and to observe its exit value
     */
    private def createProcess(ctx: ExecutorJVMConfig): ExecutorHandler = {

      val processInfo = Future {
        val jarPath = ctx.jar.map { appJar =>
          val tempFile = File.createTempFile(appJar.name, ".jar")
          jarStoreClient.copyToLocalFile(tempFile, appJar.filePath)
          val file = new URL("file:" + tempFile)
          file.getFile
        }

        val configFile = {
          val tempConfigFile = File.createTempFile("gearpump", ".conf")
          ClusterConfig.saveConfig(executorConfig, tempConfigFile)
          val file = new URL("file:" + tempConfigFile)
          file.getFile
        }

        val classPath = filterOutDaemonLib(Util.getCurrentClassPath) ++
          ctx.classPath.map(path => expandEnviroment(path)) ++
          jarPath.map(Array(_)).getOrElse(Array.empty[String])

        val appLogDir = executorConfig.getString(GEARPUMP_LOG_APPLICATION_DIR)
        val logArgs = List(
          s"-D${GEARPUMP_APPLICATION_ID}=${launch.appId}",
          s"-D${GEARPUMP_EXECUTOR_ID}=${launch.executorId}",
          s"-D${GEARPUMP_MASTER_STARTTIME}=${getFormatedTime(masterInfo.startTime)}",
          s"-D${GEARPUMP_LOG_APPLICATION_DIR}=${appLogDir}")
        val configArgs = List(s"-D${GEARPUMP_CUSTOM_CONFIG_FILE}=$configFile")

        val username = List(s"-D${GEARPUMP_USERNAME}=${ctx.username}")

        // Remote debug executor process
        val remoteDebugFlag = executorConfig.getBoolean(GEARPUMP_REMOTE_DEBUG_EXECUTOR_JVM)
        val remoteDebugConfig = if (remoteDebugFlag) {
          val availablePort = Util.findFreePort().get
          List(
            "-Xdebug",
            s"-Xrunjdwp:server=y,transport=dt_socket,address=${availablePort},suspend=n",
            s"-D${GEARPUMP_REMOTE_DEBUG_PORT}=$availablePort"
          )
        } else {
          List.empty[String]
        }

        val verboseGCFlag = executorConfig.getBoolean(GEARPUMP_VERBOSE_GC)
        val verboseGCConfig = if (verboseGCFlag) {
          List(
            s"-Xloggc:${appLogDir}/gc-app${launch.appId}-executor-${launch.executorId}.log",
            "-verbose:gc",
            "-XX:+PrintGCDetails",
            "-XX:+PrintGCDateStamps",
            "-XX:+PrintTenuringDistribution",
            "-XX:+PrintGCApplicationConcurrentTime",
            "-XX:+PrintGCApplicationStoppedTime"
          )
        } else {
          List.empty[String]
        }

        val ipv4 = List(s"-D${PREFER_IPV4}=true")

        val options = ctx.jvmArguments ++ username ++
          logArgs ++ remoteDebugConfig ++ verboseGCConfig ++ ipv4 ++ configArgs

        val richProcess = procLauncher.createProcess(appId, executorId, resource, executorConfig,
          options, classPath, ctx.mainClass, ctx.arguments)

        ProcessInfo(richProcess, jarPath, configFile)
      }

      new ExecutorHandler {

        // Guards against destroying the process twice (receive and postStop both call destroy).
        var destroyed = false

        override def destroy(): Unit = {
          LOG.info(s"Destroy executor process ${ctx.mainClass}")
          if (!destroyed) {
            destroyed = true
            processInfo.foreach { info =>
              info.process.destroy()
              info.jarPath.foreach(new File(_).delete())
              new File(info.configFile).delete()
            }
          }
        }

        override def exitValue: Future[Int] = {
          processInfo.flatMap { info =>
            val exit = info.process.exitValue()
            if (exit == 0) {
              Future.successful(0)
            } else {
              Future.failed[Int](new Exception(s"Executor exit with failure, exit value: $exit, " +
                s"error summary: ${info.process.logger.error}"))
            }
          }
        }
      }
    }

    private def expandEnviroment(path: String): String = {
      // TODO: extend this to support more environment.
      path.replace(s"<${GEARPUMP_HOME}>", executorConfig.getString(GEARPUMP_HOME))
    }

    override def preStart(): Unit = {
      // Turns the exit of the executor into an ExecutorResult message for this actor.
      executorHandler.exitValue.onComplete { value =>
        procLauncher.cleanProcess(appId, executorId)
        val result = ExecutorResult(value)
        self ! result
      }
    }

    override def postStop(): Unit = {
      executorHandler.destroy()
    }

    override def receive: Receive = {
      case ShutdownExecutor(appId, executorId, reason: String) =>
        executorHandler.destroy()
        sender ! ShutdownExecutorSucceed(appId, executorId)
        context.stop(self)
      case ExecutorResult(executorResult) =>
        executorResult match {
          case Success(exit) => LOG.info("Executor exit normally with exit value " + exit)
          case Failure(e) => LOG.error("Executor exit with errors", e)
        }
        context.stop(self)
    }

    private def getFormatedTime(timestamp: Long): String = {
      val datePattern = "yyyy-MM-dd-HH-mm"
      val format = new java.text.SimpleDateFormat(datePattern)
      format.format(timestamp)
    }

    /** Drops class path entries that contain one of the `daemonPathPattern` folders. */
    private def filterOutDaemonLib(classPath: Array[String]): Array[String] = {
      classPath.filterNot(matchDaemonPattern(_))
    }

    private def matchDaemonPattern(path: String): Boolean = {
      daemonPathPattern.exists(path.contains(_))
    }
  }

  /** Handle on a running executor, independent of how it was started. */
  trait ExecutorHandler {
    def destroy(): Unit
    def exitValue: Future[Int]
  }

  case class ProcessInfo(process: RichProcess, jarPath: Option[String], configFile: String)

  /**
   * Starts the executor in the same JVM as worker.
   *
   * `exit` is completed with the failure that stopped a child, or with exit code 0 when this
   * actor stops without such a failure.
   */
  class InJvmExecutor(launch: LaunchExecutor, exit: Promise[Int])
    extends Daemon(launch.executorJvmConfig.arguments(0), launch.executorJvmConfig.arguments(1)) {
    private val exitCode = 0

    override val supervisorStrategy =
      OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
        case ex: Throwable =>
          LOG.error(s"system $name stopped ", ex)
          // tryFailure: the promise may already be completed by an earlier failure, and
          // Promise.failure would then throw IllegalStateException inside the decider.
          exit.tryFailure(ex)
          Stop
      }

    override def postStop(): Unit = {
      if (!exit.isCompleted) {
        // trySuccess instead of success: tolerate a completion that slips in after the check.
        exit.trySuccess(exitCode)
      }
    }
  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/metrics/AkkaReporter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/metrics/AkkaReporter.scala b/core/src/main/scala/org/apache/gearpump/metrics/AkkaReporter.scala
index acd9493..9092beb 100644
--- a/core/src/main/scala/org/apache/gearpump/metrics/AkkaReporter.scala
+++ 
b/core/src/main/scala/org/apache/gearpump/metrics/AkkaReporter.scala @@ -22,7 +22,7 @@ import scala.collection.JavaConverters._ import akka.actor.{ActorRef, ActorSystem} -import org.apache.gearpump.codahale.metrics.{Gauge => CodaGauge, MetricRegistry} +import com.codahale.metrics.{Gauge => CodaGauge, MetricRegistry} import org.apache.gearpump.metrics.Metrics.{Counter => CounterData, Gauge => GaugeData, Histogram => HistogramData, Meter => MeterData} import org.apache.gearpump.metrics.MetricsReporterService.ReportTo import org.apache.gearpump.util.LogUtil http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/metrics/Counter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/metrics/Counter.scala b/core/src/main/scala/org/apache/gearpump/metrics/Counter.scala index 2a4a195..e778e3d 100644 --- a/core/src/main/scala/org/apache/gearpump/metrics/Counter.scala +++ b/core/src/main/scala/org/apache/gearpump/metrics/Counter.scala @@ -18,10 +18,10 @@ package org.apache.gearpump.metrics -import org.apache.gearpump.codahale.metrics.{Counter => CodaHaleCounter} +import com.codahale.metrics.{Counter => CodaHaleCounter} /** - * @see org.apache.gearpump.codahale.metrics.Counter + * @see com.codahale.metrics.Counter */ class Counter(val name: String, counter: CodaHaleCounter, sampleRate: Int = 1) { private var sampleCount = 0L http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/metrics/Histogram.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/metrics/Histogram.scala b/core/src/main/scala/org/apache/gearpump/metrics/Histogram.scala index 9d1e500..3483c1e 100644 --- a/core/src/main/scala/org/apache/gearpump/metrics/Histogram.scala +++ b/core/src/main/scala/org/apache/gearpump/metrics/Histogram.scala @@ -18,10 
+18,10 @@ package org.apache.gearpump.metrics -import org.apache.gearpump.codahale.metrics.{Histogram => CodaHaleHistogram} +import com.codahale.metrics.{Histogram => CodaHaleHistogram} /** - * @see org.apache.gearpump.codahale.metrics.Histogram + * @see com.codahale.metrics.Histogram */ class Histogram(val name: String, histogram: CodaHaleHistogram, sampleRate: Int = 1) { private var sampleCount = 0L http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/core/src/main/scala/org/apache/gearpump/metrics/JvmMetricsSet.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/metrics/JvmMetricsSet.scala b/core/src/main/scala/org/apache/gearpump/metrics/JvmMetricsSet.scala index 436d14f..56b4743 100644 --- a/core/src/main/scala/org/apache/gearpump/metrics/JvmMetricsSet.scala +++ b/core/src/main/scala/org/apache/gearpump/metrics/JvmMetricsSet.scala @@ -21,8 +21,8 @@ package org.apache.gearpump.metrics import java.util import scala.collection.JavaConverters._ -import org.apache.gearpump.codahale.metrics.jvm.{MemoryUsageGaugeSet, ThreadStatesGaugeSet} -import org.apache.gearpump.codahale.metrics.{Metric, MetricSet} +import com.codahale.metrics.jvm.{MemoryUsageGaugeSet, ThreadStatesGaugeSet} +import com.codahale.metrics.{Metric, MetricSet} class JvmMetricsSet(name: String) extends MetricSet {
