http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala b/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala index 8d4515a..6a4ac07 100644 --- a/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala +++ b/daemon/src/main/scala/io/gearpump/cluster/main/Master.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,23 +19,24 @@ package io.gearpump.cluster.main import java.util.concurrent.TimeUnit +import scala.collection.JavaConverters._ +import scala.collection.immutable +import scala.concurrent.Await +import scala.concurrent.duration._ import akka.actor._ import akka.cluster.ClusterEvent._ import akka.cluster.ddata.DistributedData +import akka.cluster.singleton.{ClusterSingletonManager, ClusterSingletonManagerSettings, ClusterSingletonProxy, ClusterSingletonProxySettings} import akka.cluster.{Cluster, Member, MemberStatus} -import akka.cluster.singleton.{ClusterSingletonManagerSettings, ClusterSingletonProxySettings, ClusterSingletonManager, ClusterSingletonProxy} import com.typesafe.config.ConfigValueFactory +import org.slf4j.Logger + import io.gearpump.cluster.ClusterConfig import io.gearpump.cluster.master.{Master => MasterActor} import io.gearpump.util.Constants._ import io.gearpump.util.LogUtil.ProcessType import io.gearpump.util.{AkkaApp, Constants, LogUtil} -import org.slf4j.Logger - -import scala.collection.JavaConverters._ -import scala.collection.immutable -import scala.concurrent.duration._ object Master extends AkkaApp with ArgumentsParser { @@ -44,14 +45,14 @@ object Master extends AkkaApp with ArgumentsParser { override def akkaConfig: Config = ClusterConfig.master() override val options: Array[(String, CLIOption[Any])] = - Array("ip"->CLIOption[String]("<master ip address>",required = true), - "port"->CLIOption("<master port>",required = true)) + Array("ip" -> CLIOption[String]("<master ip address>", required = true), + "port" -> CLIOption("<master port>", required = true)) override val description = "Start Master daemon" def main(akkaConf: Config, args: Array[String]): Unit = { - this.LOG = { + this.LOG = { LogUtil.loadConfiguration(akkaConf, ProcessType.MASTER) LogUtil.getLogger(getClass) } @@ -60,27 +61,29 @@ object Master extends AkkaApp with ArgumentsParser { master(config.getString("ip"), config.getInt("port"), akkaConf) } - def verifyMaster(master : String, port: Int, masters : Iterable[String]) = { - masters.exists{ hostPort => + private def verifyMaster(master: String, port: Int, masters: Iterable[String]) = { + masters.exists { hostPort => hostPort == s"$master:$port" } } - def master(ip:String, port : Int, akkaConf: Config): Unit = { + private def master(ip: String, port: Int, akkaConf: Config): Unit = { val masters = akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala if (!verifyMaster(ip, port, masters)) { - LOG.error(s"The provided ip $ip and port $port doesn't conform with config at gearpump.cluster.masters: ${masters.mkString(", ")}") + LOG.error(s"The provided ip $ip and port $port doesn't 
conform with config at " + + s"gearpump.cluster.masters: ${masters.mkString(", ")}") System.exit(-1) } val masterList = masters.map(master => s"akka.tcp://${MASTER}@$master").toList.asJava - val quorum = masterList.size() /2 + 1 + val quorum = masterList.size() / 2 + 1 val masterConfig = akkaConf. withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port)). withValue(NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(ip)). withValue("akka.cluster.seed-nodes", ConfigValueFactory.fromAnyRef(masterList)). - withValue(s"akka.cluster.role.${MASTER}.min-nr-of-members", ConfigValueFactory.fromAnyRef(quorum)) + withValue(s"akka.cluster.role.${MASTER}.min-nr-of-members", + ConfigValueFactory.fromAnyRef(quorum)) LOG.info(s"Starting Master Actor system $ip:$port, master list: ${masters.mkString(";")}") val system = ActorSystem(MASTER, masterConfig) @@ -88,19 +91,21 @@ object Master extends AkkaApp with ArgumentsParser { val replicator = DistributedData(system).replicator LOG.info(s"Replicator path: ${replicator.path}") - //start singleton manager + // Starts singleton manager val singletonManager = system.actorOf(ClusterSingletonManager.props( singletonProps = Props(classOf[MasterWatcher], MASTER), terminationMessage = PoisonPill, - settings = ClusterSingletonManagerSettings(system).withSingletonName(MASTER_WATCHER).withRole(MASTER)), + settings = ClusterSingletonManagerSettings(system).withSingletonName(MASTER_WATCHER) + .withRole(MASTER)), name = SINGLETON_MANAGER) - //start master proxy + // Start master proxy val masterProxy = system.actorOf(ClusterSingletonProxy.props( singletonManagerPath = s"/user/${SINGLETON_MANAGER}", // The effective singleton is s"${MASTER_WATCHER}/$MASTER" instead of s"${MASTER_WATCHER}". - // Master will only be created when there is a majority of machines started. - settings = ClusterSingletonProxySettings(system).withSingletonName(s"${MASTER_WATCHER}/$MASTER").withRole(MASTER)), + // Master is created when there is a majority of machines started. 
+ settings = ClusterSingletonProxySettings(system) + .withSingletonName(s"${MASTER_WATCHER}/$MASTER").withRole(MASTER)), name = MASTER ) @@ -108,8 +113,8 @@ object Master extends AkkaApp with ArgumentsParser { val mainThread = Thread.currentThread() Runtime.getRuntime().addShutdownHook(new Thread() { - override def run() : Unit = { - if (!system.isTerminated) { + override def run(): Unit = { + if (!system.whenTerminated.isCompleted) { LOG.info("Triggering shutdown hook....") system.stop(masterProxy) @@ -117,21 +122,21 @@ object Master extends AkkaApp with ArgumentsParser { cluster.leave(cluster.selfAddress) cluster.down(cluster.selfAddress) try { - system.awaitTermination(Duration(3, TimeUnit.SECONDS)) + Await.result(system.whenTerminated, Duration(3, TimeUnit.SECONDS)) } catch { - case ex : Exception => //ignore + case ex: Exception => // Ignore } - system.shutdown() + system.terminate() mainThread.join() } } }) - system.awaitTermination() + Await.result(system.whenTerminated, Duration.Inf) } } -class MasterWatcher(role: String) extends Actor with ActorLogging { +class MasterWatcher(role: String) extends Actor with ActorLogging { import context.dispatcher val cluster = Cluster(context.system) @@ -142,13 +147,13 @@ class MasterWatcher(role: String) extends Actor with ActorLogging { val system = context.system - // sort by age, oldest first + // Sorts by age, oldest first val ageOrdering = Ordering.fromLessThan[Member] { (a, b) => a.isOlderThan(b) } var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering) - def receive : Receive = null + def receive: Receive = null - // subscribe to MemberEvent, re-subscribe when restart + // Subscribes to MemberEvent, re-subscribe when restart override def preStart(): Unit = { cluster.subscribe(self, classOf[MemberEvent]) context.become(waitForInit) @@ -159,14 +164,15 @@ class MasterWatcher(role: String) extends Actor with ActorLogging { def matchingRole(member: Member): Boolean = member.hasRole(role) - def waitForInit : Receive = { + def waitForInit: Receive = { case state: CurrentClusterState => { membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.filter(m => m.status == MemberStatus.Up && matchingRole(m)) if (membersByAge.size < quorum) { membersByAge.iterator.mkString(",") - log.info(s"We cannot get a quorum, $quorum, shutting down...${membersByAge.iterator.mkString(",")}") + log.info(s"We cannot get a quorum, $quorum, " + + s"shutting down...${membersByAge.iterator.mkString(",")}") context.become(waitForShutdown) self ! MasterWatcher.Shutdown } else { @@ -176,34 +182,36 @@ class MasterWatcher(role: String) extends Actor with ActorLogging { } } - def waitForClusterEvent : Receive = { - case MemberUp(m) if matchingRole(m) => { + def waitForClusterEvent: Receive = { + case MemberUp(m) if matchingRole(m) => { membersByAge += m } - case mEvent: MemberEvent if (mEvent.isInstanceOf[MemberExited] || mEvent.isInstanceOf[MemberRemoved]) && matchingRole(mEvent.member) => { + case mEvent: MemberEvent if (mEvent.isInstanceOf[MemberExited] || + mEvent.isInstanceOf[MemberRemoved]) && matchingRole(mEvent.member) => { log.info(s"member removed ${mEvent.member}") val m = mEvent.member membersByAge -= m if (membersByAge.size < quorum) { - log.info(s"We cannot get a quorum, $quorum, shutting down...${membersByAge.iterator.mkString(",")}") + log.info(s"We cannot get a quorum, $quorum, " + + s"shutting down...${membersByAge.iterator.mkString(",")}") context.become(waitForShutdown) self ! 
MasterWatcher.Shutdown } } } - def waitForShutdown : Receive = { + def waitForShutdown: Receive = { case MasterWatcher.Shutdown => { cluster.unsubscribe(self) cluster.leave(cluster.selfAddress) context.stop(self) system.scheduler.scheduleOnce(Duration.Zero) { try { - system.awaitTermination(Duration(3, TimeUnit.SECONDS)) + Await.result(system.whenTerminated, Duration(3, TimeUnit.SECONDS)) } catch { - case ex : Exception => //ignore + case ex: Exception => // Ignore } - system.shutdown() + system.terminate() } } }
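The recurring change in the diff above (and again in Worker.scala further down) is the move from the deprecated ActorSystem shutdown API (system.shutdown(), system.awaitTermination(...), system.isTerminated) to the Akka 2.4 future-based API (system.terminate(), system.whenTerminated). A minimal, self-contained sketch of that pattern, assuming an illustrative system name and reusing the 3-second timeout from the shutdown hook; this is not code from the commit itself:

import java.util.concurrent.TimeUnit

import scala.concurrent.Await
import scala.concurrent.duration.Duration

import akka.actor.ActorSystem

object TerminationSketch extends App {
  // "demo-system" is an illustrative name, not part of the commit.
  val system = ActorSystem("demo-system")

  // Pre-2.4 style, removed by this change:
  //   system.shutdown()
  //   system.awaitTermination(Duration(3, TimeUnit.SECONDS))

  // 2.4 style: terminate() starts the shutdown asynchronously, and
  // whenTerminated is a Future[Terminated] that completes once the
  // actor system has fully stopped.
  system.terminate()
  try {
    Await.result(system.whenTerminated, Duration(3, TimeUnit.SECONDS))
  } catch {
    case _: Exception => // Ignore a timeout, mirroring the shutdown hook above
  }
}
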
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/main/Replay.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Replay.scala b/daemon/src/main/scala/io/gearpump/cluster/main/Replay.scala index 451376e..c9a6e9c 100644 --- a/daemon/src/main/scala/io/gearpump/cluster/main/Replay.scala +++ b/daemon/src/main/scala/io/gearpump/cluster/main/Replay.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,12 +17,12 @@ */ package io.gearpump.cluster.main -import io.gearpump.cluster.client.ClientContext -import io.gearpump.util.{AkkaApp, LogUtil} import org.slf4j.Logger -import scala.util.Try +import io.gearpump.cluster.client.ClientContext +import io.gearpump.util.{AkkaApp, LogUtil} +// Internal tool to restart an application object Replay extends AkkaApp with ArgumentsParser { private val LOG: Logger = LogUtil.getLogger(getClass) @@ -31,19 +31,18 @@ object Replay extends AkkaApp with ArgumentsParser { "appid" -> CLIOption("<application id>", required = true), // For document purpose only, OPTION_CONFIG option is not used here. // OPTION_CONFIG is parsed by parent shell command "Gear" transparently. - Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, defaultValue = None)) + Gear.OPTION_CONFIG -> CLIOption("custom configuration file", required = false, + defaultValue = None)) override val description = "Replay the application from current min clock(low watermark)" def main(akkaConf: Config, args: Array[String]): Unit = { val config = parse(args) - if (null == config) { - return + if (null != config) { + val client = ClientContext(akkaConf) + client.replayFromTimestampWindowTrailingEdge(config.getInt("appid")) + client.close() } - - val client = ClientContext(akkaConf) - client.replayFromTimestampWindowTrailingEdge(config.getInt("appid")) - client.close() } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/main/Worker.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Worker.scala b/daemon/src/main/scala/io/gearpump/cluster/main/Worker.scala index 66fe25b..4818262 100644 --- a/daemon/src/main/scala/io/gearpump/cluster/main/Worker.scala +++ b/daemon/src/main/scala/io/gearpump/cluster/main/Worker.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,34 +18,38 @@ package io.gearpump.cluster.main +import scala.collection.JavaConverters._ +import scala.concurrent.Await +import scala.concurrent.duration.Duration + import akka.actor.{ActorSystem, Props} +import org.slf4j.Logger + import io.gearpump.cluster.ClusterConfig import io.gearpump.cluster.master.MasterProxy import io.gearpump.cluster.worker.{Worker => WorkerActor} import io.gearpump.transport.HostPort import io.gearpump.util.Constants._ -import io.gearpump.util.{AkkaApp, LogUtil} import io.gearpump.util.LogUtil.ProcessType -import org.slf4j.Logger - -import scala.collection.JavaConverters._ -import scala.util.Try +import io.gearpump.util.{AkkaApp, LogUtil} +/** Tool to start a worker daemon process */ object Worker extends AkkaApp with ArgumentsParser { - override def akkaConfig = ClusterConfig.worker() + protected override def akkaConfig = ClusterConfig.worker() override val description = "Start a worker daemon" - var LOG : Logger = LogUtil.getLogger(getClass) + var LOG: Logger = LogUtil.getLogger(getClass) - def uuid = java.util.UUID.randomUUID.toString + private def uuid = java.util.UUID.randomUUID.toString def main(akkaConf: Config, args: Array[String]): Unit = { val id = uuid this.LOG = { LogUtil.loadConfiguration(akkaConf, ProcessType.WORKER) - //delay creation of LOG instance to avoid creating an empty log file as we reset the log file name here + // Delay creation of LOG instance to avoid creating an empty log file as we + // reset the log file name here LogUtil.getLogger(getClass) } @@ -62,6 +66,6 @@ object Worker extends AkkaApp with ArgumentsParser { system.actorOf(Props(classOf[WorkerActor], masterProxy), classOf[WorkerActor].getSimpleName + id) - system.awaitTermination() + Await.result(system.whenTerminated, Duration.Inf) } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/master/AppManager.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/master/AppManager.scala b/daemon/src/main/scala/io/gearpump/cluster/master/AppManager.scala index e6bd1db..058533e 100644 --- a/daemon/src/main/scala/io/gearpump/cluster/master/AppManager.scala +++ b/daemon/src/main/scala/io/gearpump/cluster/master/AppManager.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,8 +18,14 @@ package io.gearpump.cluster.master +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.util.{Failure, Success} + import akka.actor._ import akka.pattern.ask +import org.slf4j.Logger + import io.gearpump.cluster.AppMasterToMaster.{AppDataSaved, SaveAppDataFailed, _} import io.gearpump.cluster.AppMasterToWorker._ import io.gearpump.cluster.ClientToMaster._ @@ -32,32 +38,29 @@ import io.gearpump.cluster.master.InMemoryKVService.{GetKVResult, PutKVResult, P import io.gearpump.cluster.master.Master._ import io.gearpump.util.Constants._ import io.gearpump.util.{ActorUtil, TimeOutScheduler, Util, _} -import org.slf4j.Logger - -import scala.concurrent.Future -import scala.concurrent.duration._ -import scala.util.{Failure, Success} /** - * AppManager is dedicated part of Master to manager all applicaitons. + * AppManager is dedicated child of Master to manager all applications. */ -private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLauncherFactory) extends Actor with Stash with TimeOutScheduler{ +private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLauncherFactory) + extends Actor with Stash with TimeOutScheduler { + private val LOG: Logger = LogUtil.getLogger(getClass) - private val executorId : Int = APPMASTER_DEFAULT_EXECUTOR_ID + private val executorId: Int = APPMASTER_DEFAULT_EXECUTOR_ID private val appMasterMaxRetries: Int = 5 - private val appMasterRetryTimeRange: Duration = 20 seconds + private val appMasterRetryTimeRange: Duration = 20.seconds implicit val timeout = FUTURE_TIMEOUT implicit val executionContext = context.dispatcher - //next available appId + // Next available appId private var appId: Int = 1 - //from appid to appMaster data + // From appid to appMaster data private var appMasterRegistry = Map.empty[Int, (ActorRef, AppMasterRuntimeInfo)] - // dead appmaster list + // Dead appmaster list private var deadAppMasters = Map.empty[Int, (ActorRef, AppMasterRuntimeInfo)] private var appMasterRestartPolicies = Map.empty[Int, RestartPolicy] @@ -70,7 +73,7 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch def waitForMasterState: Receive = { case GetKVSuccess(_, result) => val masterState = result.asInstanceOf[MasterState] - if(masterState != null) { + if (masterState != null) { this.appId = masterState.maxId + 1 this.deadAppMasters = masterState.deadAppMasters this.appMasterRegistry = masterState.appMasterRegistry @@ -85,11 +88,11 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch stash() } - def receiveHandler : Receive = { + def receiveHandler: Receive = { val msg = "Application Manager started. Ready for application submission..." 
- System.out.println(msg) LOG.info(msg) - clientMsgHandler orElse appMasterMessage orElse selfMsgHandler orElse workerMessage orElse appDataStoreService orElse terminationWatch + clientMsgHandler orElse appMasterMessage orElse selfMsgHandler orElse workerMessage orElse + appDataStoreService orElse terminationWatch } def clientMsgHandler: Receive = { @@ -97,12 +100,15 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch LOG.info(s"Submit Application ${app.name}($appId) by $username...") val client = sender if (applicationNameExist(app.name)) { - client ! SubmitApplicationResult(Failure(new Exception(s"Application name ${app.name} already existed"))) + client ! SubmitApplicationResult(Failure( + new Exception(s"Application name ${app.name} already existed"))) } else { - context.actorOf(launcher.props(appId, executorId, app, jar, username, context.parent, Some(client)), s"launcher${appId}_${Util.randInt}") + context.actorOf(launcher.props(appId, executorId, app, jar, username, context.parent, + Some(client)), s"launcher${appId}_${Util.randInt()}") val appState = new ApplicationState(appId, app.name, 0, app, jar, username, null) - appMasterRestartPolicies += appId -> new RestartPolicy(appMasterMaxRetries, appMasterRetryTimeRange) + appMasterRestartPolicies += appId -> + new RestartPolicy(appMasterMaxRetries, appMasterRetryTimeRange) kvService ! PutKV(appId.toString, APP_STATE, appState) appId += 1 } @@ -123,7 +129,8 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch } case GetKVFailed(ex) => client ! SubmitApplicationResult(Failure( - new Exception(s"Unable to obtain the Master State. Application $appId will not be restarted.") + new Exception(s"Unable to obtain the Master State. " + + s"Application $appId will not be restarted.") )) } @@ -133,9 +140,11 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch Option(info) match { case Some(info) => val worker = info.worker - LOG.info(s"Shutdown AppMaster at ${Option(worker).map(_.path).orNull}, appId: $appId, executorId: $executorId") + val workerPath = Option(worker).map(_.path).orNull + LOG.info(s"Shutdown AppMaster at ${workerPath}, appId: $appId, executorId: $executorId") cleanApplicationData(appId) - val shutdown = ShutdownExecutor(appId, executorId, s"AppMaster $appId shutdown requested by master...") + val shutdown = ShutdownExecutor(appId, executorId, + s"AppMaster $appId shutdown requested by master...") sendMsgWithTimeOutCallBack(worker, shutdown, 30000, shutDownExecutorTimeOut()) sender ! 
ShutdownApplicationResult(Success(appId)) case None => @@ -153,18 +162,20 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch case AppMastersDataRequest => var appMastersData = collection.mutable.ListBuffer[AppMasterData]() appMasterRegistry.foreach(pair => { - val (id, (appMaster:ActorRef, info: AppMasterRuntimeInfo)) = pair + val (id, (appMaster: ActorRef, info: AppMasterRuntimeInfo)) = pair val appMasterPath = ActorUtil.getFullPath(context.system, appMaster.path) - val workerPath = Option(info.worker).map(worker => ActorUtil.getFullPath(context.system, worker.path)) + val workerPath = Option(info.worker).map(worker => + ActorUtil.getFullPath(context.system, worker.path)) appMastersData += AppMasterData( AppMasterActive, id, info.appName, appMasterPath, workerPath.orNull, info.submissionTime, info.startTime, info.finishTime, info.user) }) deadAppMasters.foreach(pair => { - val (id, (appMaster:ActorRef, info:AppMasterRuntimeInfo)) = pair + val (id, (appMaster: ActorRef, info: AppMasterRuntimeInfo)) = pair val appMasterPath = ActorUtil.getFullPath(context.system, appMaster.path) - val workerPath = Option(info.worker).map(worker => ActorUtil.getFullPath(context.system, worker.path)) + val workerPath = Option(info.worker).map(worker => + ActorUtil.getFullPath(context.system, worker.path)) appMastersData += AppMasterData( AppMasterInActive, id, info.appName, appMasterPath, workerPath.orNull, @@ -218,7 +229,7 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch case failed: ShutdownExecutorFailed => LOG.error(failed.reason) } - + private def shutDownExecutorTimeOut(): Unit = { LOG.error(s"Shut down executor time out") } @@ -231,7 +242,8 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch LOG.info(s"Register AppMaster for app: ${register.appId} $register") context.watch(appMaster) appMasterRegistry += register.appId -> (appMaster, register) - kvService ! PutKV(MASTER_GROUP, MASTER_STATE, MasterState(appId, appMasterRegistry, deadAppMasters)) + kvService ! PutKV(MASTER_GROUP, MASTER_STATE, + MasterState(appId, appMasterRegistry, deadAppMasters)) sender ! AppMasterRegistered(register.appId) } @@ -257,13 +269,16 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch def terminationWatch: Receive = { case terminate: Terminated => terminate.getAddressTerminated() - LOG.info(s"AppMaster(${terminate.actor.path}) is terminiated, network down: ${terminate.getAddressTerminated()}") - //Now we assume that the only normal way to stop the application is submitting a ShutdownApplication request - val application = appMasterRegistry.find{appInfo => + LOG.info(s"AppMaster(${terminate.actor.path}) is terminiated, " + + s"network down: ${terminate.getAddressTerminated()}") + + // Now we assume that the only normal way to stop the application is submitting a + // ShutdownApplication request + val application = appMasterRegistry.find { appInfo => val (_, (actorRef, _)) = appInfo actorRef.compareTo(terminate.actor) == 0 } - if(application.nonEmpty){ + if (application.nonEmpty) { val appId = application.get._1 (kvService ? 
GetKV(appId.toString, APP_STATE)).asInstanceOf[Future[GetKVResult]].map { case GetKVSuccess(_, result) => @@ -283,26 +298,29 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch def selfMsgHandler: Receive = { case RecoverApplication(state) => val appId = state.appId - if(appMasterRestartPolicies.get(appId).get.allowRestart) { + if (appMasterRestartPolicies.get(appId).get.allowRestart) { LOG.info(s"AppManager Recovering Application $appId...") - context.actorOf(launcher.props(appId, executorId, state.app, state.jar, state.username, context.parent, None), s"launcher${appId}_${Util.randInt}") + context.actorOf(launcher.props(appId, executorId, state.app, state.jar, state.username, + context.parent, None), s"launcher${appId}_${Util.randInt()}") } else { LOG.error(s"Application $appId failed too many times") } } - case class RecoverApplication(applicationStatus : ApplicationState) + case class RecoverApplication(applicationStatus: ApplicationState) - private def cleanApplicationData(appId : Int) : Unit = { - //add the dead app to dead appMaster + private def cleanApplicationData(appId: Int): Unit = { + // Add the dead app to dead appMaster appMasterRegistry.get(appId).foreach { pair => val (appMasterActor, info) = pair - deadAppMasters += appId -> (appMasterActor, info.copy(finishTime = System.currentTimeMillis())) + deadAppMasters += appId -> (appMasterActor, info.copy( + finishTime = System.currentTimeMillis())) } appMasterRegistry -= appId - kvService ! PutKV(MASTER_GROUP, MASTER_STATE, MasterState(this.appId, appMasterRegistry, deadAppMasters)) + kvService ! PutKV(MASTER_GROUP, MASTER_STATE, + MasterState(this.appId, appMasterRegistry, deadAppMasters)) kvService ! DeleteKVGroup(appId.toString) } @@ -313,7 +331,7 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch object AppManager { final val APP_STATE = "app_state" - //The id is used in KVStore + // The id is used in KVStore final val MASTER_STATE = "master_state" case class MasterState( http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/master/InMemoryKVService.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/master/InMemoryKVService.scala b/daemon/src/main/scala/io/gearpump/cluster/master/InMemoryKVService.scala index fb66a0c..616d3ee 100644 --- a/daemon/src/main/scala/io/gearpump/cluster/master/InMemoryKVService.scala +++ b/daemon/src/main/scala/io/gearpump/cluster/master/InMemoryKVService.scala @@ -19,21 +19,22 @@ package io.gearpump.cluster.master import java.util.concurrent.TimeUnit +import scala.concurrent.TimeoutException +import scala.concurrent.duration.Duration import akka.actor._ import akka.cluster.Cluster -import akka.cluster.ddata.{DistributedData, LWWMap, Key, LWWMapKey} import akka.cluster.ddata.Replicator._ -import io.gearpump.util.{LogUtil} +import akka.cluster.ddata.{DistributedData, LWWMap, LWWMapKey} import org.slf4j.Logger -import scala.concurrent.TimeoutException -import scala.concurrent.duration.Duration + +import io.gearpump.util.LogUtil /** - * A replicated simple in-memory KV service. + * A replicated simple in-memory KV service. The replications are stored on all masters. 
*/ class InMemoryKVService extends Actor with Stash { - import InMemoryKVService._ + import io.gearpump.cluster.master.InMemoryKVService._ private val KV_SERVICE = "gearpump_kvservice" @@ -41,7 +42,7 @@ class InMemoryKVService extends Actor with Stash { private val replicator = DistributedData(context.system).replicator private implicit val cluster = Cluster(context.system) - //optimize write path, we can tolerate one master down for recovery. + // Optimize write path, we can tolerate one master down for recovery. private val timeout = Duration(15, TimeUnit.SECONDS) private val readMajority = ReadMajority(timeout) private val writeMajority = WriteMajority(timeout) @@ -50,39 +51,39 @@ class InMemoryKVService extends Actor with Stash { LWWMapKey[Any](KV_SERVICE + "_" + group) } - def receive : Receive = kvService + def receive: Receive = kvService - def kvService : Receive = { + def kvService: Receive = { - case GetKV(group: String, key : String) => + case GetKV(group: String, key: String) => val request = Request(sender(), key) replicator ! Get(groupKey(group), readMajority, Some(request)) - case success@ GetSuccess(group: LWWMapKey[Any], Some(request: Request)) => + case success@GetSuccess(group: LWWMapKey[Any @unchecked], Some(request: Request)) => val appData = success.get(group) LOG.info(s"Successfully retrived group: ${group.id}") request.client ! GetKVSuccess(request.key, appData.get(request.key).orNull) - case NotFound(group: LWWMapKey[Any], Some(request: Request)) => + case NotFound(group: LWWMapKey[Any @unchecked], Some(request: Request)) => LOG.info(s"We cannot find group $group") request.client ! GetKVSuccess(request.key, null) - case GetFailure(group: LWWMapKey[Any], Some(request: Request)) => + case GetFailure(group: LWWMapKey[Any @unchecked], Some(request: Request)) => val error = s"Failed to get application data, the request key is ${request.key}" LOG.error(error) request.client ! GetKVFailed(new Exception(error)) case PutKV(group: String, key: String, value: Any) => val request = Request(sender(), key) - val update = Update(groupKey(group), LWWMap(), writeMajority, Some(request)) {map => + val update = Update(groupKey(group), LWWMap(), writeMajority, Some(request)) { map => map + (key -> value) } replicator ! update - case UpdateSuccess(group: LWWMapKey[Any], Some(request: Request)) => - request.client ! PutKVSuccess - case ModifyFailure(group: LWWMapKey[Any], error, cause, Some(request: Request)) => + case UpdateSuccess(group: LWWMapKey[Any @unchecked], Some(request: Request)) => + request.client ! PutKVSuccess + case ModifyFailure(group: LWWMapKey[Any @unchecked], error, cause, Some(request: Request)) => request.client ! PutKVFailed(request.key, new Exception(error, cause)) - case UpdateTimeout(group: LWWMapKey[Any], Some(request: Request)) => + case UpdateTimeout(group: LWWMapKey[Any @unchecked], Some(request: Request)) => request.client ! PutKVFailed(request.key, new TimeoutException()) - case delete@ DeleteKVGroup(group: String) => + case delete@DeleteKVGroup(group: String) => replicator ! 
Delete(groupKey(group), writeMajority) case DeleteSuccess(group) => LOG.info(s"KV Group ${group.id} is deleted") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala b/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala index fdef42e..0203237 100644 --- a/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala +++ b/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,20 +19,26 @@ package io.gearpump.cluster.master import java.lang.management.ManagementFactory +import scala.collection.JavaConverters._ +import scala.collection.immutable import akka.actor._ import akka.remote.DisassociatedEvent import com.typesafe.config.Config +import org.apache.commons.lang.exception.ExceptionUtils +import org.slf4j.Logger + import io.gearpump.cluster.AppMasterToMaster._ import io.gearpump.cluster.ClientToMaster._ import io.gearpump.cluster.ClusterConfig import io.gearpump.cluster.MasterToAppMaster._ -import io.gearpump.cluster.MasterToClient.{HistoryMetricsItem, HistoryMetrics, MasterConfig, ResolveWorkerIdResult} +import io.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, MasterConfig, ResolveWorkerIdResult} import io.gearpump.cluster.MasterToWorker._ import io.gearpump.cluster.WorkerToMaster._ import io.gearpump.cluster.master.InMemoryKVService._ import io.gearpump.cluster.master.Master.{MasterInfo, WorkerTerminated, _} import io.gearpump.cluster.scheduler.Scheduler.ApplicationFinished +import io.gearpump.cluster.worker.WorkerId import io.gearpump.jarstore.local.LocalJarStore import io.gearpump.metrics.Metrics.ReportMetrics import io.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService} @@ -40,31 +46,22 @@ import io.gearpump.transport.HostPort import io.gearpump.util.Constants._ import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig import io.gearpump.util._ -import io.gearpump.WorkerId - -import org.apache.commons.lang.exception.ExceptionUtils -import org.slf4j.Logger - -import scala.collection.JavaConverters._ -import scala.collection.immutable /** - * Master manage resources of the whole cluster. + * Master Actor who manages resources of the whole cluster. * It is like the resource manager of YARN. - * */ private[cluster] class Master extends Actor with Stash { private val LOG: Logger = LogUtil.getLogger(getClass) - private val systemConfig : Config = context.system.settings.config + private val systemConfig: Config = context.system.settings.config private implicit val timeout = Constants.FUTURE_TIMEOUT private val kvService = context.actorOf(Props(new InMemoryKVService()), "kvService") - // resources and resourceRequests can be dynamically constructed by + // Resources and resourceRequests can be dynamically constructed by // heartbeat of worker and appmaster when master singleton is migrated. 
- // we don't need to persist them in cluster + // We don't need to persist them in cluster + private var appManager: ActorRef = null - private var appManager : ActorRef = null - - private var scheduler : ActorRef = null + private var scheduler: ActorRef = null private var workers = new immutable.HashMap[ActorRef, WorkerId] @@ -74,16 +71,16 @@ private[cluster] class Master extends Actor with Stash { def receive: Receive = null - // register jvm metrics + // Register jvm metrics Metrics(context.system).register(new JvmMetricsSet(s"master")) LOG.info("master is started at " + ActorUtil.getFullPath(context.system, self.path) + "...") val jarStoreRootPath = systemConfig.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH) - val jarStore = if(Util.isLocalPath(jarStoreRootPath)) { + private val jarStore = if (Util.isLocalPath(jarStoreRootPath)) { Some(context.actorOf(Props(classOf[LocalJarStore], jarStoreRootPath))) - } else{ + } else { None } @@ -97,7 +94,8 @@ private[cluster] class Master extends Actor with Stash { context.actorOf(Props(new HistoryMetricsService("master", getHistoryMetricsConfig))) } - val metricsReportService = context.actorOf(Props(new MetricsReporterService(Metrics(context.system)))) + val metricsReportService = context.actorOf( + Props(new MetricsReporterService(Metrics(context.system)))) historyMetricsService.tell(ReportMetrics, metricsReportService) Some(historyMetricsService) } else { @@ -109,7 +107,7 @@ private[cluster] class Master extends Actor with Stash { def waitForNextWorkerId: Receive = { case GetKVSuccess(_, result) => - if(result != null) { + if (result != null) { this.nextWorkerId = result.asInstanceOf[Int] } else { LOG.warn("Cannot find existing state in the distributed cluster...") @@ -124,7 +122,7 @@ private[cluster] class Master extends Actor with Stash { stash() } - def receiveHandler : Receive = workerMsgHandler orElse + def receiveHandler: Receive = workerMsgHandler orElse appMasterMsgHandler orElse clientMsgHandler orElse metricsService orElse @@ -134,7 +132,7 @@ private[cluster] class Master extends Actor with Stash { kvServiceMsgHandler orElse ActorUtil.defaultMsgHandler(self) - def workerMsgHandler : Receive = { + def workerMsgHandler: Receive = { case RegisterNewWorker => val workerId = WorkerId(nextWorkerId, System.currentTimeMillis()) nextWorkerId += 1 @@ -150,41 +148,42 @@ private[cluster] class Master extends Actor with Stash { workers += (sender() -> id) val workerHostname = ActorUtil.getHostname(sender()) LOG.info(s"Register Worker with id $id from $workerHostname ....") - case resourceUpdate : ResourceUpdate => + case resourceUpdate: ResourceUpdate => scheduler forward resourceUpdate } - def jarStoreService : Receive = { + def jarStoreService: Receive = { case GetJarStoreServer => jarStore.foreach(_ forward GetJarStoreServer) } def kvServiceMsgHandler: Receive = { case PutKVSuccess => - //Skip + // Skip case PutKVFailed(key, exception) => - LOG.error(s"Put KV of key $key to InMemoryKVService failed.\n" + ExceptionUtils.getStackTrace(exception)) + LOG.error(s"Put KV of key $key to InMemoryKVService failed.\n" + + ExceptionUtils.getStackTrace(exception)) } - def metricsService : Receive = { + def metricsService: Receive = { case query: QueryHistoryMetrics => if (historyMetricsService.isEmpty) { - // return empty metrics so that we don't hang the UI + // Returns empty metrics so that we don't hang the UI sender ! 
HistoryMetrics(query.path, List.empty[HistoryMetricsItem]) } else { historyMetricsService.get forward query } } - def appMasterMsgHandler : Receive = { - case request : RequestResource => + def appMasterMsgHandler: Receive = { + case request: RequestResource => scheduler forward request - case registerAppMaster : RegisterAppMaster => - //forward to appManager + case registerAppMaster: RegisterAppMaster => + // Forward to appManager appManager forward registerAppMaster - case save : SaveAppData => + case save: SaveAppData => appManager forward save - case get : GetAppData => + case get: GetAppData => appManager forward get case GetAllWorkers => sender ! WorkerList(workers.values.toList) @@ -219,7 +218,7 @@ private[cluster] class Master extends Actor with Stash { if (cluster.isEmpty) { - //add myself into the list if it is a single node cluster + // Add myself into the list if it is a single node cluster List(hostPort) } else { cluster @@ -228,18 +227,18 @@ private[cluster] class Master extends Actor with Stash { import scala.util.{Failure, Success} - def clientMsgHandler : Receive = { - case app : SubmitApplication => + def clientMsgHandler: Receive = { + case app: SubmitApplication => LOG.debug(s"Receive from client, SubmitApplication $app") appManager.forward(app) - case app : RestartApplication => + case app: RestartApplication => LOG.debug(s"Receive from client, RestartApplication $app") appManager.forward(app) - case app : ShutdownApplication => + case app: ShutdownApplication => LOG.debug(s"Receive from client, Shutting down Application ${app.appId}") scheduler ! ApplicationFinished(app.appId) appManager.forward(app) - case app : ResolveAppId => + case app: ResolveAppId => LOG.debug(s"Receive from client, resolving appId ${app.appId} to ActorRef") appManager.forward(app) case resolve: ResolveWorkerId => @@ -247,7 +246,8 @@ private[cluster] class Master extends Actor with Stash { val worker = workers.find(_._2 == resolve.workerId) worker match { case Some(worker) => sender ! ResolveWorkerIdResult(Success(worker._1)) - case None => sender ! ResolveWorkerIdResult(Failure(new Exception(s"cannot find worker ${resolve.workerId}"))) + case None => sender ! ResolveWorkerIdResult(Failure( + new Exception(s"cannot find worker ${resolve.workerId}"))) } case AppMastersDataRequest => LOG.debug("Master received AppMastersDataRequest") @@ -262,19 +262,20 @@ private[cluster] class Master extends Actor with Stash { sender ! MasterConfig(ClusterConfig.filterOutDefaultConfig(systemConfig)) } - def disassociated : Receive = { - case disassociated : DisassociatedEvent => + def disassociated: Receive = { + case disassociated: DisassociatedEvent => LOG.info(s" disassociated ${disassociated.remoteAddress}") - //LOG.info(s"remote lifecycle events are "+systemConfig.getString("akka.remote.log-remote-lifecycle-events")) } - def terminationWatch : Receive = { - case t : Terminated => + def terminationWatch: Receive = { + case t: Terminated => val actor = t.actor - LOG.info(s"worker ${actor.path} get terminated, is it due to network reason? ${t.getAddressTerminated()}") + LOG.info(s"worker ${actor.path} get terminated, is it due to network reason?" + + t.getAddressTerminated()) + LOG.info("Let's filter out dead resources...") - // filter out dead worker resource - if(workers.keySet.contains(actor)){ + // Filters out dead worker resource + if (workers.keySet.contains(actor)) { scheduler ! 
WorkerTerminated(workers.get(actor).get) workers -= actor } @@ -283,9 +284,11 @@ private[cluster] class Master extends Actor with Stash { override def preStart(): Unit = { val path = ActorUtil.getFullPath(context.system, self.path) LOG.info(s"master path is $path") - val schedulerClass = Class.forName(systemConfig.getString(Constants.GEARPUMP_SCHEDULING_SCHEDULER)) + val schedulerClass = Class.forName( + systemConfig.getString(Constants.GEARPUMP_SCHEDULING_SCHEDULER)) - appManager = context.actorOf(Props(new AppManager(kvService, AppMasterLauncher)), classOf[AppManager].getSimpleName) + appManager = context.actorOf(Props(new AppManager(kvService, AppMasterLauncher)), + classOf[AppManager].getSimpleName) scheduler = context.actorOf(Props(schedulerClass)) context.system.eventStream.subscribe(self, classOf[DisassociatedEvent]) } @@ -298,10 +301,10 @@ object Master { case class WorkerTerminated(workerId: WorkerId) - case class MasterInfo(master: ActorRef, startTime : Long = 0L) + case class MasterInfo(master: ActorRef, startTime: Long = 0L) object MasterInfo { - def empty = MasterInfo(null) + def empty: MasterInfo = MasterInfo(null) } case class SlotStatus(totalSlots: Int, availableSlots: Int) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala b/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala index 3b1bd9f..5df008e 100644 --- a/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala +++ b/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,23 +18,26 @@ package io.gearpump.cluster.scheduler +import scala.collection.mutable + import akka.actor.ActorRef -import io.gearpump.WorkerId + import io.gearpump.cluster.AppMasterToMaster.RequestResource import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated import io.gearpump.cluster.scheduler.Relaxation._ import io.gearpump.cluster.scheduler.Scheduler.PendingRequest +import io.gearpump.cluster.worker.WorkerId -import scala.collection.mutable - +/** Assign resource to application based on the priority of the application */ class PriorityScheduler extends Scheduler { private var resourceRequests = new mutable.PriorityQueue[PendingRequest]()(requestOrdering) - def requestOrdering = new Ordering[PendingRequest] { - override def compare(x: PendingRequest, y: PendingRequest) = { + def requestOrdering: Ordering[PendingRequest] = new Ordering[PendingRequest] { + override def compare(x: PendingRequest, y: PendingRequest): Int = { var res = x.request.priority.id - y.request.priority.id - if (res == 0) + if (res == 0) { res = y.timeStamp.compareTo(x.timeStamp) + } res } } @@ -59,8 +62,10 @@ class PriorityScheduler extends Scheduler { if (newAllocated < request.resource) { val remainingRequest = request.resource - newAllocated val remainingExecutors = request.executorNum - allocations.length - val newResourceRequest = request.copy(resource = remainingRequest, executorNum = remainingExecutors) - scheduleLater = scheduleLater :+ PendingRequest(appId, appMaster, newResourceRequest, timeStamp) + val newResourceRequest = request.copy(resource = remainingRequest, + executorNum = remainingExecutors) + scheduleLater = scheduleLater :+ + PendingRequest(appId, appMaster, newResourceRequest, timeStamp) } allocated = allocated + newAllocated case ONEWORKER => @@ -71,7 +76,8 @@ class PriorityScheduler extends Scheduler { if (availableResource.nonEmpty) { val (workerId, (worker, resource)) = availableResource.get allocated = allocated + request.resource - appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker, workerId))) + appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker, + workerId))) resourcesSnapShot.update(workerId, (worker, resource - request.resource)) } else { scheduleLater = scheduleLater :+ PendingRequest(appId, appMaster, request, timeStamp) @@ -80,24 +86,27 @@ class PriorityScheduler extends Scheduler { val workerAndResource = resourcesSnapShot.get(request.workerId) if (workerAndResource.nonEmpty && workerAndResource.get._2 > request.resource) { val (worker, availableResource) = workerAndResource.get - appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker, request.workerId))) + appMaster ! 
ResourceAllocated(Array(ResourceAllocation(request.resource, worker, + request.workerId))) allocated = allocated + request.resource - resourcesSnapShot.update(request.workerId, (worker, availableResource - request.resource)) + resourcesSnapShot.update(request.workerId, (worker, + availableResource - request.resource)) } else { scheduleLater = scheduleLater :+ PendingRequest(appId, appMaster, request, timeStamp) } } } - for(request <- scheduleLater) + for (request <- scheduleLater) resourceRequests.enqueue(request) } def resourceRequestHandler: Receive = { case RequestResource(appId, request) => - LOG.info(s"Request resource: appId: $appId, slots: ${request.resource.slots}, relaxation: ${request.relaxation}," + - s" executor number: ${request.executorNum}") + LOG.info(s"Request resource: appId: $appId, slots: ${request.resource.slots}, " + + s"relaxation: ${request.relaxation}, executor number: ${request.executorNum}") val appMaster = sender() - resourceRequests.enqueue(new PendingRequest(appId, appMaster, request, System.currentTimeMillis())) + resourceRequests.enqueue(new PendingRequest(appId, appMaster, request, + System.currentTimeMillis())) allocateResource() } @@ -105,7 +114,9 @@ class PriorityScheduler extends Scheduler { resourceRequests = resourceRequests.filter(_.appId != appId) } - private def allocateFairly(resources: mutable.HashMap[WorkerId, (ActorRef, Resource)], request: ResourceRequest): List[ResourceAllocation] = { + private def allocateFairly( + resources: mutable.HashMap[WorkerId, (ActorRef, Resource)], request: ResourceRequest) + : List[ResourceAllocation] = { val workerNum = resources.size var allocations = List.empty[ResourceAllocation] var totalAvailable = Resource(resources.values.map(_._2.slots).sum) @@ -116,13 +127,16 @@ class PriorityScheduler extends Scheduler { val exeutorNum = Math.min(workerNum, remainingExecutors) val toRequest = Resource(remainingRequest.slots * exeutorNum / remainingExecutors) - val flattenResource = resources.toArray.sortBy(_._2._2.slots)(Ordering[Int].reverse).take(exeutorNum).zipWithIndex.flatMap { workerWithIndex => + val sortedResources = resources.toArray.sortBy(_._2._2.slots)(Ordering[Int].reverse) + val pickedResources = sortedResources.take(exeutorNum) + + val flattenResource = pickedResources.zipWithIndex.flatMap { workerWithIndex => val ((workerId, (worker, resource)), index) = workerWithIndex 0.until(resource.slots).map(seq => ((workerId, worker), seq * workerNum + index)) }.sortBy(_._2).map(_._1) if (flattenResource.length < toRequest.slots) { - //Can not safisfy the user's requirements + // Can not safisfy the user's requirements totalAvailable = Resource.empty } else { flattenResource.take(toRequest.slots).groupBy(actor => actor).mapValues(_.length). http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/scheduler/Scheduler.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/scheduler/Scheduler.scala b/daemon/src/main/scala/io/gearpump/cluster/scheduler/Scheduler.scala index 8ccf1fb..ccd105f 100644 --- a/daemon/src/main/scala/io/gearpump/cluster/scheduler/Scheduler.scala +++ b/daemon/src/main/scala/io/gearpump/cluster/scheduler/Scheduler.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,47 +17,48 @@ */ package io.gearpump.cluster.scheduler +import scala.collection.mutable + import akka.actor.{Actor, ActorRef} -import io.gearpump.cluster.MasterToWorker.UpdateResourceSucceed -import io.gearpump.util.LogUtil -import io.gearpump.{WorkerId, TimeStamp} +import org.slf4j.Logger + +import io.gearpump.TimeStamp import io.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, UpdateResourceSucceed, WorkerRegistered} import io.gearpump.cluster.WorkerToMaster.ResourceUpdate import io.gearpump.cluster.master.Master.WorkerTerminated import io.gearpump.cluster.scheduler.Scheduler.ApplicationFinished +import io.gearpump.cluster.worker.WorkerId import io.gearpump.util.LogUtil -import org.slf4j.Logger - -import scala.collection.mutable /** * Scheduler schedule resource for different applications. */ -abstract class Scheduler extends Actor{ +abstract class Scheduler extends Actor { val LOG: Logger = LogUtil.getLogger(getClass) protected var resources = new mutable.HashMap[WorkerId, (ActorRef, Resource)] - def handleScheduleMessage : Receive = { + def handleScheduleMessage: Receive = { case WorkerRegistered(id, _) => - if(!resources.contains(id)) { + if (!resources.contains(id)) { LOG.info(s"Worker $id added to the scheduler") resources.put(id, (sender, Resource.empty)) } case update@ResourceUpdate(worker, workerId, resource) => LOG.info(s"$update...") - if(resources.contains(workerId)) { + if (resources.contains(workerId)) { val resourceReturned = resource > resources.get(workerId).get._2 resources.update(workerId, (worker, resource)) - if(resourceReturned){ + if (resourceReturned) { allocateResource() } sender ! UpdateResourceSucceed } else { - sender ! UpdateResourceFailed(s"ResourceUpdate failed! The worker $workerId has not been registered into master") + sender ! UpdateResourceFailed( + s"ResourceUpdate failed! The worker $workerId has not been registered into master") } case WorkerTerminated(workerId) => - if(resources.contains(workerId)){ + if (resources.contains(workerId)) { resources -= workerId } case ApplicationFinished(appId) => @@ -69,7 +70,9 @@ abstract class Scheduler extends Actor{ def doneApplication(appId: Int): Unit } -object Scheduler{ - case class PendingRequest(appId: Int, appMaster: ActorRef, request: ResourceRequest, timeStamp: TimeStamp) +object Scheduler { + case class PendingRequest( + appId: Int, appMaster: ActorRef, request: ResourceRequest, timeStamp: TimeStamp) + case class ApplicationFinished(appId: Int) } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala b/daemon/src/main/scala/io/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala index 08a342e..f97a209 100644 --- a/daemon/src/main/scala/io/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala +++ b/daemon/src/main/scala/io/gearpump/cluster/worker/DefaultExecutorProcessLauncher.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,15 +20,18 @@ package io.gearpump.cluster.worker import java.io.File import com.typesafe.config.Config +import org.slf4j.Logger + import io.gearpump.cluster.scheduler.Resource import io.gearpump.util.{LogUtil, RichProcess, Util} -import org.slf4j.Logger +/** Launcher to start an executor process */ class DefaultExecutorProcessLauncher(val config: Config) extends ExecutorProcessLauncher { private val LOG: Logger = LogUtil.getLogger(getClass) - override def createProcess(appId: Int, executorId: Int, resource: Resource, config: Config, options: Array[String], - classPath: Array[String], mainClass: String, arguments: Array[String]): RichProcess = { + override def createProcess( + appId: Int, executorId: Int, resource: Resource, config: Config, options: Array[String], + classPath: Array[String], mainClass: String, arguments: Array[String]): RichProcess = { LOG.info(s"Launch executor, classpath: ${classPath.mkString(File.pathSeparator)}") Util.startProcess(options, classPath, mainClass, arguments) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala b/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala index a746b39..1c22b05 100644 --- a/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala +++ b/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -22,34 +22,33 @@ import java.io.File import java.lang.management.ManagementFactory import java.net.URL import java.util.concurrent.{Executors, TimeUnit} +import scala.concurrent.duration._ +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.util.{Failure, Success, Try} import akka.actor.SupervisorStrategy.Stop import akka.actor._ import com.typesafe.config.{Config, ConfigFactory} -import io.gearpump.WorkerId +import org.slf4j.Logger + import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData} import io.gearpump.cluster.AppMasterToWorker._ import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig} -import io.gearpump.cluster.worker.Worker.ExecutorWatcher -import io.gearpump.cluster.{ExecutorJVMConfig, ClusterConfig} import io.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, WorkerConfig} import io.gearpump.cluster.MasterToWorker._ import io.gearpump.cluster.WorkerToAppMaster._ import io.gearpump.cluster.WorkerToMaster._ import io.gearpump.cluster.master.Master.MasterInfo import io.gearpump.cluster.scheduler.Resource +import io.gearpump.cluster.worker.Worker.ExecutorWatcher +import io.gearpump.cluster.{ClusterConfig, ExecutorJVMConfig} import io.gearpump.jarstore.JarStoreService import io.gearpump.metrics.Metrics.ReportMetrics import io.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService} import io.gearpump.util.ActorSystemBooter.Daemon import io.gearpump.util.Constants._ import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig -import io.gearpump.util.{Constants, TimeOutScheduler, _} -import org.slf4j.Logger - -import scala.concurrent.{Future, Promise, ExecutionContext} -import scala.concurrent.duration._ -import scala.util.{Try, Success, Failure} +import io.gearpump.util.{TimeOutScheduler, _} /** * Worker is used to track the resource on single machine, it is like @@ -57,8 +56,8 @@ import scala.util.{Try, Success, Failure} * * @param masterProxy masterProxy is used to resolve the master */ -private[cluster] class Worker(masterProxy : ActorRef) extends Actor with TimeOutScheduler{ - private val systemConfig : Config = context.system.settings.config +private[cluster] class Worker(masterProxy: ActorRef) extends Actor with TimeOutScheduler { + private val systemConfig: Config = context.system.settings.config private val address = ActorUtil.getFullPath(context.system, self.path) private var resource = Resource.empty @@ -73,14 +72,14 @@ private[cluster] class Worker(masterProxy : ActorRef) extends Actor with TimeOut jarStoreService.init(systemConfig, context.system) private val ioPool = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool()) - private val resourceUpdateTimeoutMs = 30000 //milliseconds + private val resourceUpdateTimeoutMs = 30000 // Milliseconds private var totalSlots: Int = 0 val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED) var historyMetricsService: Option[ActorRef] = None - override def receive : Receive = null + override def receive: Receive = null var LOG: Logger = LogUtil.getLogger(getClass) def service: Receive = @@ -90,10 +89,10 @@ private[cluster] class Worker(masterProxy : ActorRef) extends Actor with TimeOut terminationWatch(masterInfo.master) orElse 
ActorUtil.defaultMsgHandler(self) - def metricsService : Receive = { + def metricsService: Receive = { case query: QueryHistoryMetrics => if (historyMetricsService.isEmpty) { - // return empty metrics so that we don't hang the UI + // Returns empty metrics so that we don't hang the UI sender ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem]) } else { historyMetricsService.get forward query @@ -104,8 +103,8 @@ private[cluster] class Worker(masterProxy : ActorRef) extends Actor with TimeOut val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig) - private def initializeMetrics: Unit = { - // register jvm metrics + private def initializeMetrics(): Unit = { + // Registers jvm metrics Metrics(context.system).register(new JvmMetricsSet(s"worker${id}")) historyMetricsService = if (metricsEnabled) { @@ -113,7 +112,8 @@ private[cluster] class Worker(masterProxy : ActorRef) extends Actor with TimeOut context.actorOf(Props(new HistoryMetricsService("worker" + id, getHistoryMetricsConfig))) } - val metricsReportService = context.actorOf(Props(new MetricsReporterService(Metrics(context.system)))) + val metricsReportService = context.actorOf(Props( + new MetricsReporterService(Metrics(context.system)))) historyMetricsService.tell(ReportMetrics, metricsReportService) Some(historyMetricsService) } else { @@ -121,25 +121,27 @@ private[cluster] class Worker(masterProxy : ActorRef) extends Actor with TimeOut } } - def waitForMasterConfirm(killSelf : Cancellable) : Receive = { + def waitForMasterConfirm(timeoutTicker: Cancellable): Receive = { // If master get disconnected, the WorkerRegistered may be triggered multiple times. case WorkerRegistered(id, masterInfo) => this.id = id - // Add the flag check, so that we don't re-initialize when WorkerRegistered - // is triggered multiple times. + // Adds the flag check, so that we don't re-initialize the metrics when worker re-register + // itself. if (!metricsInitialized) { - initializeMetrics + initializeMetrics() metricsInitialized = true } this.masterInfo = masterInfo - killSelf.cancel() + timeoutTicker.cancel() context.watch(masterInfo.master) this.LOG = LogUtil.getLogger(getClass, worker = id) - LOG.info(s"Worker is registered. actor path: ${ActorUtil.getFullPath(context.system, self.path)} ....") - sendMsgWithTimeOutCallBack(masterInfo.master, ResourceUpdate(self, id, resource), resourceUpdateTimeoutMs, updateResourceTimeOut()) + LOG.info(s"Worker is registered. " + + s"actor path: ${ActorUtil.getFullPath(context.system, self.path)} ....") + sendMsgWithTimeOutCallBack(masterInfo.master, ResourceUpdate(self, id, resource), + resourceUpdateTimeoutMs, updateResourceTimeOut()) context.become(service) } @@ -147,31 +149,33 @@ private[cluster] class Worker(masterProxy : ActorRef) extends Actor with TimeOut LOG.error(s"Update worker resource time out") } - def appMasterMsgHandler : Receive = { - case shutdown @ ShutdownExecutor(appId, executorId, reason : String) => + def appMasterMsgHandler: Receive = { + case shutdown@ShutdownExecutor(appId, executorId, reason: String) => val actorName = ActorUtil.actorNameForExecutor(appId, executorId) val executorToStop = executorNameToActor.get(actorName) if (executorToStop.isDefined) { - LOG.info(s"Shutdown executor ${actorName}(${executorToStop.get.path.toString}) due to: $reason") + LOG.info(s"Shutdown executor ${actorName}(${executorToStop.get.path.toString}) " + + s"due to: $reason") executorToStop.get.forward(shutdown) } else { LOG.error(s"Cannot find executor $actorName, ignore this message") sender ! 
ShutdownExecutorFailed(s"Can not find executor $executorId for app $appId") } - case launch : LaunchExecutor => + case launch: LaunchExecutor => LOG.info(s"$launch") if (resource < launch.resource) { sender ! ExecutorLaunchRejected("There is no free resource on this machine") } else { val actorName = ActorUtil.actorNameForExecutor(launch.appId, launch.executorId) - val executor = context.actorOf(Props(classOf[ExecutorWatcher], launch, masterInfo, ioPool, jarStoreService, executorProcLauncher)) - executorNameToActor += actorName ->executor + val executor = context.actorOf(Props(classOf[ExecutorWatcher], launch, masterInfo, ioPool, + jarStoreService, executorProcLauncher)) + executorNameToActor += actorName -> executor resource = resource - launch.resource allocatedResources = allocatedResources + (executor -> launch.resource) - reportResourceToMaster + reportResourceToMaster() executorsInfo += executor -> ExecutorSlots(launch.appId, launch.executorId, launch.resource.slots) context.watch(executor) @@ -195,26 +199,29 @@ private[cluster] class Worker(masterProxy : ActorRef) extends Actor with TimeOut resource.slots, userDir, jvmName = ManagementFactory.getRuntimeMXBean().getName(), - resourceManagerContainerId = systemConfig.getString(Constants.GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID), + resourceManagerContainerId = systemConfig.getString( + GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID), historyMetricsConfig = getHistoryMetricsConfig) ) case ChangeExecutorResource(appId, executorId, usedResource) => for (executor <- executorActorRef(appId, executorId); - allocatedResource <- allocatedResources.get(executor)) { + allocatedResource <- allocatedResources.get(executor)) { + allocatedResources += executor -> usedResource resource = resource + allocatedResource - usedResource - reportResourceToMaster + reportResourceToMaster() if (usedResource == Resource(0)) { allocatedResources -= executor // stop executor if there is no resource binded to it. LOG.info(s"Shutdown executor $executorId because the resource used is zero") - executor ! ShutdownExecutor(appId, executorId, "Shutdown executor because the resource used is zero") + executor ! ShutdownExecutor(appId, executorId, + "Shutdown executor because the resource used is zero") } } } - private def reportResourceToMaster: Unit = { + private def reportResourceToMaster(): Unit = { sendMsgWithTimeOutCallBack(masterInfo.master, ResourceUpdate(self, id, resource), resourceUpdateTimeoutMs, updateResourceTimeOut()) } @@ -233,14 +240,28 @@ private[cluster] class Worker(masterProxy : ActorRef) extends Actor with TimeOut } } - def terminationWatch(master : ActorRef) : Receive = { + private def retryRegisterWorker(workerId: WorkerId, timeOutSeconds: Int): Cancellable = { + repeatActionUtil( + seconds = timeOutSeconds, + action = () => { + masterProxy ! RegisterWorker(workerId) + }, + onTimeout = () => { + LOG.error(s"Failed to register the worker $workerId after retrying for $timeOutSeconds " + + s"seconds, abort and kill the worker...") + self ! PoisonPill + }) + } + + def terminationWatch(master: ActorRef): Receive = { case Terminated(actor) => if (actor.compareTo(master) == 0) { - // parent is down, let's make suicide + // Parent master is down, no point to keep worker anymore. Let's make suicide to free + // resources LOG.info(s"Master cannot be contacted, find a new master ...") - context.become(waitForMasterConfirm(repeatActionUtil(30)(masterProxy ! 
RegisterWorker(id)))) + context.become(waitForMasterConfirm(retryRegisterWorker(id, timeOutSeconds = 30))) } else if (ActorUtil.isChildActorPath(self, actor)) { - //one executor is down, + // One executor is down, LOG.info(s"Executor is down ${getExecutorName(actor)}") val allocated = allocatedResources.get(actor) @@ -248,68 +269,81 @@ private[cluster] class Worker(masterProxy : ActorRef) extends Actor with TimeOut resource = resource + allocated.get executorsInfo -= actor allocatedResources = allocatedResources - actor - sendMsgWithTimeOutCallBack(master, ResourceUpdate(self, id, resource), resourceUpdateTimeoutMs, updateResourceTimeOut()) + sendMsgWithTimeOutCallBack(master, ResourceUpdate(self, id, resource), + resourceUpdateTimeoutMs, updateResourceTimeOut()) } } } - private def getExecutorName(actorRef: ActorRef): Option[String] = { + private def getExecutorName(actorRef: ActorRef): Option[String] = { executorNameToActor.find(_._2 == actorRef).map(_._1) } private def getExecutorProcLauncher(): ExecutorProcessLauncher = { - val launcherClazz = Class.forName(systemConfig.getString(Constants.GEARPUMP_EXECUTOR_PROCESS_LAUNCHER)) - launcherClazz.getConstructor(classOf[Config]).newInstance(systemConfig).asInstanceOf[ExecutorProcessLauncher] + val launcherClazz = Class.forName( + systemConfig.getString(GEARPUMP_EXECUTOR_PROCESS_LAUNCHER)) + launcherClazz.getConstructor(classOf[Config]).newInstance(systemConfig) + .asInstanceOf[ExecutorProcessLauncher] } import context.dispatcher - override def preStart() : Unit = { + override def preStart(): Unit = { LOG.info(s"RegisterNewWorker") - totalSlots = systemConfig.getInt(Constants.GEARPUMP_WORKER_SLOTS) + totalSlots = systemConfig.getInt(GEARPUMP_WORKER_SLOTS) this.resource = Resource(totalSlots) masterProxy ! RegisterNewWorker - context.become(waitForMasterConfirm(repeatActionUtil(30)(Unit))) + context.become(waitForMasterConfirm(registerTimeoutTicker(seconds = 30))) } - private def repeatActionUtil(seconds: Int)(action : => Unit) : Cancellable = { - val cancelSend = context.system.scheduler.schedule(Duration.Zero, Duration(2, TimeUnit.SECONDS))(action) - val cancelSuicide = context.system.scheduler.scheduleOnce(FiniteDuration(seconds, TimeUnit.SECONDS), self, PoisonPill) - return new Cancellable { + private def registerTimeoutTicker(seconds: Int): Cancellable = { + repeatActionUtil(seconds, () => Unit, () => { + LOG.error(s"Failed to register new worker to Master after waiting for $seconds seconds, " + + s"abort and kill the worker...") + self ! 
PoisonPill + }) + } + + private def repeatActionUtil(seconds: Int, action: () => Unit, onTimeout: () => Unit) + : Cancellable = { + val cancelTimeout = context.system.scheduler.schedule(Duration.Zero, + Duration(2, TimeUnit.SECONDS))(action()) + val cancelSuicide = context.system.scheduler.scheduleOnce(seconds.seconds)(onTimeout()) + new Cancellable { def cancel(): Boolean = { - val result1 = cancelSend.cancel() + val result1 = cancelTimeout.cancel() val result2 = cancelSuicide.cancel() result1 && result2 } def isCancelled: Boolean = { - cancelSend.isCancelled && cancelSuicide.isCancelled + cancelTimeout.isCancelled && cancelSuicide.isCancelled } } } - override def postStop : Unit = { + override def postStop(): Unit = { LOG.info(s"Worker is going down....") ioPool.shutdown() - context.system.shutdown() + context.system.terminate() } } private[cluster] object Worker { - case class ExecutorResult(result : Try[Int]) + case class ExecutorResult(result: Try[Int]) class ExecutorWatcher( - launch: LaunchExecutor, - masterInfo: MasterInfo, - ioPool: ExecutionContext, - jarStoreService: JarStoreService, - procLauncher: ExecutorProcessLauncher) extends Actor { + launch: LaunchExecutor, + masterInfo: MasterInfo, + ioPool: ExecutionContext, + jarStoreService: JarStoreService, + procLauncher: ExecutorProcessLauncher) extends Actor { import launch.{appId, executorId, resource} val executorConfig: Config = { val workerConfig = context.system.settings.config - val submissionConfig = Option(launch.executorJvmConfig).flatMap{ jvmConfig => + val submissionConfig = Option(launch.executorJvmConfig).flatMap { jvmConfig => Option(jvmConfig.executorAkkaConfig) }.getOrElse(ConfigFactory.empty()) @@ -319,15 +353,15 @@ private[cluster] object Worker { // For some config, worker has priority, for others, user Application submission config // have priorities. private def resolveExecutorConfig(workerConfig: Config, submissionConfig: Config): Config = { - val config = submissionConfig.withoutPath(Constants.GEARPUMP_HOSTNAME) - .withoutPath(Constants.GEARPUMP_CLUSTER_MASTERS) - .withoutPath(Constants.GEARPUMP_HOME) - .withoutPath(Constants.GEARPUMP_LOG_DAEMON_DIR) + val config = submissionConfig.withoutPath(GEARPUMP_HOSTNAME) + .withoutPath(GEARPUMP_CLUSTER_MASTERS) + .withoutPath(GEARPUMP_HOME) + .withoutPath(GEARPUMP_LOG_DAEMON_DIR) .withoutPath(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS) - // fall back to workerConfig + // Falls back to workerConfig .withFallback(workerConfig) - // we should exclude reference.conf, and JVM properties.. + // Excludes reference.conf, and JVM properties.. 
ClusterConfig.filterOutDefaultConfig(config) } @@ -343,10 +377,10 @@ private[cluster] object Worker { val exitPromise = Promise[Int]() val app = context.actorOf(Props(new InJvmExecutor(launch, exitPromise))) - override def destroy = { + override def destroy(): Unit = { context.stop(app) } - override def exitValue : Future[Int] = { + override def exitValue: Future[Int] = { exitPromise.future } } @@ -376,31 +410,30 @@ private[cluster] object Worker { ctx.classPath.map(path => expandEnviroment(path)) ++ jarPath.map(Array(_)).getOrElse(Array.empty[String]) - - val appLogDir = executorConfig.getString(Constants.GEARPUMP_LOG_APPLICATION_DIR) + val appLogDir = executorConfig.getString(GEARPUMP_LOG_APPLICATION_DIR) val logArgs = List( - s"-D${Constants.GEARPUMP_APPLICATION_ID}=${launch.appId}", - s"-D${Constants.GEARPUMP_EXECUTOR_ID}=${launch.executorId}", - s"-D${Constants.GEARPUMP_MASTER_STARTTIME}=${getFormatedTime(masterInfo.startTime)}", - s"-D${Constants.GEARPUMP_LOG_APPLICATION_DIR}=${appLogDir}") - val configArgs =List(s"-D${Constants.GEARPUMP_CUSTOM_CONFIG_FILE}=$configFile") + s"-D${GEARPUMP_APPLICATION_ID}=${launch.appId}", + s"-D${GEARPUMP_EXECUTOR_ID}=${launch.executorId}", + s"-D${GEARPUMP_MASTER_STARTTIME}=${getFormatedTime(masterInfo.startTime)}", + s"-D${GEARPUMP_LOG_APPLICATION_DIR}=${appLogDir}") + val configArgs = List(s"-D${GEARPUMP_CUSTOM_CONFIG_FILE}=$configFile") - val username = List(s"-D${Constants.GEARPUMP_USERNAME}=${ctx.username}") + val username = List(s"-D${GEARPUMP_USERNAME}=${ctx.username}") - //remote debug executor process - val remoteDebugFlag = executorConfig.getBoolean(Constants.GEARPUMP_REMOTE_DEBUG_EXECUTOR_JVM) + // Remote debug executor process + val remoteDebugFlag = executorConfig.getBoolean(GEARPUMP_REMOTE_DEBUG_EXECUTOR_JVM) val remoteDebugConfig = if (remoteDebugFlag) { - val availablePort = Util.findFreePort.get + val availablePort = Util.findFreePort().get List( "-Xdebug", s"-Xrunjdwp:server=y,transport=dt_socket,address=${availablePort},suspend=n", - s"-D${Constants.GEARPUMP_REMOTE_DEBUG_PORT}=$availablePort" + s"-D${GEARPUMP_REMOTE_DEBUG_PORT}=$availablePort" ) } else { List.empty[String] } - val verboseGCFlag = executorConfig.getBoolean(Constants.GEARPUMP_VERBOSE_GC) + val verboseGCFlag = executorConfig.getBoolean(GEARPUMP_VERBOSE_GC) val verboseGCConfig = if (verboseGCFlag) { List( s"-Xloggc:${appLogDir}/gc-app${launch.appId}-executor-${launch.executorId}.log", @@ -431,7 +464,7 @@ private[cluster] object Worker { var destroyed = false - override def destroy: Unit = { + override def destroy(): Unit = { LOG.info(s"Destroy executor process ${ctx.mainClass}") if (!destroyed) { destroyed = true @@ -449,37 +482,38 @@ private[cluster] object Worker { if (exit == 0) { Future.successful(0) } else { - Future.failed[Int](new Exception(s"Executor exit with failure, exit value: $exit, error summary: ${info.process.logger.error}")) + Future.failed[Int](new Exception(s"Executor exit with failure, exit value: $exit, " + + s"error summary: ${info.process.logger.error}")) } } } } } - import Constants._ private def expandEnviroment(path: String): String = { - //TODO: extend this to support more environment. + // TODO: extend this to support more environment. 
path.replace(s"<${GEARPUMP_HOME}>", executorConfig.getString(GEARPUMP_HOME)) } - override def preStart: Unit = { - executorHandler.exitValue.onComplete{value => + override def preStart(): Unit = { + executorHandler.exitValue.onComplete { value => procLauncher.cleanProcess(appId, executorId) val result = ExecutorResult(value) self ! result } } - override def postStop: Unit = { - executorHandler.destroy + override def postStop(): Unit = { + executorHandler.destroy() } - //The folders are under ${GEARPUMP_HOME} - val daemonPathPattern = List("lib" + File.separator + "daemon", "lib" + File.separator + "yarn") + // The folders are under ${GEARPUMP_HOME} + val daemonPathPattern = List("lib" + File.separator + "daemon", "lib" + + File.separator + "yarn") override def receive: Receive = { - case ShutdownExecutor(appId, executorId, reason : String) => - executorHandler.destroy + case ShutdownExecutor(appId, executorId, reason: String) => + executorHandler.destroy() sender ! ShutdownExecutorSucceed(appId, executorId) context.stop(self) case ExecutorResult(executorResult) => @@ -506,29 +540,28 @@ private[cluster] object Worker { } trait ExecutorHandler { - def destroy : Unit - def exitValue : Future[Int] + def destroy(): Unit + def exitValue: Future[Int] } case class ProcessInfo(process: RichProcess, jarPath: Option[String], configFile: String) /** - * We will start the executor in the same JVM as worker. - * @param launch - * @param exit - */ - class InJvmExecutor(launch: LaunchExecutor, exit: Promise[Int]) extends Daemon(launch.executorJvmConfig.arguments(0), launch.executorJvmConfig.arguments(1)) { + * Starts the executor in the same JVM as worker. + */ + class InJvmExecutor(launch: LaunchExecutor, exit: Promise[Int]) + extends Daemon(launch.executorJvmConfig.arguments(0), launch.executorJvmConfig.arguments(1)) { private val exitCode = 0 override val supervisorStrategy = - OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) { + OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) { case ex: Throwable => LOG.error(s"system $name stopped ", ex) exit.failure(ex) Stop } - override def postStop : Unit = { + override def postStop(): Unit = { if (!exit.isCompleted) { exit.success(exitCode) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/jarstore/dfs/DFSJarStoreService.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/jarstore/dfs/DFSJarStoreService.scala b/daemon/src/main/scala/io/gearpump/jarstore/dfs/DFSJarStoreService.scala index 94430ce..305bdc1 100644 --- a/daemon/src/main/scala/io/gearpump/jarstore/dfs/DFSJarStoreService.scala +++ b/daemon/src/main/scala/io/gearpump/jarstore/dfs/DFSJarStoreService.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,16 +18,17 @@ package io.gearpump.jarstore.dfs import java.io.File -import akka.actor.{ActorSystem, ActorRefFactory} + +import akka.actor.ActorSystem import com.typesafe.config.Config -import org.apache.hadoop.fs.Path -import io.gearpump.jarstore.{FilePath, JarStoreService} -import io.gearpump.util.LogUtil import org.apache.hadoop.conf.Configuration -import io.gearpump.util.Constants +import org.apache.hadoop.fs.Path import org.apache.hadoop.fs.permission.{FsAction, FsPermission} import org.slf4j.Logger +import io.gearpump.jarstore.{FilePath, JarStoreService} +import io.gearpump.util.{Constants, LogUtil} + /** * DFSJarStoreService store the uploaded jar on HDFS */ @@ -46,7 +47,8 @@ class DFSJarStoreService extends JarStoreService { } /** - * This function will copy the remote file to local file system, called from client side. + * This function will copy the remote file to local file system, called from client side. + * * @param localFile The destination of file path * @param remotePath The remote file path from JarStore */ @@ -60,6 +62,7 @@ class DFSJarStoreService extends JarStoreService { /** * This function will copy the local file to the remote JarStore, called from client side. + * * @param localFile The local file */ override def copyFromLocal(localFile: File): FilePath = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStore.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStore.scala b/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStore.scala index ec2104b..fa1a240 100644 --- a/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStore.scala +++ b/daemon/src/main/scala/io/gearpump/jarstore/local/LocalJarStore.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,19 +20,17 @@ package io.gearpump.jarstore.local import java.io.File -import akka.actor.{Actor, Props, Stash} -import akka.pattern.{ask, pipe} -import io.gearpump.cluster.ClientToMaster.GetJarStoreServer -import io.gearpump.util._ -import io.gearpump.cluster.ClientToMaster.{JarStoreServerAddress, GetJarStoreServer} +import akka.actor.{Actor, Stash} +import akka.pattern.pipe import org.slf4j.Logger -import scala.concurrent.Future +import io.gearpump.cluster.ClientToMaster.{GetJarStoreServer, JarStoreServerAddress} +import io.gearpump.util._ /** * LocalJarStore store the uploaded jar on local disk. 
*/ -class LocalJarStore(rootDirPath : String) extends Actor with Stash { +class LocalJarStore(rootDirPath: String) extends Actor with Stash { private val LOG: Logger = LogUtil.getLogger(getClass) val host = context.system.settings.config.getString(Constants.GEARPUMP_HOSTNAME) @@ -47,7 +45,7 @@ class LocalJarStore(rootDirPath : String) extends Actor with Stash { server.start pipeTo self - def receive : Receive = { + def receive: Receive = { case FileServer.Port(port) => context.become(listen(port)) unstashAll() @@ -55,7 +53,7 @@ class LocalJarStore(rootDirPath : String) extends Actor with Stash { stash() } - def listen(port : Int) : Receive = { + def listen(port: Int): Receive = { case GetJarStoreServer => sender ! JarStoreServerAddress(s"http://$host:$port/") }
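
The Worker.scala hunks above replace the curried repeatActionUtil(seconds)(action) with an explicit (seconds, action, onTimeout) signature, so that registration retries and the give-up timeout are driven by one Cancellable (see retryRegisterWorker and registerTimeoutTicker). The following is a minimal, self-contained sketch of that pattern; the RetryWithTimeout object name and the implicit Scheduler/ExecutionContext plumbing are assumptions made for illustration, not part of the patch.

import java.util.concurrent.TimeUnit

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

import akka.actor.{Cancellable, Scheduler}

object RetryWithTimeout {

  // Runs `action` every 2 seconds until either cancel() is called or `seconds` elapse,
  // at which point `onTimeout` fires once. Both scheduled tasks are wrapped in a single
  // Cancellable so the caller can stop them together, mirroring the shape of the
  // refactored repeatActionUtil in Worker.scala.
  def retry(seconds: Int, action: () => Unit, onTimeout: () => Unit)
    (implicit scheduler: Scheduler, ec: ExecutionContext): Cancellable = {

    val periodic = scheduler.schedule(Duration.Zero, Duration(2, TimeUnit.SECONDS))(action())
    val deadline = scheduler.scheduleOnce(seconds.seconds)(onTimeout())

    new Cancellable {
      override def cancel(): Boolean = {
        val stoppedPeriodic = periodic.cancel()
        val stoppedDeadline = deadline.cancel()
        stoppedPeriodic && stoppedDeadline
      }
      override def isCancelled: Boolean = periodic.isCancelled && deadline.isCancelled
    }
  }
}

A caller such as the worker cancels the returned Cancellable as soon as WorkerRegistered arrives, which stops both the periodic re-registration and the pending suicide message.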
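
The DefaultExecutorProcessLauncher diff and getExecutorProcLauncher() in Worker.scala rely on resolving the launcher class reflectively from configuration through a single-Config-argument constructor. Below is a hedged, standalone sketch of that lookup; ProcessLauncher, LoggingProcessLauncher, and classNameKey are stand-in names for illustration and are not Gearpump's actual identifiers.

import com.typesafe.config.Config

// Stand-in trait and implementation; the real worker uses Gearpump's
// ExecutorProcessLauncher and DefaultExecutorProcessLauncher.
trait ProcessLauncher {
  def launch(mainClass: String, arguments: Array[String]): Unit
}

class LoggingProcessLauncher(val config: Config) extends ProcessLauncher {
  override def launch(mainClass: String, arguments: Array[String]): Unit =
    println(s"would start $mainClass ${arguments.mkString(" ")}")
}

object LauncherLoader {
  // Reads a fully qualified class name from config and instantiates it through the
  // constructor taking a single Config argument, the same reflective pattern as
  // getExecutorProcLauncher() in Worker.scala.
  def load(systemConfig: Config, classNameKey: String): ProcessLauncher = {
    val launcherClazz = Class.forName(systemConfig.getString(classNameKey))
    launcherClazz.getConstructor(classOf[Config])
      .newInstance(systemConfig)
      .asInstanceOf[ProcessLauncher]
  }
}

This keeps the worker decoupled from any concrete launcher: swapping implementations only requires pointing the configured class name at another class with a Config constructor.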
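
resolveExecutorConfig in the ExecutorWatcher merges the application-submission config with the worker config: worker-owned paths are stripped from the submission first so the worker's values win for those keys, while everything else submitted by the user keeps priority via withFallback. A small sketch of that precedence rule with the Typesafe Config API follows; the concrete key strings here are illustrative placeholders, not the real Gearpump constants.

import com.typesafe.config.{Config, ConfigFactory}

object ExecutorConfigResolution {

  // Keys the worker owns are removed from the submitted config, so the fallback
  // (the worker config) supplies them; all remaining submission keys take priority.
  def resolve(workerConfig: Config, submissionConfig: Config): Config = {
    submissionConfig
      .withoutPath("gearpump.hostname")
      .withoutPath("gearpump.cluster.masters")
      .withFallback(workerConfig)
  }

  def main(args: Array[String]): Unit = {
    val worker = ConfigFactory.parseString("gearpump.hostname = worker-host")
    val submission = ConfigFactory.parseString(
      "gearpump.hostname = client-host\napp.parallelism = 4")
    val resolved = resolve(worker, submission)
    // Prints worker-host: the worker-owned key falls back to the worker config,
    // while app.parallelism is taken from the submission.
    println(resolved.getString("gearpump.hostname"))
    println(resolved.getInt("app.parallelism"))
  }
}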
