http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/AppDescription.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/AppDescription.scala b/core/src/main/scala/io/gearpump/cluster/AppDescription.scala index e8af854..799c20a 100644 --- a/core/src/main/scala/io/gearpump/cluster/AppDescription.scala +++ b/core/src/main/scala/io/gearpump/cluster/AppDescription.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,44 +18,64 @@ package io.gearpump.cluster +import scala.reflect.ClassTag + import akka.actor.{Actor, ActorRef, ActorSystem} import com.typesafe.config.{Config, ConfigFactory} + import io.gearpump.cluster.appmaster.WorkerInfo import io.gearpump.cluster.scheduler.Resource import io.gearpump.jarstore.FilePath -import io.gearpump.util.Util - -import scala.reflect.ClassTag /** * This contains all information to run an application * - * @param name: The name of this application - * @param appMaster: The class name of AppMaster Actor - * @param userConfig: user configuration. - * @param clusterConfig: The user provided cluster config, it will override gear.conf when starting - * new applications. In most cases, you wouldnot need to change it. If you do need to change it, - * use ClusterConfigSource(filePath) to construct the object, while filePath points to the .conf file. + * @param name The name of this application + * @param appMaster The class name of AppMaster Actor + * @param userConfig user configuration. 
+ * @param clusterConfig User provided cluster config, it overrides gear.conf when starting + * new applications. In most cases, you should not need to change it. If you do + * really need to change it, please use ClusterConfigSource(filePath) to + * construct the object, while filePath points to the .conf file. */ +case class AppDescription( + name: String, appMaster: String, userConfig: UserConfig, + clusterConfig: Config = ConfigFactory.empty()) -case class AppDescription(name : String, appMaster : String, userConfig: UserConfig, clusterConfig: Config = ConfigFactory.empty()) - +/** + * Each job, streaming or not streaming, need to provide an Application class. + * The master uses this class to start AppMaster. + */ trait Application { + + /** Name of this application, must be unique in the system */ def name: String + + /** Custom user configuration */ def userConfig(implicit system: ActorSystem): UserConfig + + /** + * AppMaster class, must have a constructor like this: + * this(appContext: AppMasterContext, app: AppDescription) + */ def appMaster: Class[_ <: ApplicationMaster] } object Application { - def apply[T <: ApplicationMaster](name: String, userConfig: UserConfig)(implicit tag: ClassTag[T]): Application = { - new DefaultApplication(name, userConfig, tag.runtimeClass.asInstanceOf[Class[_ <: ApplicationMaster]]) + def apply[T <: ApplicationMaster]( + name: String, userConfig: UserConfig)(implicit tag: ClassTag[T]): Application = { + new DefaultApplication(name, userConfig, + tag.runtimeClass.asInstanceOf[Class[_ <: ApplicationMaster]]) } - class DefaultApplication(override val name: String, inputUserConfig: UserConfig, val appMaster: Class[_ <: ApplicationMaster]) extends Application { + class DefaultApplication( + override val name: String, inputUserConfig: UserConfig, + val appMaster: Class[_ <: ApplicationMaster]) extends Application { override def userConfig(implicit system: ActorSystem): UserConfig = inputUserConfig } - def 
ApplicationToAppDescription(app: Application)(implicit system: ActorSystem): AppDescription = { + def ApplicationToAppDescription(app: Application)(implicit system: ActorSystem) + : AppDescription = { val filterJvmReservedKeys = ClusterConfig.filterOutDefaultConfig(system.settings.config) AppDescription(app.name, app.appMaster.getName, app.userConfig, filterJvmReservedKeys) } @@ -69,57 +89,57 @@ abstract class ApplicationMaster extends Actor /** * This contains context information when starting an AppMaster * - * @param appId: application instance id assigned, it is unique in the cluster - * @param username: The username who submitted this application - * @param resource: Resouce allocated to start this AppMaster daemon. AppMaster are allowed to - * request more resource from Master. - * @param appJar: application Jar. If the jar is already in classpath, then it can be None. - * @param masterProxy: The proxy to master actor, it will bridge the messages between appmaster and master - * @param registerData: The AppMaster are required to register this data back to Master by RegisterAppMaster - * + * @param appId application instance id assigned, it is unique in the cluster + * @param username The username who submitted this application + * @param resource Resouce allocated to start this AppMaster daemon. AppMaster are allowed to + * request more resource from Master. + * @param appJar application Jar. If the jar is already in classpath, then it can be None. + * @param masterProxy The proxy to master actor, it bridges the messages between appmaster + * and master + * @param registerData AppMaster are required to send this data to Master by when doing + * RegisterAppMaster. 
*/ case class AppMasterContext( - appId : Int, - username : String, - resource : Resource, + appId: Int, + username: String, + resource: Resource, workerInfo: WorkerInfo, - appJar : Option[AppJar], - masterProxy : ActorRef, - registerData : AppMasterRegisterData) + appJar: Option[AppJar], + masterProxy: ActorRef, + registerData: AppMasterRegisterData) /** * Jar file container in the cluster * - * @param name: A meaningful name to represent this jar - * @param filePath: Where the jar file is stored. + * @param name A meaningful name to represent this jar + * @param filePath Where the jar file is stored. */ case class AppJar(name: String, filePath: FilePath) - /** - * TODO: ExecutorContext doesn't belong here. - * Need to move to other places + * Serves as the context to start an Executor JVM. */ -case class ExecutorContext(executorId : Int, worker: WorkerInfo, appId : Int, appName: String, - appMaster : ActorRef, resource : Resource) - +// TODO: ExecutorContext doesn't belong to this package in logic. +case class ExecutorContext( + executorId: Int, worker: WorkerInfo, appId: Int, appName: String, + appMaster: ActorRef, resource: Resource) /** - * TODO: ExecutorJVMConfig doesn't belong here. - * Need to move to other places - */ -/** - * @param classPath: When a worker create a executor, the parent worker's classpath will - * be automatically inherited, the application jar will also be added to runtime - * classpath automatically. Sometimes, you still want to add some extraclasspath, - * you can do this by specify classPath option. + * JVM configurations to start an Executor JVM. + * + * @param classPath When executor is created by a worker JVM, executor automatically inherits + * parent worker's classpath. Sometimes, you still want to add some extra + * classpath, you can do this by specify classPath option. 
* @param jvmArguments java arguments like -Dxx=yy * @param mainClass Executor main class name like io.gearpump.xx.AppMaster * @param arguments Executor command line arguments * @param jar application jar - * @param executorAkkaConfig Akka config used to initialize the actor system of this executor. It will - * use io.gearpump.util.Constants.GEARPUMP_CUSTOM_CONFIG_FILE to pass the config to executor - * process - * + * @param executorAkkaConfig Akka config used to initialize the actor system of this executor. + * It uses io.gearpump.util.Constants.GEARPUMP_CUSTOM_CONFIG_FILE + * to pass the config to executor process */ -case class ExecutorJVMConfig(classPath : Array[String], jvmArguments : Array[String], mainClass : String, arguments : Array[String], jar: Option[AppJar], username : String, executorAkkaConfig: Config = ConfigFactory.empty()) \ No newline at end of file +// TODO: ExecutorContext doesn't belong to this package in logic. +case class ExecutorJVMConfig( + classPath: Array[String], jvmArguments: Array[String], mainClass: String, + arguments: Array[String], jar: Option[AppJar], username: String, + executorAkkaConfig: Config = ConfigFactory.empty()) \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/ClusterConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/ClusterConfig.scala b/core/src/main/scala/io/gearpump/cluster/ClusterConfig.scala index 64d5c42..7bae6d6 100644 --- a/core/src/main/scala/io/gearpump/cluster/ClusterConfig.scala +++ b/core/src/main/scala/io/gearpump/cluster/ClusterConfig.scala @@ -19,24 +19,27 @@ package io.gearpump.cluster import java.io.File + import com.typesafe.config._ + import io.gearpump.util.Constants._ -import io.gearpump.util.{Util, FileUtils, Constants, LogUtil} -import scala.collection.JavaConversions._ +import io.gearpump.util.{Constants, FileUtils, LogUtil, Util} /** * * All Gearpump application should use this class to load configurations. * - * Compared with Akka built-in [[ConfigFactory]], this class will also - * resolve file gear.conf and geardefault.conf. + * Compared with Akka built-in com.typesafe.config.ConfigFactory, this class also + * resolve config from file gear.conf and geardefault.conf. 
* * Overriding order: + * {{{ * System Properties * > Custom configuration file (by using system property -Dgearpump.config.file) > * > gear.conf * > geardefault.conf * > reference.conf + * }}} */ object ClusterConfig { @@ -81,11 +84,10 @@ object ClusterConfig { load(configFile).ui } - /** * try to load system property gearpump.config.file, or use configFile */ - private def load(configFile: String) : Configs = { + private def load(configFile: String): Configs = { val file = Option(System.getProperty(GEARPUMP_CUSTOM_CONFIG_FILE)) file match { case Some(path) => @@ -100,7 +102,7 @@ object ClusterConfig { val APPLICATION = "application.conf" val LOG = LogUtil.getLogger(getClass) - def saveConfig(conf : Config, file : File) : Unit = { + def saveConfig(conf: Config, file: File): Unit = { val serialized = conf.root().render() FileUtils.write(file, serialized) } @@ -113,13 +115,13 @@ object ClusterConfig { } } - // filter JVM reserved keys and akka default reference.conf + /** filter JVM reserved keys and akka default reference.conf */ def filterOutDefaultConfig(input: Config): Config = { val updated = filterOutJvmReservedKeys(input) Util.filterOutOrigin(updated, "reference.conf") } - private[gearpump] def load(source: ClusterConfigSource) : Configs = { + private[gearpump] def load(source: ClusterConfigSource): Configs = { val systemProperties = getSystemProperties @@ -162,8 +164,8 @@ object ClusterConfig { ) private def getSystemProperties: Config = { - // exclude default java system properties - JVM_RESERVED_PROPERTIES.foldLeft(ConfigFactory.systemProperties()) {(config, property) => + // Excludes default java system properties + JVM_RESERVED_PROPERTIES.foldLeft(ConfigFactory.systemProperties()) { (config, property) => config.withoutPath(property) } } @@ -177,5 +179,6 @@ object ClusterConfig { filterJvmReservedKeys } - protected class Configs (val master: Config, val worker: Config, val ui: Config, val default: Config) + protected class Configs( + val master: Config, 
val worker: Config, val ui: Config, val default: Config) } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/ClusterConfigSource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/ClusterConfigSource.scala b/core/src/main/scala/io/gearpump/cluster/ClusterConfigSource.scala index 7e8a91d..3a248d7 100644 --- a/core/src/main/scala/io/gearpump/cluster/ClusterConfigSource.scala +++ b/core/src/main/scala/io/gearpump/cluster/ClusterConfigSource.scala @@ -19,9 +19,10 @@ package io.gearpump.cluster import java.io.File -import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions} import scala.language.implicitConversions +import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions} + /** * Data Source of ClusterConfig * http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala b/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala index aac45ab..5a42ea3 100644 --- a/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala +++ b/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,26 +18,25 @@ package io.gearpump.cluster +import scala.util.Try + import akka.actor.ActorRef import com.typesafe.config.Config -import io.gearpump.{WorkerId, TimeStamp} + +import io.gearpump.TimeStamp import io.gearpump.cluster.MasterToAppMaster.AppMasterStatus import io.gearpump.cluster.master.MasterSummary import io.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest} -import io.gearpump.cluster.worker.WorkerSummary +import io.gearpump.cluster.worker.{WorkerId, WorkerSummary} import io.gearpump.metrics.Metrics.MetricType -import scala.util.Try - -/** - * Application Flow - */ - object ClientToMaster { case object AddMaster case class AddWorker(count: Int) case class RemoveMaster(masterContainerId: String) case class RemoveWorker(workerContainerId: String) + + /** Command result of AddMaster, RemoveMaster, and etc... */ case class CommandResult(success: Boolean, exception: String = null) { override def toString: String = { val tag = getClass.getSimpleName @@ -48,50 +47,89 @@ object ClientToMaster { } } } - case class SubmitApplication(appDescription: AppDescription, appJar: Option[AppJar], username : String = System.getProperty("user.name")) + + /** Submit an application to master */ + case class SubmitApplication( + appDescription: AppDescription, appJar: Option[AppJar], + username: String = System.getProperty("user.name")) + case class RestartApplication(appId: Int) case class ShutdownApplication(appId: Int) + + /** Client send ResolveAppId to Master to resolves AppMaster actor path by providing appId */ case class ResolveAppId(appId: Int) + /** Client send ResolveWorkerId to master to get the Actor path of worker. 
*/ case class ResolveWorkerId(workerId: WorkerId) + /** Get an active Jar store to upload job jars, like wordcount.jar */ case object GetJarStoreServer + /** Service address of JarStore */ case class JarStoreServerAddress(url: String) + /** Query AppMaster config by providing appId */ case class QueryAppMasterConfig(appId: Int) + /** Query worker config */ case class QueryWorkerConfig(workerId: WorkerId) + /** Query master config */ case object QueryMasterConfig + /** Options for read the metrics from the cluster */ object ReadOption { type ReadOption = String val Key: String = "readOption" + /** Read the latest record of the metrics, only return 1 record for one metric name (id) */ val ReadLatest: ReadOption = "readLatest" + /** Read recent metrics from cluster, typically it contains metrics in 5 minutes */ val ReadRecent = "readRecent" + /** + * Read the history metrics, typically it contains metrics for 48 hours + * + * NOTE: Each hour only contain one or two data points. + */ val ReadHistory = "readHistory" } + /** Query history metrics from master or app master. */ + case class QueryHistoryMetrics( + path: String, readOption: ReadOption.ReadOption = ReadOption.ReadLatest, + aggregatorClazz: String = "", options: Map[String, String] = Map.empty[String, String]) - case class QueryHistoryMetrics(path: String, readOption: ReadOption.ReadOption = ReadOption.ReadLatest, aggregatorClazz: String = "", options: Map[String, String] = Map.empty[String, String]) - + /** + * If there are message loss, the clock would pause for a while. This message is used to + * pin-point which task has stalling clock value, and usually it means something wrong on + * that machine. + */ case class GetStallingTasks(appId: Int) + /** + * Request app master for a short list of cluster app that administrators should be aware of. 
+ */ case class GetLastFailure(appId: Int) } object MasterToClient { - case class SubmitApplicationResult(appId : Try[Int]) + + /** Result of SubmitApplication */ + // TODO: Merge with SubmitApplicationResultValue and change this to (appId: Option, ex: Exception) + case class SubmitApplicationResult(appId: Try[Int]) + case class SubmitApplicationResultValue(appId: Int) - case class ShutdownApplicationResult(appId : Try[Int]) + + case class ShutdownApplicationResult(appId: Try[Int]) case class ReplayApplicationResult(appId: Try[Int]) + + /** Return Actor ref of app master */ case class ResolveAppIdResult(appMaster: Try[ActorRef]) + /** Return Actor ref of worker */ case class ResolveWorkerIdResult(worker: Try[ActorRef]) case class AppMasterConfig(config: Config) @@ -102,25 +140,64 @@ object MasterToClient { case class HistoryMetricsItem(time: TimeStamp, value: MetricType) + /** + * History metrics returned from master, worker, or app master. + * + * All metric items are organized like a tree, path is used to navigate through the tree. + * For example, when querying with path == "executor0.task1.throughput*", the metrics + * provider picks metrics whose source matches the path. + * + * @param path The path client provided. The returned metrics are the result query of this path. + * @param metrics The detailed metrics. + */ case class HistoryMetrics(path: String, metrics: List[HistoryMetricsItem]) + /** Return the last error of this streaming application job */ case class LastFailure(time: TimeStamp, error: String) } trait AppMasterRegisterData object AppMasterToMaster { - case class RegisterAppMaster(appMaster: ActorRef, registerData : AppMasterRegisterData) + + /** + * Register an AppMaster by providing a ActorRef, and registerData + * @param registerData The registerData is provided by Master when starting the app master. + * App master should return the registerData back to master. + * Typically registerData hold some context information for this app Master. 
+ */ + + case class RegisterAppMaster(appMaster: ActorRef, registerData: AppMasterRegisterData) + case class InvalidAppMaster(appId: Int, appMaster: String, reason: Throwable) + case class RequestResource(appId: Int, request: ResourceRequest) + /** + * Each application job can save some data in the distributed cluster storage on master nodes. + * + * @param appId App Id of the client application who send the request. + * @param key Key name + * @param value Value to store on distributed cluster storage on master nodes + */ case class SaveAppData(appId: Int, key: String, value: Any) + + /** The application specific data is successfully stored */ case object AppDataSaved + + /** Fail to store the application data */ case object SaveAppDataFailed + /** Fetch the application specific data that stored previously */ case class GetAppData(appId: Int, key: String) + + /** The KV data returned for query GetAppData */ case class GetAppDataResult(key: String, value: Any) + /** + * AppMasterSummary returned to REST API query. Streaming and Non-streaming + * have very different application info. AppMasterSummary is the common interface. 
+ */ trait AppMasterSummary { def appType: String def appId: Int @@ -132,37 +209,43 @@ object AppMasterToMaster { def user: String } + /** Represents a generic application that is not a streaming job */ case class GeneralAppMasterSummary( - appId: Int, - appType: String = "general", - appName: String = null, - actorPath: String = null, - status: AppMasterStatus = MasterToAppMaster.AppMasterActive, - startTime: TimeStamp = 0L, - uptime: TimeStamp = 0L, - user: String = null) + appId: Int, + appType: String = "general", + appName: String = null, + actorPath: String = null, + status: AppMasterStatus = MasterToAppMaster.AppMasterActive, + startTime: TimeStamp = 0L, + uptime: TimeStamp = 0L, + user: String = null) extends AppMasterSummary + /** Fetches the list of workers from Master */ case object GetAllWorkers + + /** Get worker data of workerId */ case class GetWorkerData(workerId: WorkerId) + + /** Response to GetWorkerData */ case class WorkerData(workerDescription: WorkerSummary) + /** Get Master data */ case object GetMasterData + + /** Response to GetMasterData */ case class MasterData(masterDescription: MasterSummary) } object MasterToAppMaster { - case class ResourceAllocated(allocations: Array[ResourceAllocation]){ - override def equals(other: Any): Boolean = { - other match { - case that: ResourceAllocated => - allocations.sortBy(_.workerId).sameElements(that.allocations.sortBy(_.workerId)) - case _ => - false - } - } - } + + /** Resource allocated for application xx */ + case class ResourceAllocated(allocations: Array[ResourceAllocation]) + + /** Master confirm reception of RegisterAppMaster message */ case class AppMasterRegistered(appId: Int) + + /** Shutdown the application job */ case object ShutdownAppMaster type AppMasterStatus = String @@ -171,7 +254,11 @@ object MasterToAppMaster { val AppMasterNonExist: AppMasterStatus = "nonexist" sealed trait StreamingType - case class AppMasterData(status: AppMasterStatus, appId: Int = 0, appName: String = null, 
appMasterPath: String = null, workerPath: String = null, submissionTime: TimeStamp = 0, startTime: TimeStamp = 0, finishTime: TimeStamp = 0, user: String = null) + case class AppMasterData( + status: AppMasterStatus, appId: Int = 0, appName: String = null, appMasterPath: String = null, + workerPath: String = null, submissionTime: TimeStamp = 0, startTime: TimeStamp = 0, + finishTime: TimeStamp = 0, user: String = null) + case class AppMasterDataRequest(appId: Int, detail: Boolean = false) case class AppMastersData(appMasters: List[AppMasterData]) @@ -185,8 +272,10 @@ object MasterToAppMaster { } object AppMasterToWorker { - case class LaunchExecutor(appId: Int, executorId: Int, resource: Resource, executorJvmConfig: ExecutorJVMConfig) - case class ShutdownExecutor(appId : Int, executorId : Int, reason : String) + case class LaunchExecutor( + appId: Int, executorId: Int, resource: Resource, executorJvmConfig: ExecutorJVMConfig) + + case class ShutdownExecutor(appId: Int, executorId: Int, reason: String) case class ChangeExecutorResource(appId: Int, executorId: Int, resource: Resource) } @@ -196,4 +285,3 @@ object WorkerToAppMaster { case class ShutdownExecutorFailed(reason: String = null, ex: Throwable = null) } - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/UserConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/UserConfig.scala b/core/src/main/scala/io/gearpump/cluster/UserConfig.scala index 0756d54..61de1dd 100644 --- a/core/src/main/scala/io/gearpump/cluster/UserConfig.scala +++ b/core/src/main/scala/io/gearpump/cluster/UserConfig.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,14 +18,15 @@ package io.gearpump.cluster -import akka.actor.{ExtendedActorSystem, ActorSystem} +import akka.actor.{ActorSystem, ExtendedActorSystem} import akka.serialization.JavaSerializer + import io.gearpump.google.common.io.BaseEncoding /** * Immutable configuration */ -final class UserConfig(private val _config: Map[String, String]) extends Serializable{ +final class UserConfig(private val _config: Map[String, String]) extends Serializable { def withBoolean(key: String, value: Boolean): UserConfig = { new UserConfig(_config + (key -> value.toString)) @@ -39,7 +40,7 @@ final class UserConfig(private val _config: Map[String, String]) extends Serial new UserConfig(_config + (key -> value.toString)) } - def withInt(key: String, value: Int) : UserConfig = { + def withInt(key: String, value: Int): UserConfig = { new UserConfig(_config + (key -> value.toString)) } @@ -77,7 +78,7 @@ final class UserConfig(private val _config: Map[String, String]) extends Serial _config.get(key).map(_.toFloat) } - def getInt(key : String) : Option[Int] = { + def getInt(key: String): Option[Int] = { _config.get(key).map(_.toInt) } @@ -85,11 +86,11 @@ final class UserConfig(private val _config: Map[String, String]) extends Serial _config.get(key).map(_.toLong) } - def getString(key : String) : Option[String] = { + def getString(key: String): Option[String] = { _config.get(key) } - def getBytes(key: String) : Option[Array[Byte]] = { + def getBytes(key: String): Option[Array[Byte]] = { _config.get(key).map(BaseEncoding.base64().decode(_)) } @@ -101,31 +102,35 @@ final class UserConfig(private val _config: Map[String, String]) extends Serial } } + // scalastyle:off line.size.limit /** - * This will de-serialize 
value to object instance + * This de-serializes value to object instance * * To do de-serialization, this requires an implicit ActorSystem, as * the ActorRef and possibly other akka classes deserialization * requires an implicit ActorSystem. * - * @see [[http://doc.akka.io/docs/akka/snapshot/scala/serialization.html#A_Word_About_Java_Serialization]] + * See Link: + * http://doc.akka.io/docs/akka/snapshot/scala/serialization.html#A_Word_About_Java_Serialization */ - def getValue[T](key: String)(implicit system: ActorSystem): Option[T] = { + + def getValue[T](key: String)(implicit system: ActorSystem): Option[T] = { val serializer = new JavaSerializer(system.asInstanceOf[ExtendedActorSystem]) _config.get(key).map(BaseEncoding.base64().decode(_)) .map(serializer.fromBinary(_).asInstanceOf[T]) } /** - * This will serialize the object and store it as string. + * This serializes the object and store it as string. * * To do serialization, this requires an implicit ActorSystem, as * the ActorRef and possibly other akka classes serialization * requires an implicit ActorSystem. 
* - * @see [[http://doc.akka.io/docs/akka/snapshot/scala/serialization.html#A_Word_About_Java_Serialization]] + * See Link: + * http://doc.akka.io/docs/akka/snapshot/scala/serialization.html#A_Word_About_Java_Serialization */ - def withValue[T<: AnyRef](key: String, value: T)(implicit system: ActorSystem): UserConfig = { + def withValue[T <: AnyRef](key: String, value: T)(implicit system: ActorSystem): UserConfig = { if (null == value) { this @@ -136,8 +141,9 @@ final class UserConfig(private val _config: Map[String, String]) extends Serial this.withString(key, encoded) } } + // scalastyle:on line.size.limit - def withConfig(other: UserConfig) = { + def withConfig(other: UserConfig): UserConfig = { if (null == other) { this } else { @@ -146,11 +152,11 @@ final class UserConfig(private val _config: Map[String, String]) extends Serial } } -object UserConfig{ +object UserConfig { - def empty = new UserConfig(Map.empty[String, String]) + def empty: UserConfig = new UserConfig(Map.empty[String, String]) - def apply(config : Map[String, String]) = new UserConfig(config) + def apply(config: Map[String, String]): UserConfig = new UserConfig(config) def unapply(config: UserConfig): Option[Map[String, String]] = Option(config._config) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala index 1e34678..4e8582b 100644 --- a/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala +++ b/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala @@ -19,32 +19,34 @@ package io.gearpump.cluster.appmaster import akka.actor._ + import 
io.gearpump.cluster.AppMasterToMaster.RegisterAppMaster import io.gearpump.cluster.appmaster.AppMasterRuntimeEnvironment._ import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{Session, StartExecutorSystems} import io.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus._ import io.gearpump.cluster.master.MasterProxy -import io.gearpump.cluster.{AppMasterContext, AppDescription} +import io.gearpump.cluster.{AppDescription, AppMasterContext} import io.gearpump.util.LogUtil /** * This serves as runtime environment for AppMaster. - * When starting an AppMaster, we need to setup the connection to master, and prepare other environemnts. + * When starting an AppMaster, we need to setup the connection to master, + * and prepare other environments. * * This also extend the function of Master, by providing a scheduler service for Executor System. * AppMaster can ask Master for executor system directly. details like requesting resource, - * contacting worker to start a process, and then starting an executor system is hidden from AppMaster. + * contacting worker to start a process, and then starting an executor system is hidden from + * AppMaster. * * Please use AppMasterRuntimeEnvironment.props() to construct this actor. 
- * */ private[appmaster] -class AppMasterRuntimeEnvironment ( +class AppMasterRuntimeEnvironment( appContextInput: AppMasterContext, app: AppDescription, masters: Iterable[ActorPath], masterFactory: (AppId, MasterActorRef) => Props, - appMasterFactory: (AppMasterContext, AppDescription)=> Props, + appMasterFactory: (AppMasterContext, AppDescription) => Props, masterConnectionKeeperFactory: (MasterActorRef, RegisterAppMaster, ListenerActorRef) => Props) extends Actor { @@ -52,15 +54,18 @@ class AppMasterRuntimeEnvironment ( private val LOG = LogUtil.getLogger(getClass, app = appId) import scala.concurrent.duration._ - private val master = context.actorOf(masterFactory(appId, context.actorOf(Props(new MasterProxy(masters, 30 seconds))))) + + private val master = context.actorOf( + masterFactory(appId, context.actorOf(Props(new MasterProxy(masters, 30.seconds))))) private val appContext = appContextInput.copy(masterProxy = master) - //create appMaster proxy to receive command and forward to appmaster + // Create appMaster proxy to receive command and forward to appmaster private val appMaster = context.actorOf(appMasterFactory(appContext, app)) context.watch(appMaster) private val registerAppMaster = RegisterAppMaster(appMaster, appContext.registerData) - private val masterConnectionKeeper = context.actorOf(masterConnectionKeeperFactory(master, registerAppMaster, self)) + private val masterConnectionKeeper = context.actorOf( + masterConnectionKeeperFactory(master, registerAppMaster, self)) context.watch(masterConnectionKeeper) def receive: Receive = { @@ -72,20 +77,21 @@ class AppMasterRuntimeEnvironment ( context.stop(self) case Terminated(actor) => actor match { case `appMaster` => - LOG.error (s"AppMaster ${appId} is stopped, shutdown myself") - context.stop (self) + LOG.error(s"AppMaster ${appId} is stopped, shutdown myself") + context.stop(self) case `masterConnectionKeeper` => - LOG.error (s"Master connection keeper is stopped, appId: ${appId}, shutdown 
myself") - context.stop (self) - case _ => //skip + LOG.error(s"Master connection keeper is stopped, appId: ${appId}, shutdown myself") + context.stop(self) + case _ => // Skip } } } object AppMasterRuntimeEnvironment { - - def props(masters: Iterable[ActorPath], app : AppDescription, appContextInput: AppMasterContext): Props = { + def props( + masters: Iterable[ActorPath], app: AppDescription, appContextInput: AppMasterContext) + : Props = { val master = (appId: AppId, masterProxy: MasterActorRef) => MasterWithExecutorSystemProvider.props(appId, masterProxy) @@ -93,26 +99,25 @@ object AppMasterRuntimeEnvironment { val appMaster = (appContext: AppMasterContext, app: AppDescription) => LazyStartAppMaster.props(appContext, app) - val masterConnectionKeeper = - (master: MasterActorRef, registerAppMaster: RegisterAppMaster, listener: ListenerActorRef) => - Props(new MasterConnectionKeeper(registerAppMaster, master, masterStatusListener = listener)) + val masterConnectionKeeper = (master: MasterActorRef, registerAppMaster: + RegisterAppMaster, listener: ListenerActorRef) => Props(new MasterConnectionKeeper( + registerAppMaster, master, masterStatusListener = listener)) Props(new AppMasterRuntimeEnvironment( appContextInput, app, masters, master, appMaster, masterConnectionKeeper)) } /** - * This behavior as AppMaster, and will lazy start the real AppMaster. When real AppMaster - * is not started yet, all messages will be stashed. The stashed messages will be forwarded to - * real AppMaster when it is started. + * This behavior like a AppMaster. Under the hood, It start start the real AppMaster in a lazy + * way. When real AppMaster is not started yet, all messages are stashed. The stashed + * messages are forwarded to real AppMaster when the real AppMaster is started. 
* * Please use LazyStartAppMaster.props to construct this actor * - * @param appId * @param appMasterProps underlying AppMaster Props */ private[appmaster] - class LazyStartAppMaster (appId: Int, appMasterProps: Props) extends Actor with Stash { + class LazyStartAppMaster(appId: Int, appMasterProps: Props) extends Actor with Stash { private val LOG = LogUtil.getLogger(getClass, app = appId) @@ -151,14 +156,11 @@ object AppMasterRuntimeEnvironment { private[appmaster] case object StartAppMaster - /** * This enhance Master by providing new service: StartExecutorSystems * - * * Please use MasterWithExecutorSystemProvider.props to construct this actor + * Please use MasterWithExecutorSystemProvider.props to construct this actor * - * @param master - * @param executorSystemProviderProps */ private[appmaster] class MasterWithExecutorSystemProvider(master: ActorRef, executorSystemProviderProps: Props) @@ -168,7 +170,7 @@ object AppMasterRuntimeEnvironment { override def receive: Receive = { case request: StartExecutorSystems => - executorSystemProvider forward request + executorSystemProvider forward request case msg => master forward msg } @@ -181,13 +183,12 @@ object AppMasterRuntimeEnvironment { val executorSystemLauncher = (appId: Int, session: Session) => Props(new ExecutorSystemLauncher(appId, session)) - val scheduler = Props(new ExecutorSystemScheduler(appId, master, executorSystemLauncher)) + val scheduler = Props(new ExecutorSystemScheduler(appId, master, executorSystemLauncher)) Props(new MasterWithExecutorSystemProvider(master, scheduler)) } } - private[appmaster] type AppId = Int private[appmaster] type MasterActorRef = ActorRef private[appmaster] type ListenerActorRef = ActorRef http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala ---------------------------------------------------------------------- diff --git 
a/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala index 11414cf..5b3e0c5 100644 --- a/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala +++ b/core/src/main/scala/io/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,21 +15,24 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package io.gearpump.cluster.appmaster import akka.actor.ActorRef import com.typesafe.config.Config + import io.gearpump._ import io.gearpump.cluster.AppMasterRegisterData +/** Run time info used to start an AppMaster */ case class AppMasterRuntimeInfo( - appId: Int, - // appName is the unique Id for an application - appName: String, - worker : ActorRef = null, - user: String = null, - submissionTime: TimeStamp = 0, - startTime: TimeStamp = 0, - finishTime: TimeStamp = 0, - config: Config = null) + appId: Int, + // AppName is the unique Id for an application + appName: String, + worker: ActorRef = null, + user: String = null, + submissionTime: TimeStamp = 0, + startTime: TimeStamp = 0, + finishTime: TimeStamp = 0, + config: Config = null) extends AppMasterRegisterData http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/appmaster/ApplicationState.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/ApplicationState.scala 
b/core/src/main/scala/io/gearpump/cluster/appmaster/ApplicationState.scala index 430c958..3b967f4 100644 --- a/core/src/main/scala/io/gearpump/cluster/appmaster/ApplicationState.scala +++ b/core/src/main/scala/io/gearpump/cluster/appmaster/ApplicationState.scala @@ -18,28 +18,30 @@ package io.gearpump.cluster.appmaster -import io.gearpump.cluster.{AppJar, AppDescription} +import io.gearpump.cluster.{AppDescription, AppJar} /** - * This state will be persisted across the masters. - */ -case class ApplicationState(appId : Int, appName: String, attemptId : Int, app : AppDescription, jar: Option[AppJar], username : String, state : Any) extends Serializable { + * This state for single application, it is be distributed across the masters. + */ +case class ApplicationState( + appId: Int, appName: String, attemptId: Int, app: AppDescription, jar: Option[AppJar], + username: String, state: Any) extends Serializable { - override def equals(other: Any): Boolean = { - other match { - case that: ApplicationState => - if (appId == that.appId && attemptId == that.attemptId) { - true - } else { - false - } - case _ => - false - } - } + override def equals(other: Any): Boolean = { + other match { + case that: ApplicationState => + if (appId == that.appId && attemptId == that.attemptId) { + true + } else { + false + } + case _ => + false + } + } - override def hashCode: Int = { - import akka.routing.MurmurHash._ - extendHash(appId, attemptId, startMagicA, startMagicB) - } - } + override def hashCode: Int = { + import akka.routing.MurmurHash._ + extendHash(appId, attemptId, startMagicA, startMagicB) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala index 
400b61c..6fcb5e7 100644 --- a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala +++ b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala @@ -19,22 +19,25 @@ package io.gearpump.cluster.appmaster import akka.actor.{ActorRef, Address, PoisonPill} -import io.gearpump.WorkerId + import io.gearpump.cluster.scheduler.Resource +import io.gearpump.cluster.worker.WorkerId import io.gearpump.util.ActorSystemBooter.BindLifeCycle case class WorkerInfo(workerId: WorkerId, ref: ActorRef) /** - * This contains JVM configurations to start an executor system + * Configurations to start an executor system on remote machine + * + * @param address Remote address where we start an Actor System. */ case class ExecutorSystem(executorSystemId: Int, address: Address, daemon: -ActorRef, resource: Resource, worker: WorkerInfo) { + ActorRef, resource: Resource, worker: WorkerInfo) { def bindLifeCycleWith(actor: ActorRef): Unit = { daemon ! BindLifeCycle(actor) } - def shutdown: Unit = { + def shutdown(): Unit = { daemon ! 
PoisonPill } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala index 80af748..78432f4 100644 --- a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala +++ b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala @@ -18,41 +18,42 @@ package io.gearpump.cluster.appmaster +import scala.concurrent.duration._ + import akka.actor._ +import org.slf4j.Logger + import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor import io.gearpump.cluster.ExecutorJVMConfig -import io.gearpump.cluster.scheduler.Resource -import io.gearpump.util.{Constants, ActorSystemBooter, ActorUtil, LogUtil} import io.gearpump.cluster.WorkerToAppMaster._ import io.gearpump.cluster.appmaster.ExecutorSystemLauncher._ import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, Session} -import io.gearpump.util.ActorSystemBooter.{ActorSystemRegistered, BindLifeCycle, RegisterActorSystem} -import org.slf4j.Logger - -import scala.concurrent.duration._ +import io.gearpump.cluster.scheduler.Resource +import io.gearpump.util.ActorSystemBooter.{ActorSystemRegistered, RegisterActorSystem} +import io.gearpump.util.{ActorSystemBooter, ActorUtil, Constants, LogUtil} /** * This launches single executor system on target worker. 
* * Please use ExecutorSystemLauncher.props() to construct this actor * - * @param appId * @param session The session that request to launch executor system */ private[appmaster] -class ExecutorSystemLauncher (appId: Int, session: Session) extends Actor { +class ExecutorSystemLauncher(appId: Int, session: Session) extends Actor { private val LOG: Logger = LogUtil.getLogger(getClass) val scheduler = context.system.scheduler implicit val executionContext = context.dispatcher - val timeoutSetting = context.system.settings.config.getInt(Constants.GEARPUMP_START_EXECUTOR_SYSTEM_TIMEOUT_MS) + private val systemConfig = context.system.settings.config + val timeoutSetting = systemConfig.getInt(Constants.GEARPUMP_START_EXECUTOR_SYSTEM_TIMEOUT_MS) - val timeout = scheduler.scheduleOnce(timeoutSetting milliseconds, + val timeout = scheduler.scheduleOnce(timeoutSetting.milliseconds, self, LaunchExecutorSystemTimeout(session)) - def receive : Receive = waitForLaunchCommand + def receive: Receive = waitForLaunchCommand def waitForLaunchCommand: Receive = { case LaunchExecutorSystem(worker, executorSystemId, resource) => @@ -61,23 +62,27 @@ class ExecutorSystemLauncher (appId: Int, session: Session) extends Actor { .map(getExecutorJvmConfig(_, s"app${appId}system${executorSystemId}", launcherPath)).orNull val launch = LaunchExecutor(appId, executorSystemId, resource, jvmConfig) - LOG.info(s"Launching Executor ...appId: $appId, executorSystemId: $executorSystemId, slots: ${resource.slots} on worker $worker") + LOG.info(s"Launching Executor ...appId: $appId, executorSystemId: $executorSystemId, " + + s"slots: ${resource.slots} on worker $worker") worker.ref ! 
launch context.become(waitForActorSystemToStart(sender, launch, worker, executorSystemId)) } - def waitForActorSystemToStart(replyTo: ActorRef, launch: LaunchExecutor, worker: WorkerInfo, executorSystemId: Int) : Receive = { + def waitForActorSystemToStart( + replyTo: ActorRef, launch: LaunchExecutor, worker: WorkerInfo, executorSystemId: Int) + : Receive = { case RegisterActorSystem(systemPath) => import launch._ timeout.cancel() LOG.info(s"Received RegisterActorSystem $systemPath for session ${session.requestor}") sender ! ActorSystemRegistered(worker.ref) - val system = ExecutorSystem(executorId, AddressFromURIString(systemPath), sender, resource, worker) + val system = + ExecutorSystem(executorId, AddressFromURIString(systemPath), sender, resource, worker) replyTo ! LaunchExecutorSystemSuccess(system, session) context.stop(self) - case reject @ ExecutorLaunchRejected(reason, ex) => - LOG.error(s"Executor Launch ${launch.resource} failed reason：$reason", ex) + case reject@ExecutorLaunchRejected(reason, ex) => + LOG.error(s"Executor Launch ${launch.resource} failed reason: $reason", ex) replyTo ! 
LaunchExecutorSystemRejected(launch.resource, reason, session) context.stop(self) case timeout: LaunchExecutorSystemTimeout => @@ -89,6 +94,7 @@ class ExecutorSystemLauncher (appId: Int, session: Session) extends Actor { private[appmaster] object ExecutorSystemLauncher { + case class LaunchExecutorSystem(worker: WorkerInfo, systemId: Int, resource: Resource) case class LaunchExecutorSystemSuccess(system: ExecutorSystem, session: Session) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala index aa980b5..c5ec600 100644 --- a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala +++ b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala @@ -18,29 +18,29 @@ package io.gearpump.cluster.appmaster +import scala.concurrent.duration._ + import akka.actor._ import com.typesafe.config.Config -import io.gearpump.WorkerId + import io.gearpump.cluster.AppMasterToMaster.RequestResource import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated import io.gearpump.cluster._ import io.gearpump.cluster.appmaster.ExecutorSystemLauncher._ import io.gearpump.cluster.appmaster.ExecutorSystemScheduler._ import io.gearpump.cluster.scheduler.{ResourceAllocation, ResourceRequest} +import io.gearpump.cluster.worker.WorkerId import io.gearpump.util.{Constants, LogUtil} -import scala.concurrent.duration._ - /** * ExecutorSystem is also a type of resource, this class schedules ExecutorSystem for AppMaster. * AppMaster can use this class to directly request a live executor actor systems. The communication * in the background with Master and Worker is hidden from AppMaster. 
* * Please use ExecutorSystemScheduler.props() to construct this actor - * -*/ + */ private[appmaster] -class ExecutorSystemScheduler (appId: Int, masterProxy: ActorRef, +class ExecutorSystemScheduler(appId: Int, masterProxy: ActorRef, executorSystemLauncher: (Int, Session) => Props) extends Actor { private val LOG = LogUtil.getLogger(getClass, app = appId) @@ -50,18 +50,22 @@ class ExecutorSystemScheduler (appId: Int, masterProxy: ActorRef, var resourceAgents = Map.empty[Session, ActorRef] - def receive: Receive = clientCommands orElse resourceAllocationMessageHandler orElse executorSystemMessageHandler + def receive: Receive = { + clientCommands orElse resourceAllocationMessageHandler orElse executorSystemMessageHandler + } def clientCommands: Receive = { case start: StartExecutorSystems => - LOG.info(s"starting executor systems (ExecutorSystemConfig(${start.executorSystemConfig}), Resources(${start.resources.mkString(",")}))") + LOG.info(s"starting executor systems (ExecutorSystemConfig(${start.executorSystemConfig}), " + + s"Resources(${start.resources.mkString(",")}))") val requestor = sender() val executorSystemConfig = start.executorSystemConfig - val session = Session(requestor, executorSystemConfig) - val agent = resourceAgents.getOrElse(session, context.actorOf(Props(new ResourceAgent(masterProxy, session)))) + val session = Session(requestor, executorSystemConfig) + val agent = resourceAgents.getOrElse(session, + context.actorOf(Props(new ResourceAgent(masterProxy, session)))) resourceAgents = resourceAgents + (session -> agent) - start.resources.foreach {resource => + start.resources.foreach { resource => agent ! 
RequestResource(appId, resource) } @@ -87,14 +91,15 @@ class ExecutorSystemScheduler (appId: Int, masterProxy: ActorRef, } } - def executorSystemMessageHandler : Receive = { + def executorSystemMessageHandler: Receive = { case LaunchExecutorSystemSuccess(system, session) => if (isSessionAlive(session)) { LOG.info("LaunchExecutorSystemSuccess, send back to " + session.requestor) system.bindLifeCycleWith(self) session.requestor ! ExecutorSystemStarted(system, session.executorSystemJvmConfig.jar) } else { - LOG.error("We get a ExecutorSystem back, but resource requestor is no longer valid. Will shutdown the allocated system") + LOG.error("We get a ExecutorSystem back, but resource requestor is no longer valid. " + + "Will shutdown the allocated system") system.shutdown } case LaunchExecutorSystemTimeout(session) => @@ -105,8 +110,11 @@ class ExecutorSystemScheduler (appId: Int, masterProxy: ActorRef, case LaunchExecutorSystemRejected(resource, reason, session) => if (isSessionAlive(session)) { - LOG.error(s"Failed to launch executor system, due to $reason, will ask master to allocate new resources $resource") - resourceAgents.get(session).map(_ ! RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified))) + LOG.error(s"Failed to launch executor system, due to $reason, " + + s"will ask master to allocate new resources $resource") + resourceAgents.get(session).map { resourceAgent: ActorRef => + resourceAgent ! 
RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified)) + } } } @@ -117,27 +125,28 @@ class ExecutorSystemScheduler (appId: Int, masterProxy: ActorRef, object ExecutorSystemScheduler { - case class StartExecutorSystems(resources: Array[ResourceRequest], executorSystemConfig: ExecutorSystemJvmConfig) + case class StartExecutorSystems( + resources: Array[ResourceRequest], executorSystemConfig: ExecutorSystemJvmConfig) + case class ExecutorSystemStarted(system: ExecutorSystem, boundedJar: Option[AppJar]) + case class StopExecutorSystem(system: ExecutorSystem) - case object StartExecutorSystemTimeout - case class ExecutorSystemJvmConfig(classPath : Array[String], jvmArguments : Array[String], - jar: Option[AppJar], username : String, executorAkkaConfig: Config = null) + case object StartExecutorSystemTimeout + case class ExecutorSystemJvmConfig(classPath: Array[String], jvmArguments: Array[String], + jar: Option[AppJar], username: String, executorAkkaConfig: Config = null) /** * For each client which ask for an executor system, the scheduler will create a session for it. 
* - * @param requestor - * @param executorSystemJvmConfig */ - private [appmaster] + private[appmaster] case class Session(requestor: ActorRef, executorSystemJvmConfig: ExecutorSystemJvmConfig) /** * This is a agent for session to request resource - * @param master + * * @param session the original requester of the resource requests */ private[appmaster] @@ -145,17 +154,19 @@ object ExecutorSystemScheduler { private var resourceRequestor: ActorRef = null var timeOutClock: Cancellable = null private var unallocatedResource: Int = 0 + import context.dispatcher - import Constants._ - val timeout = context.system.settings.config.getInt(GEARPUMP_RESOURCE_ALLOCATION_TIMEOUT) + import io.gearpump.util.Constants._ + val timeout = context.system.settings.config.getInt(GEARPUMP_RESOURCE_ALLOCATION_TIMEOUT) def receive: Receive = { case request: RequestResource => unallocatedResource += request.request.resource.slots Option(timeOutClock).map(_.cancel) - timeOutClock = context.system.scheduler.scheduleOnce(timeout seconds, self, ResourceAllocationTimeOut(session)) + timeOutClock = context.system.scheduler.scheduleOnce( + timeout.seconds, self, ResourceAllocationTimeOut(session)) resourceRequestor = sender master ! request case ResourceAllocated(allocations) => @@ -164,7 +175,7 @@ object ExecutorSystemScheduler { case timeout: ResourceAllocationTimeOut => if (unallocatedResource > 0) { resourceRequestor ! 
ResourceAllocationTimeOut(session) - //we will not receive any ResourceAllocation after timeout + // We will not receive any ResourceAllocation after timeout context.stop(self) } } @@ -175,4 +186,5 @@ object ExecutorSystemScheduler { private[ExecutorSystemScheduler] case class ResourceAllocationTimeOut(session: Session) + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeper.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeper.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeper.scala index 300c4ea..f8c8503 100644 --- a/core/src/main/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeper.scala +++ b/core/src/main/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeper.scala @@ -19,31 +19,27 @@ package io.gearpump.cluster.appmaster import java.util.concurrent.TimeUnit +import scala.concurrent.duration.FiniteDuration import akka.actor._ -import io.gearpump.cluster.master.MasterProxy.MasterRestarted + import io.gearpump.cluster.AppMasterToMaster.RegisterAppMaster import io.gearpump.cluster.MasterToAppMaster.AppMasterRegistered import io.gearpump.cluster.appmaster.MasterConnectionKeeper.AppMasterRegisterTimeout import io.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus.{MasterConnected, MasterStopped} -import io.gearpump.cluster.master.MasterProxy.WatchMaster +import io.gearpump.cluster.master.MasterProxy.{MasterRestarted, WatchMaster} import io.gearpump.util.LogUtil -import scala.concurrent.duration.FiniteDuration - /** - * This will watch the liveness of Master. - * When Master is restarted, it will send RegisterAppMaster to the new Master instance. - * If Master is stopped, it will send the MasterConnectionStatus to listener + * Watches the liveness of Master. 
* - * please use MasterConnectionKeeper.props() to construct this actor + * When Master is restarted, it sends RegisterAppMaster to the new Master instance. + * If Master is stopped, it sends the MasterConnectionStatus to listener * - * @param register - * @param masterProxy - * @param masterStatusListener + * please use MasterConnectionKeeper.props() to construct this actor */ private[appmaster] -class MasterConnectionKeeper ( +class MasterConnectionKeeper( register: RegisterAppMaster, masterProxy: ActorRef, masterStatusListener: ActorRef) extends Actor { @@ -52,12 +48,13 @@ class MasterConnectionKeeper ( private val LOG = LogUtil.getLogger(getClass) private var master: ActorRef = null - //Subscribe self to masterProxy, + // Subscribe self to masterProxy, masterProxy ! WatchMaster(self) def registerAppMaster: Cancellable = { masterProxy ! register - context.system.scheduler.scheduleOnce(FiniteDuration(30, TimeUnit.SECONDS), self, AppMasterRegisterTimeout) + context.system.scheduler.scheduleOnce(FiniteDuration(30, TimeUnit.SECONDS), + self, AppMasterRegisterTimeout) } context.become(waitMasterToConfirm(registerAppMaster)) @@ -87,10 +84,15 @@ class MasterConnectionKeeper ( } private[appmaster] object MasterConnectionKeeper { + case object AppMasterRegisterTimeout object MasterConnectionStatus { + case object MasterConnected + case object MasterStopped + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala b/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala index 3a2868c..41c01d8 100644 --- a/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala +++ b/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file 
except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,10 +19,16 @@ package io.gearpump.cluster.client import java.util.concurrent.TimeUnit +import scala.collection.JavaConverters._ +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, Future} +import scala.util.Try import akka.actor.{ActorRef, ActorSystem} import akka.util.Timeout -import com.typesafe.config.{ConfigValueFactory, Config} +import com.typesafe.config.{Config, ConfigValueFactory} +import org.slf4j.Logger + import io.gearpump.cluster.MasterToAppMaster.{AppMastersData, ReplayFromTimestampWindowTrailingEdge} import io.gearpump.cluster.MasterToClient.ReplayApplicationResult import io.gearpump.cluster._ @@ -30,17 +36,12 @@ import io.gearpump.cluster.master.MasterProxy import io.gearpump.jarstore.JarStoreService import io.gearpump.util.Constants._ import io.gearpump.util.{ActorUtil, Constants, LogUtil, Util} -import org.slf4j.Logger - -import scala.collection.JavaConversions._ -import scala.concurrent.{Await, Future} -import scala.concurrent.duration.Duration -import scala.util.Try /** * ClientContext is a user facing util to submit/manage an application. 
+ * + * TODO: add interface to query master here */ -//TODO: add interface to query master here class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) { def this(system: ActorSystem) = { @@ -54,7 +55,7 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) { private val LOG: Logger = LogUtil.getLogger(getClass) private implicit val timeout = Timeout(5, TimeUnit.SECONDS) - implicit val system = Option(sys).getOrElse(ActorSystem(s"client${Util.randInt}" , config)) + implicit val system = Option(sys).getOrElse(ActorSystem(s"client${Util.randInt()}", config)) LOG.info(s"Starting system ${system.name}") val shouldCleanupSystem = Option(sys).isEmpty @@ -62,16 +63,18 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) { jarStoreService.init(config, system) private lazy val master: ActorRef = { - val masters = config.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).toList.flatMap(Util.parseHostList) - val master = Option(_master).getOrElse(system.actorOf(MasterProxy.props(masters), s"masterproxy${system.name}")) + val masters = config.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala + .flatMap(Util.parseHostList) + val master = Option(_master).getOrElse(system.actorOf(MasterProxy.props(masters), + s"masterproxy${system.name}")) LOG.info(s"Creating master proxy ${master} for master list: $masters") master } /** - * Submit an application with default jar setting. Use java property - * "gearpump.app.jar" if defined. Otherwise, will assume the jar is on - * the target runtime classpath, and will not send it. + * Submits an application with default jar setting. Use java property "gearpump.app.jar" if + * defined. Otherwise, it assumes the jar is on the target runtime classpath, thus will + * not send the jar across the wire. 
*/ def submit(app: Application): Int = { submit(app, System.getProperty(GEARPUMP_APP_JAR)) @@ -86,7 +89,8 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) { val appName = checkAndAddNamePrefix(app.name, System.getProperty(GEARPUMP_APP_NAME_PREFIX)) val submissionConfig = getSubmissionConfig(config) .withValue(APPLICATION_EXECUTOR_NUMBER, ConfigValueFactory.fromAnyRef(executorNum)) - val appDescription = AppDescription(appName, app.appMaster.getName, app.userConfig, submissionConfig) + val appDescription = + AppDescription(appName, app.appMaster.getName, app.userConfig, submissionConfig) val appJar = Option(jar).map(loadFile) client.submitApplication(appDescription, appJar) } @@ -99,9 +103,11 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) { ClusterConfig.filterOutDefaultConfig(config) } - def replayFromTimestampWindowTrailingEdge(appId : Int): ReplayApplicationResult = { + def replayFromTimestampWindowTrailingEdge(appId: Int): ReplayApplicationResult = { import scala.concurrent.ExecutionContext.Implicits.global - val result = Await.result(ActorUtil.askAppMaster[ReplayApplicationResult](master, appId,ReplayFromTimestampWindowTrailingEdge(appId)), Duration.Inf) + val result = Await.result( + ActorUtil.askAppMaster[ReplayApplicationResult](master, + appId, ReplayFromTimestampWindowTrailingEdge(appId)), Duration.Inf) result } @@ -115,30 +121,30 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) { client.listApplications } - def shutdown(appId : Int) : Unit = { + def shutdown(appId: Int): Unit = { val client = getMasterClient client.shutdownApplication(appId) } - def resolveAppID(appId: Int) : ActorRef = { + def resolveAppID(appId: Int): ActorRef = { val client = getMasterClient client.resolveAppId(appId) } - def close() : Unit = { + def close(): Unit = { if (shouldCleanupSystem) { LOG.info(s"Shutting down system ${system.name}") - system.shutdown() + system.terminate() } } - private def 
loadFile(jarPath : String) : AppJar = { + private def loadFile(jarPath: String): AppJar = { val jarFile = new java.io.File(jarPath) val path = jarStoreService.copyFromLocal(jarFile) AppJar(jarFile.getName, path) } - private def checkAndAddNamePrefix(appName: String, namePrefix: String) : String = { + private def checkAndAddNamePrefix(appName: String, namePrefix: String): String = { val fullName = if (namePrefix != null && namePrefix != "") { namePrefix + "_" + appName } else { @@ -163,12 +169,17 @@ object ClientContext { def apply(): ClientContext = new ClientContext(ClusterConfig.default(), null, null) - def apply(system: ActorSystem) = new ClientContext(ClusterConfig.default(), system, null) + def apply(system: ActorSystem): ClientContext = { + new ClientContext(ClusterConfig.default(), system, null) + } - def apply(system: ActorSystem, master: ActorRef) = new ClientContext(ClusterConfig.default(), system, master) + def apply(system: ActorSystem, master: ActorRef): ClientContext = { + new ClientContext(ClusterConfig.default(), system, master) + } def apply(config: Config): ClientContext = new ClientContext(config, null, null) - def apply(config: Config, system: ActorSystem, master: ActorRef): ClientContext - = new ClientContext(config, system, master) + def apply(config: Config, system: ActorSystem, master: ActorRef): ClientContext = { + new ClientContext(config, system, master) + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala b/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala index 5800e8d..9edaf46 100644 --- a/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala +++ b/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala @@ -7,7 +7,7 @@ * "License"); 
you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,31 +18,36 @@ package io.gearpump.cluster.client +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, Future} +import scala.util.{Failure, Success} + import akka.actor.ActorRef import akka.pattern.ask import akka.util.Timeout -import io.gearpump.cluster.ClientToMaster._ -import io.gearpump.cluster.MasterToAppMaster.{AppMastersData, AppMastersDataRequest, ReplayFromTimestampWindowTrailingEdge} -import io.gearpump.cluster.MasterToClient.{ReplayApplicationResult, ResolveAppIdResult, ShutdownApplicationResult, SubmitApplicationResult} -import io.gearpump.cluster.{AppJar, AppDescription} -import io.gearpump.util.{ActorUtil, Constants} -import scala.concurrent.duration.Duration -import scala.concurrent.{ExecutionContext, Await, Future} -import scala.util.{Failure, Success} +import io.gearpump.cluster.ClientToMaster._ +import io.gearpump.cluster.MasterToAppMaster.{AppMastersData, AppMastersDataRequest} +import io.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult, SubmitApplicationResult} +import io.gearpump.cluster.{AppDescription, AppJar} /** - * Client to Master node. - * Stateless, thread safe + * Client to inter-operate with Master node. + * + * NOTE: Stateless, thread safe */ -class MasterClient(master : ActorRef, timeout: Timeout) { +class MasterClient(master: ActorRef, timeout: Timeout) { implicit val masterClientTimeout = timeout - def submitApplication(app : AppDescription, appJar: Option[AppJar]) : Int = { - val result = Await.result( (master ? 
SubmitApplication(app, appJar)).asInstanceOf[Future[SubmitApplicationResult]], Duration.Inf) + def submitApplication(app: AppDescription, appJar: Option[AppJar]): Int = { + val result = Await.result( + (master ? SubmitApplication(app, appJar)).asInstanceOf[Future[SubmitApplicationResult]], + Duration.Inf) val appId = result.appId match { case Success(appId) => + // scalastyle:off println Console.println(s"Submit application succeed. The application id is $appId") + // scalastyle:on println appId case Failure(ex) => throw ex } @@ -50,15 +55,18 @@ class MasterClient(master : ActorRef, timeout: Timeout) { } def resolveAppId(appId: Int): ActorRef = { - val result = Await.result((master ? ResolveAppId(appId)).asInstanceOf[Future[ResolveAppIdResult]], Duration.Inf) + val result = Await.result( + (master ? ResolveAppId(appId)).asInstanceOf[Future[ResolveAppIdResult]], Duration.Inf) result.appMaster match { case Success(appMaster) => appMaster case Failure(ex) => throw ex } } - def shutdownApplication(appId : Int) : Unit = { - val result = Await.result((master ? ShutdownApplication(appId)).asInstanceOf[Future[ShutdownApplicationResult]], Duration.Inf) + def shutdownApplication(appId: Int): Unit = { + val result = Await.result( + (master ? ShutdownApplication(appId)).asInstanceOf[Future[ShutdownApplicationResult]], + Duration.Inf) result.appId match { case Success(_) => case Failure(ex) => throw ex @@ -66,7 +74,8 @@ class MasterClient(master : ActorRef, timeout: Timeout) { } def listApplications: AppMastersData = { - val result = Await.result((master ? AppMastersDataRequest).asInstanceOf[Future[AppMastersData]], Duration.Inf) + val result = Await.result( + (master ? 
AppMastersDataRequest).asInstanceOf[Future[AppMastersData]], Duration.Inf) result } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/main/ArgumentsParser.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/main/ArgumentsParser.scala b/core/src/main/scala/io/gearpump/cluster/main/ArgumentsParser.scala index f290241..209f831 100644 --- a/core/src/main/scala/io/gearpump/cluster/main/ArgumentsParser.scala +++ b/core/src/main/scala/io/gearpump/cluster/main/ArgumentsParser.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,39 +20,44 @@ package io.gearpump.cluster.main import io.gearpump.cluster.main.ArgumentsParser.Syntax -case class CLIOption[+T] (description:String = "", required: Boolean = false, defaultValue: Option[T] = None) +case class CLIOption[+T]( + description: String = "", required: Boolean = false, defaultValue: Option[T] = None) -class ParseResult(optionMap : Map[String, String], remainArguments : Array[String]) { - def getInt(key : String) = optionMap.get(key).get.toInt +class ParseResult(optionMap: Map[String, String], remainArguments: Array[String]) { + def getInt(key: String): Int = optionMap.get(key).get.toInt - def getString (key : String) = optionMap.get(key).get + def getString(key: String): String = optionMap.get(key).get - def getBoolean (key : String) = optionMap.get(key).get.toBoolean + def getBoolean(key: String): Boolean = optionMap.get(key).get.toBoolean - def exists(key : String) = !optionMap.getOrElse(key,"").isEmpty + def exists(key: String): 
Boolean = !(optionMap.getOrElse(key, "").isEmpty) - def remainArgs : Array[String] = this.remainArguments + def remainArgs: Array[String] = this.remainArguments } /** - * Parse command line arguments + * Parser for command line arguments + * * Grammar: -option1 value1 -option2 value3 -flag1 -flag2 remainArg1 remainArg2... */ -trait ArgumentsParser { +trait ArgumentsParser { val ignoreUnknownArgument = false - def help: Unit = { - Console.err.println(s"\nHelp: $description") + // scalastyle:off println + def help(): Unit = { + Console.println(s"\nHelp: $description") var usage = List.empty[String] - options.map(kv => if(kv._2.required) { + options.map(kv => if (kv._2.required) { usage = usage :+ s"-${kv._1} (required:${kv._2.required})${kv._2.description}" } else { - usage = usage :+ s"-${kv._1} (required:${kv._2.required}, default:${kv._2.defaultValue.getOrElse("")})${kv._2.description}" + usage = usage :+ s"-${kv._1} (required:${kv._2.required}, " + + s"default:${kv._2.defaultValue.getOrElse("")})${kv._2.description}" }) usage :+= remainArgs.map(k => s"<$k>").mkString(" ") - usage.foreach(Console.err.println(_)) + usage.foreach(Console.println(_)) } + // scalastyle:on println def parse(args: Array[String]): ParseResult = { val syntax = Syntax(options, remainArgs, ignoreUnknownArgument) @@ -60,16 +65,18 @@ trait ArgumentsParser { } val description: String = "" - val options : Array[(String, CLIOption[Any])] = Array.empty[(String, CLIOption[Any])] - val remainArgs : Array[String] = Array.empty[String] + val options: Array[(String, CLIOption[Any])] = Array.empty[(String, CLIOption[Any])] + val remainArgs: Array[String] = Array.empty[String] } object ArgumentsParser { - case class Syntax(val options: Array[(String, CLIOption[Any])], val remainArgs : Array[String], val ignoreUnknownArgument: Boolean) + case class Syntax( + val options: Array[(String, CLIOption[Any])], val remainArgs: Array[String], + val ignoreUnknownArgument: Boolean) def parse(syntax: Syntax, args: 
Array[String]): ParseResult = { - import syntax.{options, remainArgs, ignoreUnknownArgument} + import syntax.{ignoreUnknownArgument, options, remainArgs} var config = Map.empty[String, String] var remain = Array.empty[String] @@ -100,14 +107,16 @@ object ArgumentsParser { doParse(rest) case value :: rest => + // scalastyle:off println Console.err.println(s"Warning: get unknown argument $value, maybe it is a main class") + // scalastyle:on println remain ++= value :: rest doParse(Nil) } } doParse(args.toList) - options.foreach{pair => + options.foreach { pair => val (key, option) = pair if (!config.contains(key) && !option.required) { config += key -> option.defaultValue.getOrElse("").toString http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala b/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala index d391bba..fb3e5c4 100644 --- a/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala +++ b/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala @@ -19,36 +19,36 @@ package io.gearpump.cluster.master import java.util.concurrent.{TimeUnit, TimeoutException} +import scala.collection.JavaConverters._ +import scala.concurrent.duration.Duration +import scala.util.{Failure, Success} import akka.actor.{Actor, ActorRef, Props, _} import com.typesafe.config.Config -import io.gearpump.cluster.{AppJar, AppDescription} +import org.slf4j.Logger + import io.gearpump.cluster.AppMasterToMaster.RequestResource import io.gearpump.cluster.AppMasterToWorker.{LaunchExecutor, ShutdownExecutor} import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated import io.gearpump.cluster.MasterToClient.SubmitApplicationResult import io.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected -import 
io.gearpump.cluster._ -import io.gearpump.cluster.appmaster.{WorkerInfo, AppMasterRuntimeEnvironment, AppMasterRuntimeInfo} +import io.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, AppMasterRuntimeInfo, WorkerInfo} import io.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest} +import io.gearpump.cluster.worker.WorkerId +import io.gearpump.cluster.{AppDescription, AppJar, _} import io.gearpump.transport.HostPort import io.gearpump.util.ActorSystemBooter._ import io.gearpump.util.Constants._ import io.gearpump.util.{ActorSystemBooter, ActorUtil, LogUtil, Util} -import org.slf4j.Logger -import scala.collection.JavaConverters._ -import scala.concurrent.duration.Duration -import scala.util.{Failure, Success} -import io.gearpump.WorkerId /** * * AppMasterLauncher is a child Actor of AppManager, it is responsible * to launch the AppMaster on the cluster. */ class AppMasterLauncher( - appId : Int, executorId: Int, app : AppDescription, - jar: Option[AppJar], username : String, master : ActorRef, client: Option[ActorRef]) + appId: Int, executorId: Int, app: AppDescription, + jar: Option[AppJar], username: String, master: ActorRef, client: Option[ActorRef]) extends Actor { private val LOG: Logger = LogUtil.getLogger(getClass, app = appId) @@ -61,9 +61,9 @@ class AppMasterLauncher( LOG.info(s"Ask Master resource to start AppMaster $appId...") master ! 
RequestResource(appId, ResourceRequest(Resource(1), WorkerId.unspecified)) - def receive : Receive = waitForResourceAllocation + def receive: Receive = waitForResourceAllocation - def waitForResourceAllocation : Receive = { + def waitForResourceAllocation: Receive = { case ResourceAllocated(allocations) => val ResourceAllocation(resource, worker, workerId) = allocations(0) @@ -74,22 +74,27 @@ class AppMasterLauncher( val appMasterInfo = AppMasterRuntimeInfo(appId, app.name, worker, username, submissionTime, config = appMasterAkkaConfig) val workerInfo = WorkerInfo(workerId, worker) - val appMasterContext = AppMasterContext(appId, username, resource, workerInfo, jar, null, appMasterInfo) + val appMasterContext = + AppMasterContext(appId, username, resource, workerInfo, jar, null, appMasterInfo) LOG.info(s"Try to launch a executor for AppMaster on worker ${workerId} for app $appId") val name = ActorUtil.actorNameForExecutor(appId, executorId) val selfPath = ActorUtil.getFullPath(context.system, self.path) - val jvmSetting = Util.resolveJvmSetting(appMasterAkkaConfig.withFallback(systemConfig)).appMater - val executorJVM = ExecutorJVMConfig(jvmSetting.classPath ,jvmSetting.vmargs, - classOf[ActorSystemBooter].getName, Array(name, selfPath), jar, username, appMasterAkkaConfig) + val jvmSetting = + Util.resolveJvmSetting(appMasterAkkaConfig.withFallback(systemConfig)).appMater + val executorJVM = ExecutorJVMConfig(jvmSetting.classPath, jvmSetting.vmargs, + classOf[ActorSystemBooter].getName, Array(name, selfPath), jar, + username, appMasterAkkaConfig) worker ! 
LaunchExecutor(appId, executorId, resource, executorJVM) context.become(waitForActorSystemToStart(worker, appMasterContext, app.userConfig, resource)) } - def waitForActorSystemToStart(worker : ActorRef, appContext : AppMasterContext, user : UserConfig, resource: Resource) : Receive = { + def waitForActorSystemToStart( + worker: ActorRef, appContext: AppMasterContext, user: UserConfig, resource: Resource) + : Receive = { case ExecutorLaunchRejected(reason, ex) => - LOG.error(s"Executor Launch failed reason：$reason", ex) + LOG.error(s"Executor Launch failed reason: $reason", ex) LOG.info(s"reallocate resource $resource to start appmaster") master ! RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified)) context.become(waitForResourceAllocation) @@ -99,7 +104,8 @@ class AppMasterLauncher( val masterAddress = systemConfig.getStringList(GEARPUMP_CLUSTER_MASTERS) .asScala.map(HostPort(_)).map(ActorUtil.getMasterActorPath) - sender ! CreateActor(AppMasterRuntimeEnvironment.props(masterAddress, app, appContext), s"appdaemon$appId") + sender ! CreateActor( + AppMasterRuntimeEnvironment.props(masterAddress, app, appContext), s"appdaemon$appId") import context.dispatcher val appMasterTimeout = scheduler.scheduleOnce(TIMEOUT, self, @@ -107,7 +113,7 @@ class AppMasterLauncher( context.become(waitForAppMasterToStart(worker, appMasterTimeout)) } - def waitForAppMasterToStart(worker : ActorRef, cancel: Cancellable) : Receive = { + def waitForAppMasterToStart(worker: ActorRef, cancel: Cancellable): Receive = { case ActorCreated(appMaster, _) => cancel.cancel() sender ! 
BindLifeCycle(appMaster) @@ -121,19 +127,21 @@ class AppMasterLauncher( context.stop(self) } - def replyToClient(result : SubmitApplicationResult) : Unit = { + def replyToClient(result: SubmitApplicationResult): Unit = { if (client.isDefined) { client.get.tell(result, master) } } } -object AppMasterLauncher extends AppMasterLauncherFactory{ - def props(appId : Int, executorId: Int, app : AppDescription, jar: Option[AppJar], username : String, master : ActorRef, client: Option[ActorRef]) = { +object AppMasterLauncher extends AppMasterLauncherFactory { + def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar], + username: String, master: ActorRef, client: Option[ActorRef]): Props = { Props(new AppMasterLauncher(appId, executorId, app, jar, username, master, client)) } } trait AppMasterLauncherFactory { - def props(appId : Int, executorId: Int, app : AppDescription, jar: Option[AppJar], username : String, master : ActorRef, client: Option[ActorRef]) : Props + def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar], + username: String, master: ActorRef, client: Option[ActorRef]): Props } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/cluster/master/MasterProxy.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/master/MasterProxy.scala b/core/src/main/scala/io/gearpump/cluster/master/MasterProxy.scala index 5d9e410..61d95dc 100644 --- a/core/src/main/scala/io/gearpump/cluster/master/MasterProxy.scala +++ b/core/src/main/scala/io/gearpump/cluster/master/MasterProxy.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,24 +18,21 @@ package io.gearpump.cluster.master +import scala.concurrent.duration.FiniteDuration import akka.actor._ -import io.gearpump.transport.HostPort -import io.gearpump.util.{ActorUtil, LogUtil} import org.slf4j.Logger -import scala.concurrent.duration.{FiniteDuration, Duration} +import io.gearpump.transport.HostPort +import io.gearpump.util.{ActorUtil, LogUtil} /** * This works with Master HA. When there are multiple Master nodes, * This will find a active one. - * - * - * @param masters */ -class MasterProxy (masters: Iterable[ActorPath], timeout: FiniteDuration) +class MasterProxy(masters: Iterable[ActorPath], timeout: FiniteDuration) extends Actor with Stash { - import MasterProxy._ + import io.gearpump.cluster.master.MasterProxy._ val LOG: Logger = LogUtil.getLogger(getClass, name = self.path.name) @@ -48,10 +45,12 @@ class MasterProxy (masters: Iterable[ActorPath], timeout: FiniteDuration) import context.dispatcher - def findMaster() = repeatActionUtil(timeout){ - contacts foreach { contact => - LOG.info(s"sending identity to $contact") - contact ! Identify(None) + def findMaster(): Cancellable = { + repeatActionUtil(timeout) { + contacts foreach { contact => + LOG.info(s"sending identity to $contact") + contact ! 
Identify(None) + } } } @@ -64,11 +63,11 @@ class MasterProxy (masters: Iterable[ActorPath], timeout: FiniteDuration) super.postStop() } - override def receive : Receive = { - case _=> + override def receive: Receive = { + case _ => } - def establishing(findMaster : Cancellable): Actor.Receive = { + def establishing(findMaster: Cancellable): Actor.Receive = { case ActorIdentity(_, Some(receptionist)) => context watch receptionist LOG.info("Connected to [{}]", receptionist.path) @@ -85,10 +84,10 @@ class MasterProxy (masters: Iterable[ActorPath], timeout: FiniteDuration) } def active(receptionist: ActorRef): Actor.Receive = { - case Terminated(receptionist) ⇒ + case Terminated(receptionist) => LOG.info("Lost contact with [{}], restablishing connection", receptionist) context.become(establishing(findMaster)) - case _: ActorIdentity ⇒ // ok, from previous establish, already handled + case _: ActorIdentity => // ok, from previous establish, already handled case WatchMaster(watcher) => watchers = watchers :+ watcher } @@ -99,10 +98,10 @@ class MasterProxy (masters: Iterable[ActorPath], timeout: FiniteDuration) master forward msg } - def scheduler = context.system.scheduler + def scheduler: Scheduler = context.system.scheduler import scala.concurrent.duration._ - private def repeatActionUtil(timeout: FiniteDuration)(action : => Unit) : Cancellable = { - val send = scheduler.schedule(0 seconds, 2 seconds)(action) + private def repeatActionUtil(timeout: FiniteDuration)(action: => Unit): Cancellable = { + val send = scheduler.schedule(0.seconds, 2.seconds)(action) val suicide = scheduler.scheduleOnce(timeout) { send.cancel() self ! 
PoisonPill @@ -128,7 +127,7 @@ object MasterProxy { case class WatchMaster(watcher: ActorRef) import scala.concurrent.duration._ - def props(masters: Iterable[HostPort], duration: FiniteDuration = 30 seconds): Props = { + def props(masters: Iterable[HostPort], duration: FiniteDuration = 30.seconds): Props = { val contacts = masters.map(ActorUtil.getMasterActorPath(_)) Props(new MasterProxy(contacts, duration)) }
