http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/UserConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/UserConfig.scala b/core/src/main/scala/org/apache/gearpump/cluster/UserConfig.scala new file mode 100644 index 0000000..28a4907 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/UserConfig.scala @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.cluster + +import akka.actor.{ActorSystem, ExtendedActorSystem} +import akka.serialization.JavaSerializer + +import io.gearpump.google.common.io.BaseEncoding + +/** + * Immutable configuration + */ +final class UserConfig(private val _config: Map[String, String]) extends Serializable { + + def withBoolean(key: String, value: Boolean): UserConfig = { + new UserConfig(_config + (key -> value.toString)) + } + + def withDouble(key: String, value: Double): UserConfig = { + new UserConfig(_config + (key -> value.toString)) + } + + def withFloat(key: String, value: Float): UserConfig = { + new UserConfig(_config + (key -> value.toString)) + } + + def withInt(key: String, value: Int): UserConfig = { + new UserConfig(_config + (key -> value.toString)) + } + + def withLong(key: String, value: Long): UserConfig = { + new UserConfig(_config + (key -> value.toString)) + } + + def withString(key: String, value: String): UserConfig = { + if (null == value) { + this + } else { + new UserConfig(_config + (key -> value)) + } + } + + def without(key: String): UserConfig = { + val config = _config - key + new UserConfig(config) + } + + def filter(p: ((String, String)) => Boolean): UserConfig = { + val updated = _config.filter(p) + new UserConfig(updated) + } + + def getBoolean(key: String): Option[Boolean] = { + _config.get(key).map(_.toBoolean) + } + + def getDouble(key: String): Option[Double] = { + _config.get(key).map(_.toDouble) + } + + def getFloat(key: String): Option[Float] = { + _config.get(key).map(_.toFloat) + } + + def getInt(key: String): Option[Int] = { + _config.get(key).map(_.toInt) + } + + def getLong(key: String): Option[Long] = { + _config.get(key).map(_.toLong) + } + + def getString(key: String): Option[String] = { + _config.get(key) + } + + def getBytes(key: String): Option[Array[Byte]] = { + _config.get(key).map(BaseEncoding.base64().decode(_)) + } + + def withBytes(key: String, value: Array[Byte]): UserConfig = { + if (null == 
value) { + this + } else { + this.withString(key, BaseEncoding.base64().encode(value)) + } + } + + // scalastyle:off line.size.limit + /** + * This de-serializes value to object instance + * + * To do de-serialization, this requires an implicit ActorSystem, as + * the ActorRef and possibly other akka classes deserialization + * requires an implicit ActorSystem. + * + * See Link: + * http://doc.akka.io/docs/akka/snapshot/scala/serialization.html#A_Word_About_Java_Serialization + */ + + def getValue[T](key: String)(implicit system: ActorSystem): Option[T] = { + val serializer = new JavaSerializer(system.asInstanceOf[ExtendedActorSystem]) + _config.get(key).map(BaseEncoding.base64().decode(_)) + .map(serializer.fromBinary(_).asInstanceOf[T]) + } + + /** + * This serializes the object and store it as string. + * + * To do serialization, this requires an implicit ActorSystem, as + * the ActorRef and possibly other akka classes serialization + * requires an implicit ActorSystem. + * + * See Link: + * http://doc.akka.io/docs/akka/snapshot/scala/serialization.html#A_Word_About_Java_Serialization + */ + def withValue[T <: AnyRef](key: String, value: T)(implicit system: ActorSystem): UserConfig = { + + if (null == value) { + this + } else { + val serializer = new JavaSerializer(system.asInstanceOf[ExtendedActorSystem]) + val bytes = serializer.toBinary(value) + val encoded = BaseEncoding.base64().encode(bytes) + this.withString(key, encoded) + } + } + // scalastyle:on line.size.limit + + def withConfig(other: UserConfig): UserConfig = { + if (null == other) { + this + } else { + new UserConfig(_config ++ other._config) + } + } +} + +object UserConfig { + + def empty: UserConfig = new UserConfig(Map.empty[String, String]) + + def apply(config: Map[String, String]): UserConfig = new UserConfig(config) + + def unapply(config: UserConfig): Option[Map[String, String]] = Option(config._config) +}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala new file mode 100644 index 0000000..170e56a --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.cluster.appmaster + +import akka.actor._ + +import org.apache.gearpump.cluster.AppMasterToMaster.RegisterAppMaster +import org.apache.gearpump.cluster.appmaster.AppMasterRuntimeEnvironment._ +import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler.{Session, StartExecutorSystems} +import org.apache.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus._ +import org.apache.gearpump.cluster.master.MasterProxy +import org.apache.gearpump.cluster.{AppDescription, AppMasterContext} +import org.apache.gearpump.util.LogUtil + +/** + * This serves as runtime environment for AppMaster. + * When starting an AppMaster, we need to setup the connection to master, + * and prepare other environments. + * + * This also extend the function of Master, by providing a scheduler service for Executor System. + * AppMaster can ask Master for executor system directly. details like requesting resource, + * contacting worker to start a process, and then starting an executor system is hidden from + * AppMaster. + * + * Please use AppMasterRuntimeEnvironment.props() to construct this actor. 
+ */ +private[appmaster] +class AppMasterRuntimeEnvironment( + appContextInput: AppMasterContext, + app: AppDescription, + masters: Iterable[ActorPath], + masterFactory: (AppId, MasterActorRef) => Props, + appMasterFactory: (AppMasterContext, AppDescription) => Props, + masterConnectionKeeperFactory: (MasterActorRef, RegisterAppMaster, ListenerActorRef) => Props) + extends Actor { + + val appId = appContextInput.appId + private val LOG = LogUtil.getLogger(getClass, app = appId) + + import scala.concurrent.duration._ + + private val master = context.actorOf( + masterFactory(appId, context.actorOf(Props(new MasterProxy(masters, 30.seconds))))) + private val appContext = appContextInput.copy(masterProxy = master) + + // Create appMaster proxy to receive command and forward to appmaster + private val appMaster = context.actorOf(appMasterFactory(appContext, app)) + context.watch(appMaster) + + private val registerAppMaster = RegisterAppMaster(appMaster, appContext.registerData) + private val masterConnectionKeeper = context.actorOf( + masterConnectionKeeperFactory(master, registerAppMaster, self)) + context.watch(masterConnectionKeeper) + + def receive: Receive = { + case MasterConnected => + LOG.info(s"Master is connected, start AppMaster ${appId}...") + appMaster ! 
StartAppMaster + case MasterStopped => + LOG.error(s"Master is stopped, stop AppMaster ${appId}...") + context.stop(self) + case Terminated(actor) => actor match { + case `appMaster` => + LOG.error(s"AppMaster ${appId} is stopped, shutdown myself") + context.stop(self) + case `masterConnectionKeeper` => + LOG.error(s"Master connection keeper is stopped, appId: ${appId}, shutdown myself") + context.stop(self) + case _ => // Skip + } + } +} + +object AppMasterRuntimeEnvironment { + + def props( + masters: Iterable[ActorPath], app: AppDescription, appContextInput: AppMasterContext) + : Props = { + + val master = (appId: AppId, masterProxy: MasterActorRef) => + MasterWithExecutorSystemProvider.props(appId, masterProxy) + + val appMaster = (appContext: AppMasterContext, app: AppDescription) => + LazyStartAppMaster.props(appContext, app) + + val masterConnectionKeeper = (master: MasterActorRef, registerAppMaster: + RegisterAppMaster, listener: ListenerActorRef) => Props(new MasterConnectionKeeper( + registerAppMaster, master, masterStatusListener = listener)) + + Props(new AppMasterRuntimeEnvironment( + appContextInput, app, masters, master, appMaster, masterConnectionKeeper)) + } + + /** + * This behavior like a AppMaster. Under the hood, It start start the real AppMaster in a lazy + * way. When real AppMaster is not started yet, all messages are stashed. The stashed + * messages are forwarded to real AppMaster when the real AppMaster is started. 
+ * + * Please use LazyStartAppMaster.props to construct this actor + * + * @param appMasterProps underlying AppMaster Props + */ + private[appmaster] + class LazyStartAppMaster(appId: Int, appMasterProps: Props) extends Actor with Stash { + + private val LOG = LogUtil.getLogger(getClass, app = appId) + + def receive: Receive = null + + context.become(startAppMaster) + + def startAppMaster: Receive = { + case StartAppMaster => + val appMaster = context.actorOf(appMasterProps, "appmaster") + context.watch(appMaster) + context.become(terminationWatch(appMaster) orElse appMasterService(appMaster)) + unstashAll() + case _ => + stash() + } + + def terminationWatch(appMaster: ActorRef): Receive = { + case Terminated(appMaster) => + LOG.error("appmaster is stopped") + context.stop(self) + } + + def appMasterService(appMaster: ActorRef): Receive = { + case msg => appMaster forward msg + } + } + + private[appmaster] + object LazyStartAppMaster { + def props(appContext: AppMasterContext, app: AppDescription): Props = { + val appMasterProps = Props(Class.forName(app.appMaster), appContext, app) + Props(new LazyStartAppMaster(appContext.appId, appMasterProps)) + } + } + + private[appmaster] case object StartAppMaster + + /** + * This enhance Master by providing new service: StartExecutorSystems + * + * Please use MasterWithExecutorSystemProvider.props to construct this actor + * + */ + private[appmaster] + class MasterWithExecutorSystemProvider(master: ActorRef, executorSystemProviderProps: Props) + extends Actor { + + val executorSystemProvider: ActorRef = context.actorOf(executorSystemProviderProps) + + override def receive: Receive = { + case request: StartExecutorSystems => + executorSystemProvider forward request + case msg => + master forward msg + } + } + + private[appmaster] + object MasterWithExecutorSystemProvider { + def props(appId: Int, master: ActorRef): Props = { + + val executorSystemLauncher = (appId: Int, session: Session) => + Props(new 
ExecutorSystemLauncher(appId, session)) + + val scheduler = Props(new ExecutorSystemScheduler(appId, master, executorSystemLauncher)) + + Props(new MasterWithExecutorSystemProvider(master, scheduler)) + } + } + + private[appmaster] type AppId = Int + private[appmaster] type MasterActorRef = ActorRef + private[appmaster] type ListenerActorRef = ActorRef +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala new file mode 100644 index 0000000..b3ec88c --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.cluster.appmaster + +import akka.actor.ActorRef +import com.typesafe.config.Config + +import org.apache.gearpump._ +import org.apache.gearpump.cluster.AppMasterRegisterData + +/** Run time info used to start an AppMaster */ +case class AppMasterRuntimeInfo( + appId: Int, + // AppName is the unique Id for an application + appName: String, + worker: ActorRef = null, + user: String = null, + submissionTime: TimeStamp = 0, + startTime: TimeStamp = 0, + finishTime: TimeStamp = 0, + config: Config = null) + extends AppMasterRegisterData http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationState.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationState.scala b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationState.scala new file mode 100644 index 0000000..7240113 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationState.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.cluster.appmaster + +import org.apache.gearpump.cluster.{AppDescription, AppJar} + +/** + * This state for single application, it is be distributed across the masters. + */ +case class ApplicationState( + appId: Int, appName: String, attemptId: Int, app: AppDescription, jar: Option[AppJar], + username: String, state: Any) extends Serializable { + + override def equals(other: Any): Boolean = { + other match { + case that: ApplicationState => + if (appId == that.appId && attemptId == that.attemptId) { + true + } else { + false + } + case _ => + false + } + } + + override def hashCode: Int = { + import akka.routing.MurmurHash._ + extendHash(appId, attemptId, startMagicA, startMagicB) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystem.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystem.scala b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystem.scala new file mode 100644 index 0000000..5c44652 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystem.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.cluster.appmaster + +import akka.actor.{ActorRef, Address, PoisonPill} + +import org.apache.gearpump.cluster.scheduler.Resource +import org.apache.gearpump.cluster.worker.WorkerId +import org.apache.gearpump.util.ActorSystemBooter.BindLifeCycle + +case class WorkerInfo(workerId: WorkerId, ref: ActorRef) + +/** + * Configurations to start an executor system on remote machine + * + * @param address Remote address where we start an Actor System. + */ +case class ExecutorSystem(executorSystemId: Int, address: Address, daemon: + ActorRef, resource: Resource, worker: WorkerInfo) { + def bindLifeCycleWith(actor: ActorRef): Unit = { + daemon ! BindLifeCycle(actor) + } + + def shutdown(): Unit = { + daemon ! PoisonPill + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala new file mode 100644 index 0000000..0c5dbff --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.cluster.appmaster

import scala.concurrent.duration._

import akka.actor._
import org.slf4j.Logger

import org.apache.gearpump.cluster.AppMasterToWorker.LaunchExecutor
import org.apache.gearpump.cluster.ExecutorJVMConfig
import org.apache.gearpump.cluster.WorkerToAppMaster._
import org.apache.gearpump.cluster.appmaster.ExecutorSystemLauncher._
import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, Session}
import org.apache.gearpump.cluster.scheduler.Resource
import org.apache.gearpump.util.ActorSystemBooter.{ActorSystemRegistered, RegisterActorSystem}
import org.apache.gearpump.util.{ActorSystemBooter, ActorUtil, Constants, LogUtil}

/**
 * Launches a single executor system on a target worker.
 *
 * The actor waits for LaunchExecutorSystem and asks the worker to launch an executor process.
 * It then waits for the new actor system to register back. Success, rejection or timeout is
 * reported to the sender of LaunchExecutorSystem, after which this actor stops.
 *
 * The companion has no `props()` helper; instances are created with
 * `Props(new ExecutorSystemLauncher(appId, session))`.
 *
 * @param appId id of the application the executor system belongs to
 * @param session The session that request to launch executor system
 */
private[appmaster]
class ExecutorSystemLauncher(appId: Int, session: Session) extends Actor {

  // Tagged with the app id, like the other appmaster actors.
  private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)

  val scheduler = context.system.scheduler
  implicit val executionContext = context.dispatcher

  private val systemConfig = context.system.settings.config
  // Launch timeout, in milliseconds.
  val timeoutSetting = systemConfig.getInt(Constants.GEARPUMP_START_EXECUTOR_SYSTEM_TIMEOUT_MS)

  // Armed at construction time, before the launch command is received.
  val timeout = scheduler.scheduleOnce(timeoutSetting.milliseconds,
    self, LaunchExecutorSystemTimeout(session))

  // Success cancels the timer explicitly. Cancelling on every stop also covers the rejection
  // path, so no stale LaunchExecutorSystemTimeout is delivered to a dead actor.
  override def postStop(): Unit = {
    timeout.cancel()
  }

  def receive: Receive = waitForLaunchCommand

  def waitForLaunchCommand: Receive = {
    case LaunchExecutorSystem(worker, executorSystemId, resource) =>
      val launcherPath = ActorUtil.getFullPath(context.system, self.path)
      val jvmConfig = Option(session.executorSystemJvmConfig)
        .map(getExecutorJvmConfig(_, s"app${appId}system${executorSystemId}", launcherPath)).orNull

      val launch = LaunchExecutor(appId, executorSystemId, resource, jvmConfig)
      LOG.info(s"Launching Executor ...appId: $appId, executorSystemId: $executorSystemId, " +
        s"slots: ${resource.slots} on worker $worker")

      worker.ref ! launch
      context.become(waitForActorSystemToStart(sender, launch, worker, executorSystemId))
  }

  def waitForActorSystemToStart(
      replyTo: ActorRef, launch: LaunchExecutor, worker: WorkerInfo, executorSystemId: Int)
    : Receive = {
    case RegisterActorSystem(systemPath) =>
      import launch._
      timeout.cancel()
      LOG.info(s"Received RegisterActorSystem $systemPath for session ${session.requestor}")
      sender ! ActorSystemRegistered(worker.ref)
      // The registering actor (sender) becomes the daemon of the new executor system.
      val system =
        ExecutorSystem(executorId, AddressFromURIString(systemPath), sender, resource, worker)
      replyTo ! LaunchExecutorSystemSuccess(system, session)
      context.stop(self)
    case ExecutorLaunchRejected(reason, ex) =>
      LOG.error(s"Executor Launch ${launch.resource} failed reason: $reason", ex)
      replyTo ! LaunchExecutorSystemRejected(launch.resource, reason, session)
      context.stop(self)
    case expired: LaunchExecutorSystemTimeout =>
      LOG.error(s"The Executor ActorSystem $executorSystemId has not been started in time")
      replyTo ! expired
      context.stop(self)
  }
}

private[appmaster]
object ExecutorSystemLauncher {

  case class LaunchExecutorSystem(worker: WorkerInfo, systemId: Int, resource: Resource)

  case class LaunchExecutorSystemSuccess(system: ExecutorSystem, session: Session)

  case class LaunchExecutorSystemRejected(resource: Resource, reason: Any, session: Session)

  case class LaunchExecutorSystemTimeout(session: Session)

  /**
   * Builds the JVM config for an executor process whose main class is ActorSystemBooter.
   * `systemName` and `reportBack` are passed as its arguments.
   *
   * @return the config, or `null` when `conf` is `null`
   */
  private def getExecutorJvmConfig(conf: ExecutorSystemJvmConfig, systemName: String,
      reportBack: String): ExecutorJVMConfig = {
    Option(conf).map { conf =>
      import conf._
      ExecutorJVMConfig(classPath, jvmArguments, classOf[ActorSystemBooter].getName,
        Array(systemName, reportBack), jar, username, executorAkkaConfig)
    }.orNull
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala new file mode 100644 index 0000000..d73cc2f --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.cluster.appmaster

import org.apache.gearpump.cluster.worker.WorkerId

import scala.concurrent.duration._

import akka.actor._
import com.typesafe.config.Config

import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource
import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated
import org.apache.gearpump.cluster._
import org.apache.gearpump.cluster.appmaster.ExecutorSystemLauncher._
import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler._
import org.apache.gearpump.cluster.scheduler.{ResourceAllocation, ResourceRequest}
import org.apache.gearpump.util.{Constants, LogUtil}

/**
 * Schedules executor systems for an AppMaster.
 *
 * An executor system is treated as a kind of resource. The AppMaster asks this actor for live
 * executor actor systems. The background communication with Master (resource requests) and
 * Workers (process launches) stays hidden from the AppMaster.
 *
 * Each StartExecutorSystems request is tracked as a Session that owns a ResourceAgent. A
 * session counts as alive while its agent is registered in `resourceAgents`.
 *
 * The companion has no `props()` helper; instances are created with
 * `Props(new ExecutorSystemScheduler(appId, masterProxy, executorSystemLauncher))`.
 *
 * @param appId id of the application
 * @param masterProxy actor to which resource requests are sent
 * @param executorSystemLauncher builds the Props of the actor that launches one executor system
 */
private[appmaster]
class ExecutorSystemScheduler(appId: Int, masterProxy: ActorRef,
    executorSystemLauncher: (Int, Session) => Props) extends Actor {

  private val LOG = LogUtil.getLogger(getClass, app = appId)
  implicit val timeout = Constants.FUTURE_TIMEOUT
  implicit val actorSystem = context.system
  // Id handed to the next launched executor system; incremented per launch.
  var currentSystemId = 0

  var resourceAgents = Map.empty[Session, ActorRef]

  def receive: Receive = {
    clientCommands orElse resourceAllocationMessageHandler orElse executorSystemMessageHandler
  }

  def clientCommands: Receive = {
    case start: StartExecutorSystems =>
      LOG.info(s"starting executor systems (ExecutorSystemConfig(${start.executorSystemConfig}), " +
        s"Resources(${start.resources.mkString(",")}))")
      val requestor = sender()
      val executorSystemConfig = start.executorSystemConfig
      val session = Session(requestor, executorSystemConfig)
      // Session is a case class, so an equal (requestor, config) pair reuses the existing agent.
      val agent = resourceAgents.getOrElse(session,
        context.actorOf(Props(new ResourceAgent(masterProxy, session))))
      resourceAgents = resourceAgents + (session -> agent)

      start.resources.foreach { resource =>
        agent ! RequestResource(appId, resource)
      }

    case StopExecutorSystem(executorSystem) =>
      executorSystem.shutdown()
  }

  def resourceAllocationMessageHandler: Receive = {
    case ResourceAllocatedForSession(allocations, session) =>
      if (isSessionAlive(session)) {
        // Spawn one launcher per allocation.
        allocations.foreach { resourceAllocation =>
          val ResourceAllocation(resource, worker, workerId) = resourceAllocation

          val launcher = context.actorOf(executorSystemLauncher(appId, session))
          launcher ! LaunchExecutorSystem(WorkerInfo(workerId, worker), currentSystemId, resource)
          currentSystemId = currentSystemId + 1
        }
      }
    case ResourceAllocationTimeOut(session) =>
      if (isSessionAlive(session)) {
        resourceAgents = resourceAgents - session
        session.requestor ! StartExecutorSystemTimeout
      }
  }

  def executorSystemMessageHandler: Receive = {
    case LaunchExecutorSystemSuccess(system, session) =>
      if (isSessionAlive(session)) {
        LOG.info("LaunchExecutorSystemSuccess, send back to " + session.requestor)
        system.bindLifeCycleWith(self)
        session.requestor ! ExecutorSystemStarted(system, session.executorSystemJvmConfig.jar)
      } else {
        LOG.error("We get a ExecutorSystem back, but resource requestor is no longer valid. " +
          "Will shutdown the allocated system")
        system.shutdown()
      }
    case LaunchExecutorSystemTimeout(session) =>
      if (isSessionAlive(session)) {
        LOG.error(s"Failed to launch executor system for ${session.requestor} due to timeout")
        session.requestor ! StartExecutorSystemTimeout
      }

    case LaunchExecutorSystemRejected(resource, reason, session) =>
      if (isSessionAlive(session)) {
        LOG.error(s"Failed to launch executor system, due to $reason, " +
          s"will ask master to allocate new resources $resource")
        resourceAgents.get(session).foreach { resourceAgent =>
          resourceAgent ! RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified))
        }
      }
  }

  /** A session is alive while it is non-null and still has a registered ResourceAgent. */
  private def isSessionAlive(session: Session): Boolean = {
    Option(session).exists(resourceAgents.contains)
  }
}

object ExecutorSystemScheduler {

  /** Asks the scheduler to start executor systems for the given resource requests. */
  case class StartExecutorSystems(
      resources: Array[ResourceRequest], executorSystemConfig: ExecutorSystemJvmConfig)

  /** Sent to the requester for each executor system that was started for it. */
  case class ExecutorSystemStarted(system: ExecutorSystem, boundedJar: Option[AppJar])

  /** Asks the scheduler to shut the given executor system down. */
  case class StopExecutorSystem(system: ExecutorSystem)

  /** Sent to the requester when resource allocation or a system launch times out. */
  case object StartExecutorSystemTimeout

  case class ExecutorSystemJvmConfig(classPath: Array[String], jvmArguments: Array[String],
      jar: Option[AppJar], username: String, executorAkkaConfig: Config = null)

  /**
   * For each client which asks for an executor system, the scheduler creates a session for it.
   */
  private[appmaster]
  case class Session(requestor: ActorRef, executorSystemJvmConfig: ExecutorSystemJvmConfig)

  /**
   * Agent that requests resources from Master on behalf of one session.
   *
   * Every RequestResource restarts the allocation timer. If the timer fires while requested
   * slots are still unallocated, the requestor is told about the timeout and this agent stops.
   *
   * @param master actor that receives the resource requests
   * @param session the original requester of the resource requests
   */
  private[appmaster]
  class ResourceAgent(master: ActorRef, session: Session) extends Actor {
    private var resourceRequestor: ActorRef = null
    var timeOutClock: Cancellable = null
    // Slots requested but not yet allocated.
    private var unallocatedResource: Int = 0

    import context.dispatcher

    import org.apache.gearpump.util.Constants._

    // Allocation timeout, in seconds.
    val timeout = context.system.settings.config.getInt(GEARPUMP_RESOURCE_ALLOCATION_TIMEOUT)

    def receive: Receive = {
      case request: RequestResource =>
        unallocatedResource += request.request.resource.slots
        Option(timeOutClock).foreach(_.cancel())
        timeOutClock = context.system.scheduler.scheduleOnce(
          timeout.seconds, self, ResourceAllocationTimeOut(session))
        resourceRequestor = sender
        master ! request
      case ResourceAllocated(allocations) =>
        unallocatedResource -= allocations.map(_.resource.slots).sum
        resourceRequestor forward ResourceAllocatedForSession(allocations, session)
      case _: ResourceAllocationTimeOut =>
        if (unallocatedResource > 0) {
          resourceRequestor ! ResourceAllocationTimeOut(session)
          // We will not receive any ResourceAllocation after timeout
          context.stop(self)
        }
    }

    // Do not leave a pending timer behind once the agent is gone.
    override def postStop(): Unit = {
      Option(timeOutClock).foreach(_.cancel())
    }
  }

  private[ExecutorSystemScheduler]
  case class ResourceAllocatedForSession(resource: Array[ResourceAllocation], session: Session)

  private[ExecutorSystemScheduler]
  case class ResourceAllocationTimeOut(session: Session)

}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeper.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeper.scala b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeper.scala new file mode 100644 index 0000000..fd2d11a --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeper.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.appmaster
+
+import java.util.concurrent.TimeUnit
+import scala.concurrent.duration.FiniteDuration
+
+import akka.actor._
+
+import org.apache.gearpump.cluster.AppMasterToMaster.RegisterAppMaster
+import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterRegistered
+import org.apache.gearpump.cluster.appmaster.MasterConnectionKeeper.AppMasterRegisterTimeout
+import org.apache.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus.{MasterConnected, MasterStopped}
+import org.apache.gearpump.cluster.master.MasterProxy
+import org.apache.gearpump.cluster.master.MasterProxy.{MasterRestarted, WatchMaster}
+import org.apache.gearpump.util.LogUtil
+
+/**
+ * Watches the liveness of Master on behalf of an AppMaster.
+ *
+ * On start it subscribes to `masterProxy` and sends `register` through it. When Master
+ * confirms with AppMasterRegistered, `masterStatusListener` receives MasterConnected.
+ * When Master is restarted, `register` is sent again. If a registration is not confirmed
+ * within 30 seconds, or Master is reported stopped, `masterStatusListener` receives
+ * MasterConnectionStatus.MasterStopped and this actor stops.
+ *
+ * @param register the registration message sent to Master through the proxy
+ * @param masterProxy the actor used to reach Master and to watch its liveness
+ * @param masterStatusListener receives MasterConnected / MasterStopped notifications
+ */
+private[appmaster]
+class MasterConnectionKeeper(
+    register: RegisterAppMaster, masterProxy: ActorRef, masterStatusListener: ActorRef)
+  extends Actor {
+
+  import context.dispatcher
+
+  private val LOG = LogUtil.getLogger(getClass)
+
+  // Subscribe self to masterProxy, so it tells us when Master restarts.
+  masterProxy ! WatchMaster(self)
+
+  /**
+   * Sends the registration request through the proxy and schedules a 30 second
+   * confirmation timeout. The returned Cancellable cancels that timeout.
+   */
+  def registerAppMaster: Cancellable = {
+    masterProxy ! register
+    context.system.scheduler.scheduleOnce(FiniteDuration(30, TimeUnit.SECONDS),
+      self, AppMasterRegisterTimeout)
+  }
+
+  // Initial behavior: register right after construction and wait for the confirmation.
+  // Evaluated once by Akka when the actor is created (instead of a null receive plus a
+  // context.become in the constructor).
+  def receive: Receive = waitMasterToConfirm(registerAppMaster)
+
+  def waitMasterToConfirm(cancelRegister: Cancellable): Receive = {
+    case AppMasterRegistered(_) =>
+      cancelRegister.cancel()
+      masterStatusListener ! MasterConnected
+      context.become(masterLivenessListener)
+    case AppMasterRegisterTimeout =>
+      cancelRegister.cancel()
+      masterStatusListener ! MasterStopped
+      context.stop(self)
+  }
+
+  def masterLivenessListener: Receive = {
+    case MasterRestarted =>
+      LOG.info("Master restarted, re-registering appmaster....")
+      context.become(waitMasterToConfirm(registerAppMaster))
+    // MasterProxy notifies its watchers with its own MasterProxy.MasterStopped object,
+    // which is a different object from MasterConnectionStatus.MasterStopped; accept both.
+    case MasterStopped | MasterProxy.MasterStopped =>
+      LOG.info("Master is dead, killing this AppMaster....")
+      masterStatusListener ! MasterStopped
+      context.stop(self)
+  }
+}
+
+private[appmaster] object MasterConnectionKeeper {
+
+  case object AppMasterRegisterTimeout
+
+  object MasterConnectionStatus {
+
+    case object MasterConnected
+
+    case object MasterStopped
+
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
new file mode 100644
index 0000000..245f1bc
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.client
+
+import java.util.concurrent.TimeUnit
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+import scala.util.Try
+
+import akka.actor.{ActorRef, ActorSystem}
+import akka.util.Timeout
+import com.typesafe.config.{Config, ConfigValueFactory}
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersData, ReplayFromTimestampWindowTrailingEdge}
+import org.apache.gearpump.cluster.MasterToClient.ReplayApplicationResult
+import org.apache.gearpump.cluster._
+import org.apache.gearpump.cluster.master.MasterProxy
+import org.apache.gearpump.jarstore.JarStoreService
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.{ActorUtil, Constants, LogUtil, Util}
+
+/**
+ * ClientContext is a user facing util to submit/manage an application.
+ *
+ * If no ActorSystem is passed in, a private one is created (and terminated by close()).
+ * If no master ActorRef is passed in, a MasterProxy is created lazily from the
+ * GEARPUMP_CLUSTER_MASTERS list in `config`.
+ *
+ * TODO: add interface to query master here
+ */
+class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) {
+
+  def this(system: ActorSystem) = {
+    this(system.settings.config, system, null)
+  }
+
+  def this(config: Config) = {
+    this(config, null, null)
+  }
+
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+  private implicit val timeout = Timeout(5, TimeUnit.SECONDS)
+
+  implicit val system = Option(sys).getOrElse(ActorSystem(s"client${Util.randInt()}", config))
+  LOG.info(s"Starting system ${system.name}")
+  // Only a system created by this context is owned (and terminated) by it.
+  val shouldCleanupSystem = Option(sys).isEmpty
+
+  private val jarStoreService = JarStoreService.get(config)
+  jarStoreService.init(config, system)
+
+  private lazy val master: ActorRef = {
+    val masters = config.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala
+      .flatMap(Util.parseHostList)
+    val master = Option(_master).getOrElse(system.actorOf(MasterProxy.props(masters),
+      s"masterproxy${system.name}"))
+    LOG.info(s"Creating master proxy ${master} for master list: $masters")
+    master
+  }
+
+  /**
+   * Submits an application with default jar setting. Use java property "gearpump.app.jar" if
+   * defined. Otherwise, it assumes the jar is on the target runtime classpath, thus will
+   * not send the jar across the wire.
+   */
+  def submit(app: Application): Int = {
+    submit(app, System.getProperty(GEARPUMP_APP_JAR))
+  }
+
+  /** Submits `app` with `jar` (may be null) and the executor number from system properties. */
+  def submit(app: Application, jar: String): Int = {
+    submit(app, jar, getExecutorNum())
+  }
+
+  /**
+   * Submits `app` to Master and returns the application id.
+   *
+   * @param jar local path of the application jar, or null to send no jar
+   * @param executorNum value stored under APPLICATION_EXECUTOR_NUMBER in the submission config
+   */
+  def submit(app: Application, jar: String, executorNum: Int): Int = {
+    val client = getMasterClient
+    val appName = checkAndAddNamePrefix(app.name, System.getProperty(GEARPUMP_APP_NAME_PREFIX))
+    val submissionConfig = getSubmissionConfig(config)
+      .withValue(APPLICATION_EXECUTOR_NUMBER, ConfigValueFactory.fromAnyRef(executorNum))
+    val appDescription =
+      AppDescription(appName, app.appMaster.getName, app.userConfig, submissionConfig)
+    val appJar = Option(jar).map(loadFile)
+    client.submitApplication(appDescription, appJar)
+  }
+
+  // Executor number from the system property; falls back to 1 when absent or not an Int.
+  private def getExecutorNum(): Int = {
+    Try(System.getProperty(APPLICATION_EXECUTOR_NUMBER).toInt).getOrElse(1)
+  }
+
+  private def getSubmissionConfig(config: Config): Config = {
+    ClusterConfig.filterOutDefaultConfig(config)
+  }
+
+  /** Asks the AppMaster of `appId` to replay and blocks until it answers. */
+  def replayFromTimestampWindowTrailingEdge(appId: Int): ReplayApplicationResult = {
+    import scala.concurrent.ExecutionContext.Implicits.global
+    Await.result(
+      ActorUtil.askAppMaster[ReplayApplicationResult](master,
+        appId, ReplayFromTimestampWindowTrailingEdge(appId)), Duration.Inf)
+  }
+
+  /** Sends `msg` to the AppMaster of `appId` and returns the future reply. */
+  def askAppMaster[T](appId: Int, msg: Any): Future[T] = {
+    import scala.concurrent.ExecutionContext.Implicits.global
+    ActorUtil.askAppMaster[T](master, appId, msg)
+  }
+
+  def listApps: AppMastersData = {
+    val client = getMasterClient
+    client.listApplications
+  }
+
+  def shutdown(appId: Int): Unit = {
+    val client = getMasterClient
+    client.shutdownApplication(appId)
+  }
+
+  def resolveAppID(appId: Int): ActorRef = {
+    val client = getMasterClient
+    client.resolveAppId(appId)
+  }
+
+  /** Terminates the ActorSystem, but only if this context created it. */
+  def close(): Unit = {
+    if (shouldCleanupSystem) {
+      LOG.info(s"Shutting down system ${system.name}")
+      system.terminate()
+    }
+  }
+
+  // Copies the local jar into the jar store and describes it for submission.
+  private def loadFile(jarPath: String): AppJar = {
+    val jarFile = new java.io.File(jarPath)
+    val path = jarStoreService.copyFromLocal(jarFile)
+    AppJar(jarFile.getName, path)
+  }
+
+  /**
+   * Prefixes `appName` with `namePrefix` (joined by "_") when the prefix is non-null and
+   * non-empty, then validates the resulting name.
+   *
+   * @throws Exception if the full name is not a valid application name; this context is
+   *                   closed before throwing.
+   */
+  private def checkAndAddNamePrefix(appName: String, namePrefix: String): String = {
+    val fullName = Option(namePrefix).filter(_.nonEmpty) match {
+      case Some(prefix) => prefix + "_" + appName
+      case None => appName
+    }
+    if (!Util.validApplicationName(fullName)) {
+      close()
+      // Report the name that was actually validated, including any prefix.
+      val error = s"The application name $fullName is not a proper name. An app name can " +
+        "be a sequence of letters, numbers or underscore character \"_\""
+      throw new Exception(error)
+    }
+    fullName
+  }
+
+  // Master client whose ask timeout (seconds) comes from config, defaulting to 90.
+  private def getMasterClient: MasterClient = {
+    val timeout = Try(config.getInt(Constants.GEARPUMP_MASTERCLIENT_TIMEOUT)).getOrElse(90)
+    new MasterClient(master, akka.util.Timeout(timeout, TimeUnit.SECONDS))
+  }
+}
+
+object ClientContext {
+
+  def apply(): ClientContext = new ClientContext(ClusterConfig.default(), null, null)
+
+  def apply(system: ActorSystem): ClientContext = {
+    new ClientContext(ClusterConfig.default(), system, null)
+  }
+
+  def apply(system: ActorSystem, master: ActorRef): ClientContext = {
+    new ClientContext(ClusterConfig.default(), system, master)
+  }
+
+  def apply(config: Config): ClientContext = new ClientContext(config, null, null)
+
+  def apply(config: Config, system: ActorSystem, master: ActorRef): ClientContext = {
+    new ClientContext(config, system, master)
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/client/MasterClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/MasterClient.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/MasterClient.scala
new
file mode 100644
index 0000000..77ebedf
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/client/MasterClient.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.client
+
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+import scala.util.{Failure, Success}
+
+import akka.actor.ActorRef
+import akka.pattern.ask
+import akka.util.Timeout
+
+import org.apache.gearpump.cluster.ClientToMaster._
+import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersData, AppMastersDataRequest}
+import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult, SubmitApplicationResult}
+import org.apache.gearpump.cluster.{AppDescription, AppJar}
+
+/**
+ * Client to inter-operate with Master node.
+ *
+ * Every call is a blocking request/reply with `master`; the ask itself is bounded by
+ * `timeout`. A failure carried in Master's reply is rethrown to the caller.
+ *
+ * NOTE: Stateless, thread safe
+ */
+class MasterClient(master: ActorRef, timeout: Timeout) {
+  implicit val masterClientTimeout = timeout
+
+  /** Asks `master` with `msg` and blocks until the reply (or the ask failure) arrives. */
+  private def askAndWait[T](msg: Any): T = {
+    val pending = (master ? msg).asInstanceOf[Future[T]]
+    Await.result(pending, Duration.Inf)
+  }
+
+  /** Submits `app` (with optional jar) and returns the id Master assigned to it. */
+  def submitApplication(app: AppDescription, appJar: Option[AppJar]): Int = {
+    val reply = askAndWait[SubmitApplicationResult](SubmitApplication(app, appJar))
+    reply.appId match {
+      case Failure(cause) =>
+        throw cause
+      case Success(assignedId) =>
+        // scalastyle:off println
+        Console.println(s"Submit application succeed. The application id is $assignedId")
+        // scalastyle:on println
+        assignedId
+    }
+  }
+
+  /** Returns the AppMaster actor of application `appId`. */
+  def resolveAppId(appId: Int): ActorRef = {
+    val reply = askAndWait[ResolveAppIdResult](ResolveAppId(appId))
+    reply.appMaster match {
+      case Failure(cause) => throw cause
+      case Success(appMasterRef) => appMasterRef
+    }
+  }
+
+  /** Asks Master to shut down application `appId`; throws if Master reports a failure. */
+  def shutdownApplication(appId: Int): Unit = {
+    val reply = askAndWait[ShutdownApplicationResult](ShutdownApplication(appId))
+    reply.appId match {
+      case Failure(cause) => throw cause
+      case Success(_) =>
+    }
+  }
+
+  /** Returns Master's view of all application masters. */
+  def listApplications: AppMastersData = {
+    askAndWait[AppMastersData](AppMastersDataRequest)
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/main/ArgumentsParser.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/ArgumentsParser.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/ArgumentsParser.scala
new file mode 100644
index 0000000..02c6f1a
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/main/ArgumentsParser.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.main
+
+import org.apache.gearpump.cluster.main.ArgumentsParser.Syntax
+
+/** Declaration of one command line option: help text, whether required, default value. */
+case class CLIOption[+T](
+    description: String = "", required: Boolean = false, defaultValue: Option[T] = None)
+
+/**
+ * Result of [[ArgumentsParser.parse]]: option values keyed by option name (without the
+ * leading "-"), plus the arguments that were not consumed as options.
+ */
+class ParseResult(optionMap: Map[String, String], remainArguments: Array[String]) {
+  /** Value of option `key` as an Int; throws if the option is absent or not numeric. */
+  def getInt(key: String): Int = optionMap(key).toInt
+
+  /** Value of option `key`; throws NoSuchElementException if the option is absent. */
+  def getString(key: String): String = optionMap(key)
+
+  /** Value of option `key` as a Boolean; throws if absent or not "true"/"false". */
+  def getBoolean(key: String): Boolean = optionMap(key).toBoolean
+
+  /** True when option `key` is present with a non-empty value. */
+  def exists(key: String): Boolean = optionMap.get(key).exists(_.nonEmpty)
+
+  def remainArgs: Array[String] = this.remainArguments
+}
+
+/**
+ * Parser for command line arguments
+ *
+ * Grammar: -option1 value1 -option2 value3 -flag1 -flag2 remainArg1 remainArg2...
+ */
+trait ArgumentsParser {
+
+  val ignoreUnknownArgument = false
+
+  /** Prints the description, one usage line per option, then the remaining-args line. */
+  // scalastyle:off println
+  def help(): Unit = {
+    Console.println(s"\nHelp: $description")
+    val optionUsage = options.toList.map { case (name, option) =>
+      if (option.required) {
+        s"-$name (required:${option.required})${option.description}"
+      } else {
+        s"-$name (required:${option.required}, " +
+          s"default:${option.defaultValue.getOrElse("")})${option.description}"
+      }
+    }
+    val usage = optionUsage :+ remainArgs.map(k => s"<$k>").mkString(" ")
+    usage.foreach(Console.println(_))
+  }
+  // scalastyle:on println
+
+  def parse(args: Array[String]): ParseResult = {
+    val syntax = Syntax(options, remainArgs, ignoreUnknownArgument)
+    ArgumentsParser.parse(syntax, args)
+  }
+
+  val description: String = ""
+  val options: Array[(String, CLIOption[Any])] = Array.empty[(String, CLIOption[Any])]
+  val remainArgs: Array[String] = Array.empty[String]
+}
+
+object ArgumentsParser {
+
+  case class Syntax(
+      val options: Array[(String, CLIOption[Any])], val remainArgs: Array[String],
+      val ignoreUnknownArgument: Boolean)
+
+  /**
+   * Parses `args` according to `syntax`.
+   *
+   *  - `-key value` (value not starting with "-") sets option `key` to `value`.
+   *  - `-flag` followed by another "-..." argument or by the end of input sets `flag` to
+   *    "true".
+   *  - The first argument that neither starts with "-" nor is an option value stops option
+   *    parsing; it and everything after it become remaining arguments.
+   *  - An unknown option throws, unless `ignoreUnknownArgument` is set, in which case it is
+   *    kept (in order) among the remaining arguments.
+   *
+   * Every absent non-required option then gets its default value (or "").
+   *
+   * @throws Exception on an unknown option (when not ignored), a missing required option,
+   *                   or fewer remaining arguments than `syntax.remainArgs` declares.
+   */
+  def parse(syntax: Syntax, args: Array[String]): ParseResult = {
+    import syntax.{ignoreUnknownArgument, options, remainArgs}
+    val knownKeys = options.map(_._1).toSet
+    var config = Map.empty[String, String]
+    var remain = Array.empty[String]
+
+    @scala.annotation.tailrec
+    def doParse(argument: List[String]): Unit = {
+      argument match {
+        case Nil => () // everything processed successfully
+
+        case key :: value :: rest if key.startsWith("-") && !value.startsWith("-") =>
+          val fixedKey = key.substring(1)
+          if (!knownKeys.contains(fixedKey)) {
+            if (!ignoreUnknownArgument) {
+              throw new Exception(s"found unknown option $fixedKey")
+            } else {
+              remain ++= Array(key, value)
+            }
+          } else {
+            config += fixedKey -> value
+          }
+          doParse(rest)
+
+        case key :: rest if key.startsWith("-") =>
+          val fixedKey = key.substring(1)
+          if (!knownKeys.contains(fixedKey)) {
+            // Unknown flags honour ignoreUnknownArgument just like unknown key/value options.
+            if (!ignoreUnknownArgument) {
+              throw new Exception(s"found unknown option $fixedKey")
+            } else {
+              remain ++= Array(key)
+            }
+          } else {
+            config += fixedKey -> "true"
+          }
+          doParse(rest)
+
+        case value :: rest =>
+          // scalastyle:off println
+          Console.err.println(s"Warning: get unknown argument $value, maybe it is a main class")
+          // scalastyle:on println
+          remain ++= value :: rest
+          doParse(Nil)
+      }
+    }
+    doParse(args.toList)
+
+    // Fill in defaults for every optional option that was not given.
+    options.foreach { case (key, option) =>
+      if (!config.contains(key) && !option.required) {
+        config += key -> option.defaultValue.getOrElse("").toString
+      }
+    }
+
+    // Any option still missing at this point is a required one.
+    options.foreach { case (key, _) =>
+      if (!config.contains(key)) {
+        throw new Exception(s"Missing option ${key}...")
+      }
+    }
+
+    if (remain.length < remainArgs.length) {
+      throw new Exception(s"Missing arguments ...")
+    }
+
+    new ParseResult(config, remain)
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala
new file mode 100644
index 0000000..9305d5c
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.master
+
+import java.util.concurrent.{TimeUnit, TimeoutException}
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
+import scala.util.{Failure, Success}
+
+import akka.actor.{Actor, ActorRef, Props, _}
+import com.typesafe.config.Config
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource
+import org.apache.gearpump.cluster.AppMasterToWorker.{LaunchExecutor, ShutdownExecutor}
+import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated
+import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult
+import org.apache.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected
+import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, AppMasterRuntimeInfo, WorkerInfo}
+import org.apache.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.cluster.{AppDescription, AppJar, _}
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.ActorSystemBooter._
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.{ActorSystemBooter, ActorUtil, LogUtil, Util}
+
+/**
+ * AppMasterLauncher is a child Actor of AppManager, it is responsible
+ * to launch the AppMaster on the cluster.
+ *
+ * Steps:
+ *  1. request one slot from `master`;
+ *  2. ask the allocated worker to launch an executor JVM that boots an ActorSystem;
+ *  3. when that system registers, create the AppMaster runtime environment in it;
+ *  4. reply to `client` (if any) with the submission result and stop.
+ * If the executor launch is rejected, the slot is requested again. If the AppMaster is
+ * not created within TIMEOUT, the executor is shut down and a failure is reported.
+ */
+class AppMasterLauncher(
+    appId: Int, executorId: Int, app: AppDescription,
+    jar: Option[AppJar], username: String, master: ActorRef, client: Option[ActorRef])
+  extends Actor {
+  private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
+
+  val scheduler = context.system.scheduler
+  val systemConfig = context.system.settings.config
+  // Maximum time to wait for the AppMaster actor to be created.
+  val TIMEOUT = Duration(15, TimeUnit.SECONDS)
+
+  val appMasterAkkaConfig: Config = app.clusterConfig
+
+  LOG.info(s"Ask Master resource to start AppMaster $appId...")
+  master ! RequestResource(appId, ResourceRequest(Resource(1), WorkerId.unspecified))
+
+  def receive: Receive = waitForResourceAllocation
+
+  def waitForResourceAllocation: Receive = {
+    case ResourceAllocated(allocations) =>
+
+      // A single slot was requested, so only the first allocation is used.
+      val ResourceAllocation(resource, worker, workerId) = allocations(0)
+      LOG.info(s"Resource allocated for appMaster $appId on worker ${workerId}(${worker.path})")
+
+      val submissionTime = System.currentTimeMillis()
+
+      val appMasterInfo = AppMasterRuntimeInfo(appId, app.name, worker, username,
+        submissionTime, config = appMasterAkkaConfig)
+      val workerInfo = WorkerInfo(workerId, worker)
+      val appMasterContext =
+        AppMasterContext(appId, username, resource, workerInfo, jar, null, appMasterInfo)
+      LOG.info(s"Try to launch a executor for AppMaster on worker ${workerId} for app $appId")
+      val name = ActorUtil.actorNameForExecutor(appId, executorId)
+      val selfPath = ActorUtil.getFullPath(context.system, self.path)
+
+      val jvmSetting =
+        Util.resolveJvmSetting(appMasterAkkaConfig.withFallback(systemConfig)).appMater
+      // The booted ActorSystem registers back with this actor (selfPath).
+      val executorJVM = ExecutorJVMConfig(jvmSetting.classPath, jvmSetting.vmargs,
+        classOf[ActorSystemBooter].getName, Array(name, selfPath), jar,
+        username, appMasterAkkaConfig)
+
+      worker ! LaunchExecutor(appId, executorId, resource, executorJVM)
+      context.become(waitForActorSystemToStart(worker, appMasterContext, app.userConfig, resource))
+  }
+
+  def waitForActorSystemToStart(
+      worker: ActorRef, appContext: AppMasterContext, user: UserConfig, resource: Resource)
+    : Receive = {
+    case ExecutorLaunchRejected(reason, ex) =>
+      LOG.error(s"Executor Launch failed reason: $reason", ex)
+      LOG.info(s"reallocate resource $resource to start appmaster")
+      master ! RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified))
+      context.become(waitForResourceAllocation)
+    case RegisterActorSystem(systemPath) =>
+      LOG.info(s"Received RegisterActorSystem $systemPath for AppMaster")
+      sender ! ActorSystemRegistered(worker)
+
+      val masterAddress = systemConfig.getStringList(GEARPUMP_CLUSTER_MASTERS)
+        .asScala.map(HostPort(_)).map(ActorUtil.getMasterActorPath)
+      sender ! CreateActor(
+        AppMasterRuntimeEnvironment.props(masterAddress, app, appContext), s"appdaemon$appId")
+
+      import context.dispatcher
+      // Give the timeout a message: its getMessage is passed on as the shutdown reason.
+      val appMasterTimeout = scheduler.scheduleOnce(TIMEOUT, self,
+        CreateActorFailed(app.appMaster, new TimeoutException(
+          s"AppMaster of app $appId was not created within $TIMEOUT")))
+      context.become(waitForAppMasterToStart(worker, appMasterTimeout))
+  }
+
+  def waitForAppMasterToStart(worker: ActorRef, cancel: Cancellable): Receive = {
+    case ActorCreated(appMaster, _) =>
+      cancel.cancel()
+      sender ! BindLifeCycle(appMaster)
+      LOG.info(s"AppMaster is created, mission complete...")
+      replyToClient(SubmitApplicationResult(Success(appId)))
+      context.stop(self)
+    case CreateActorFailed(_, reason) =>
+      cancel.cancel()
+      worker ! ShutdownExecutor(appId, executorId, reason.getMessage)
+      replyToClient(SubmitApplicationResult(Failure(reason)))
+      context.stop(self)
+  }
+
+  /** Sends `result` to the submitting client, if any, with Master as the sender. */
+  def replyToClient(result: SubmitApplicationResult): Unit = {
+    client.foreach(_.tell(result, master))
+  }
+}
+
+object AppMasterLauncher extends AppMasterLauncherFactory {
+  def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar],
+      username: String, master: ActorRef, client: Option[ActorRef]): Props = {
+    Props(new AppMasterLauncher(appId, executorId, app, jar, username, master, client))
+  }
+}
+
+/** Factory for AppMasterLauncher Props (abstracted so it can be substituted). */
+trait AppMasterLauncherFactory {
+  def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar],
+      username: String, master: ActorRef, client: Option[ActorRef]): Props
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/master/MasterProxy.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/MasterProxy.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/MasterProxy.scala
new file mode 100644
index 0000000..e2ee00e
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/master/MasterProxy.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.master
+
+import scala.concurrent.duration.FiniteDuration
+
+import akka.actor._
+import org.slf4j.Logger
+
+import org.apache.gearpump.transport.HostPort
+import org.apache.gearpump.util.{ActorUtil, LogUtil}
+
+/**
+ * Finds and tracks the active Master when running with Master HA.
+ *
+ * While searching, it sends Identify to every Master contact every 2 seconds; if no Master
+ * answers within `timeout`, the proxy stops itself. Messages received during the search
+ * are stashed. Once connected, messages outside the proxy's own protocol are forwarded to
+ * that Master. Watchers registered via WatchMaster are sent MasterRestarted when a
+ * connection is (re)established, and MasterStopped when this proxy stops.
+ *
+ * @param masters actor paths of the candidate Master nodes
+ * @param timeout how long to keep searching for a Master before giving up
+ */
+class MasterProxy(masters: Iterable[ActorPath], timeout: FiniteDuration)
+  extends Actor with Stash {
+  import org.apache.gearpump.cluster.master.MasterProxy._
+
+  val LOG: Logger = LogUtil.getLogger(getClass, name = self.path.name)
+
+  val contacts = masters.map { url =>
+    LOG.info(s"Contacts point URL: $url")
+    context.actorSelection(url)
+  }
+
+  var watchers: List[ActorRef] = List.empty[ActorRef]
+
+  import context.dispatcher
+
+  /** Starts probing all contacts; cancelling the result stops the search and its timeout. */
+  def findMaster(): Cancellable = {
+    repeatActionUntil(timeout) {
+      contacts foreach { contact =>
+        LOG.info(s"sending identity to $contact")
+        contact ! Identify(None)
+      }
+    }
+  }
+
+  context.become(establishing(findMaster()))
+
+  LOG.info("Master Proxy is started...")
+
+  override def postStop(): Unit = {
+    watchers.foreach(_ ! MasterStopped)
+    super.postStop()
+  }
+
+  // Not used: the constructor switches to `establishing` via context.become.
+  override def receive: Receive = {
+    case _ =>
+  }
+
+  def establishing(findMaster: Cancellable): Actor.Receive = {
+    case ActorIdentity(_, Some(receptionist)) =>
+      context.watch(receptionist)
+      LOG.info("Connected to [{}]", receptionist.path)
+
+      watchers.foreach(_ ! MasterRestarted)
+      unstashAll()
+      findMaster.cancel()
+      context.become(active(receptionist) orElse messageHandler(receptionist))
+    case ActorIdentity(_, None) => // ok, use another instead
+    case msg =>
+      LOG.info(s"Stashing ${msg.getClass.getSimpleName}")
+      stash()
+  }
+
+  def active(receptionist: ActorRef): Actor.Receive = {
+    // Receptionists are the only actors this proxy ever watches.
+    case Terminated(deadMaster) =>
+      LOG.info("Lost contact with [{}], restablishing connection", deadMaster)
+      context.become(establishing(findMaster()))
+    case _: ActorIdentity => // ok, from previous establish, already handled
+    case WatchMaster(watcher) =>
+      watchers = watchers :+ watcher
+  }
+
+  def messageHandler(master: ActorRef): Receive = {
+    case msg =>
+      LOG.debug(s"Get msg ${msg.getClass.getSimpleName}, forwarding to ${master.path}")
+      master forward msg
+  }
+
+  def scheduler: Scheduler = context.system.scheduler
+  import scala.concurrent.duration._
+
+  /**
+   * Runs `action` with zero initial delay and then every 2 seconds. After `timeout` the
+   * repetition is cancelled and this proxy sends itself a PoisonPill. Cancelling the
+   * returned handle cancels both the repetition and the pending PoisonPill.
+   */
+  private def repeatActionUntil(timeout: FiniteDuration)(action: => Unit): Cancellable = {
+    val send = scheduler.schedule(0.seconds, 2.seconds)(action)
+    val suicide = scheduler.scheduleOnce(timeout) {
+      send.cancel()
+      self ! PoisonPill
+    }
+
+    new Cancellable {
+      def cancel(): Boolean = {
+        val result1 = send.cancel()
+        val result2 = suicide.cancel()
+        result1 && result2
+      }
+
+      def isCancelled: Boolean = {
+        send.isCancelled && suicide.isCancelled
+      }
+    }
+  }
+}
+
+object MasterProxy {
+  case object MasterRestarted
+  case object MasterStopped
+  case class WatchMaster(watcher: ActorRef)
+
+  import scala.concurrent.duration._
+  def props(masters: Iterable[HostPort], duration: FiniteDuration = 30.seconds): Props = {
+    val contacts = masters.map(ActorUtil.getMasterActorPath(_))
+    Props(new MasterProxy(contacts, duration))
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/master/MasterSummary.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/MasterSummary.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/MasterSummary.scala
new file mode 100644
index 0000000..d182b22
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/master/MasterSummary.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.master
+
+import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
+
+/**
+ * String constants describing the Master cluster status.
+ * `Synced` means all masters are live and synced.
+ */
+object MasterStatus {
+  type Type = String
+  val Synced = "synced"
+  val UnSynced = "unsynced"
+}
+
+/** Host and port of a single Master node. */
+case class MasterNode(host: String, port: Int) {
+  /** This node as a `(host, port)` pair. */
+  def toTuple: (String, Int) = host -> port
+}
+
+/**
+ * Master information returned for REST API call.
+ *
+ * @param leader the leader Master node
+ * @param cluster all Master nodes of the cluster
+ * @param masterStatus one of the [[MasterStatus]] values
+ * @param activities recorded Master activities
+ * @param historyMetricsConfig defaults to null when not supplied
+ */
+case class MasterSummary(
+    leader: MasterNode,
+    cluster: List[MasterNode],
+    aliveFor: Long,
+    logFile: String,
+    jarStore: String,
+    masterStatus: MasterStatus.Type,
+    homeDirectory: String,
+    activities: List[MasterActivity],
+    jvmName: String,
+    historyMetricsConfig: HistoryMetricsConfig = null)
+
+/** A single Master event together with its time. */
+case class MasterActivity(time: Long, event: String)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Resource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Resource.scala b/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Resource.scala
new file mode 100644
index 0000000..c21d396
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Resource.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.scheduler
+
+import akka.actor.ActorRef
+import org.apache.gearpump.cluster.worker.WorkerId
+
+/** An amount of scheduling resource, expressed as a number of slots. */
+case class Resource(slots: Int) {
+
+  // scalastyle:off spaces.after.plus
+  def +(other: Resource): Resource = Resource(slots + other.slots)
+  // scalastyle:on spaces.after.plus
+
+  def -(other: Resource): Resource = Resource(slots - other.slots)
+
+  // All comparisons look only at the slot count.
+  def >(other: Resource): Boolean = slots > other.slots
+
+  def >=(other: Resource): Boolean = slots >= other.slots
+
+  def <(other: Resource): Boolean = slots < other.slots
+
+  def <=(other: Resource): Boolean = slots <= other.slots
+
+  /** True when this resource holds exactly zero slots. */
+  def isEmpty: Boolean = slots == 0
+}
+
+/**
+ * Each streaming job can have a priority; a job with a higher priority
+ * gets resources scheduled earlier than jobs with a lower priority.
+ */
+object Priority extends Enumeration {
+  type Priority = Value
+  val LOW, NORMAL, HIGH = Value
+}
+
+/**
+ * Relaxation.ONEWORKER means only resource (slots) from that worker will be accepted by
+ * the requesting application job.
+ */
+object Relaxation extends Enumeration {
+  type Relaxation = Value
+
+  // ONEWORKER allows a user to schedule a task on a specific worker.
+  val ANY, ONEWORKER, SPECIFICWORKER = Value
+}
+
+import org.apache.gearpump.cluster.scheduler.Priority._
+import org.apache.gearpump.cluster.scheduler.Relaxation._
+
+case class ResourceRequest(
+    resource: Resource,
+    workerId: WorkerId,
+    priority: Priority = NORMAL,
+    relaxation: Relaxation = ANY,
+    executorNum: Int = 1)
+
+case class ResourceAllocation(resource: Resource, worker: ActorRef, workerId: WorkerId)
+
+object Resource {
+
+  /** A resource with zero slots. */
+  def empty: Resource = Resource(0)
+
+  /** The resource with fewer slots; `res2` is returned when both are equal. */
+  def min(res1: Resource, res2: Resource): Resource =
+    if (res2.slots <= res1.slots) res2 else res1
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/worker/ExecutorProcessLauncher.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/worker/ExecutorProcessLauncher.scala b/core/src/main/scala/org/apache/gearpump/cluster/worker/ExecutorProcessLauncher.scala new file mode 100644 index 0000000..58f7dd8 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/worker/ExecutorProcessLauncher.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.worker
+
+import com.typesafe.config.Config
+
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.util.RichProcess
+
+/**
+ * ExecutorProcessLauncher is used to launch a process for an Executor using given parameters.
+ *
+ * Users can implement this interface to decide how the process is launched.
+ * Set "gearpump.worker.executor-process-launcher" to the name of your implementing class.
+ */
+trait ExecutorProcessLauncher {
+  // Configuration held by the launcher itself.
+  val config: Config
+
+  /**
+   * Launches a process for an Executor using the given parameters.
+   *
+   * @param appId The appId of the executor to be launched
+   * @param executorId The executorId of the executor to be launched
+   * @param resource The resource allocated for that executor
+   * @param config The configuration supplied for this launch; inside an implementation of this
+   *               method it shadows the launcher's own `config` member
+   * @param options The command options
+   * @param classPath The classpath of the process
+   * @param mainClass The main class of the process
+   * @param arguments The remaining arguments
+   * @return a [[RichProcess]] for the launched process
+   */
+  def createProcess(
+      appId: Int, executorId: Int, resource: Resource, config: Config, options: Array[String],
+      classPath: Array[String], mainClass: String, arguments: Array[String]): RichProcess
+
+  /**
+   * Cleans up the resources of a launched process.
+   *
+   * @param appId The appId of the launched executor
+   * @param executorId The executorId of the launched executor
+   */
+  def cleanProcess(appId: Int, executorId: Int): Unit
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/worker/WorkerId.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/worker/WorkerId.scala b/core/src/main/scala/org/apache/gearpump/cluster/worker/WorkerId.scala new file mode 100644 index 0000000..93213e4 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/worker/WorkerId.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.cluster.worker + +/** + * WorkerId is used to uniquely track a worker machine. + * + * @param sessionId sessionId is assigned by Master node for easy tracking. It is possible that + * sessionId is **NOT** unique, so always use WorkerId for comparison.
+ * @param registerTime the timestamp when a worker node registers itself with the master node
+ */
+case class WorkerId(sessionId: Int, registerTime: Long)
+
+object WorkerId {
+
+  /** Sentinel id with sessionId -1 and registerTime 0. */
+  val unspecified: WorkerId = new WorkerId(-1, 0L)
+
+  /** Renders a WorkerId as `<registerTime>_<sessionId>`; [[parse]] is the inverse. */
+  def render(workerId: WorkerId): String = {
+    s"${workerId.registerTime}_${workerId.sessionId}"
+  }
+
+  /**
+   * Parses a string produced by [[render]] back into a WorkerId.
+   *
+   * @throws IllegalArgumentException if `str` is not of the form `<registerTime>_<sessionId>`
+   *                                  (a NumberFormatException, which is a subclass, is thrown
+   *                                  when either part is not a number)
+   */
+  def parse(str: String): WorkerId = {
+    val parts = str.split("_")
+    // Fail with a descriptive error instead of an ArrayIndexOutOfBoundsException when the
+    // separator is missing, and instead of silently ignoring any extra parts.
+    require(parts.length == 2,
+      s"Invalid WorkerId '$str', expected format <registerTime>_<sessionId>")
+    new WorkerId(parts(1).toInt, parts(0).toLong)
+  }
+
+  /** Orders by registration timestamp first, then by sessionId. */
+  implicit val workerIdOrdering: Ordering[WorkerId] =
+    Ordering.by((id: WorkerId) => (id.registerTime, id.sessionId))
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/cluster/worker/WorkerSummary.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/worker/WorkerSummary.scala b/core/src/main/scala/org/apache/gearpump/cluster/worker/WorkerSummary.scala new file mode 100644 index 0000000..f4c0587 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/worker/WorkerSummary.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.worker
+import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
+
+/**
+ * Worker summary information for the REST API.
+ *
+ * `resourceManagerContainerId` is the id used to uniquely identify this worker process in a
+ * low-level resource manager such as YARN. `historyMetricsConfig` is `null` unless supplied.
+ */
+case class WorkerSummary(
+    workerId: WorkerId,
+    state: String,
+    actorPath: String,
+    aliveFor: Long,
+    logFile: String,
+    executors: Array[ExecutorSlots],
+    totalSlots: Int,
+    availableSlots: Int,
+    homeDirectory: String,
+    jvmName: String,
+    resourceManagerContainerId: String,
+    historyMetricsConfig: HistoryMetricsConfig = null)
+
+object WorkerSummary {
+
+  /** A placeholder summary: unspecified worker id, empty strings, zero counts, no executors. */
+  def empty: WorkerSummary = WorkerSummary(
+    workerId = WorkerId.unspecified,
+    state = "",
+    actorPath = "",
+    aliveFor = 0L,
+    logFile = "",
+    executors = Array.empty[ExecutorSlots],
+    totalSlots = 0,
+    availableSlots = 0,
+    homeDirectory = "",
+    jvmName = "",
+    resourceManagerContainerId = "")
+}
+
+/** Slot count of one executor, identified by its appId and executorId. */
+case class ExecutorSlots(appId: Int, executorId: Int, slots: Int)
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreService.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreService.scala b/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreService.scala new file mode 100644 index 0000000..0ba9558 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/jarstore/JarStoreService.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
+
+package org.apache.gearpump.jarstore
+
+import java.io.File
+import java.net.URI
+import java.util.ServiceLoader
+import scala.collection.JavaConverters._
+
+import akka.actor.ActorSystem
+import com.typesafe.config.Config
+
+import org.apache.gearpump.util.{Constants, Util}
+
+case class FilePath(path: String)
+
+/**
+ * JarStoreService is used to manage the upload/download of binary files,
+ * like user submitted application jars.
+ */
+trait JarStoreService {
+  /**
+   * The scheme of the JarStoreService.
+   * Like "hdfs" for HDFS file system, and "file" for a local
+   * file system.
+   */
+  val scheme: String
+
+  /**
+   * Init the Jar Store.
+   */
+  def init(config: Config, system: ActorSystem): Unit
+
+  /**
+   * Copies the local file to the remote JarStore; called from the client side.
+   *
+   * @param localFile The local file
+   */
+  def copyFromLocal(localFile: File): FilePath
+
+  /**
+   * Copies the remote file to the local file system; called from the client side.
+   *
+   * @param localFile The destination of file path
+   * @param remotePath The remote file path from JarStore
+   */
+  def copyToLocalFile(localFile: File, remotePath: FilePath): Unit
+}
+
+object JarStoreService {
+
+  /**
+   * Gets an active JarStoreService for the scheme of the configured jar store root path.
+   *
+   * Please see config [[org.apache.gearpump.util.Constants#GEARPUMP_APP_JAR_STORE_ROOT_PATH]] for
+   * more information.
+   *
+   * @throws NoSuchElementException if no JarStoreService found via ServiceLoader handles
+   *                                that scheme
+   */
+  def get(config: Config): JarStoreService = {
+    val jarStoreRootPath = config.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH)
+    get(jarStoreRootPath)
+  }
+
+  // Implementations discovered through java.util.ServiceLoader, loaded once on first use.
+  private lazy val jarstoreServices: List[JarStoreService] = {
+    ServiceLoader.load(classOf[JarStoreService]).asScala.toList
+  }
+
+  private def get(rootPath: String): JarStoreService = {
+    val scheme = new URI(Util.resolvePath(rootPath)).getScheme
+    // Same exception type as the former Option.get, but with a message that explains
+    // which scheme was requested and which ones are available.
+    jarstoreServices.find(_.scheme == scheme).getOrElse {
+      throw new NoSuchElementException(
+        s"No JarStoreService found for scheme '$scheme' (jar store root path: $rootPath); " +
+          s"available schemes: ${jarstoreServices.map(_.scheme).mkString(", ")}")
+    }
+  }
+}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/metrics/AkkaReporter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/metrics/AkkaReporter.scala b/core/src/main/scala/org/apache/gearpump/metrics/AkkaReporter.scala new file mode 100644 index 0000000..7010048 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/metrics/AkkaReporter.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
+
+package org.apache.gearpump.metrics
+
+import scala.collection.JavaConverters._
+
+import akka.actor.{ActorRef, ActorSystem}
+
+import io.gearpump.codahale.metrics.{Gauge => CodaGauge, MetricRegistry}
+import org.apache.gearpump.metrics.Metrics.{Counter => CounterData, Gauge => GaugeData, Histogram => HistogramData, Meter => MeterData}
+import org.apache.gearpump.metrics.MetricsReporterService.ReportTo
+import org.apache.gearpump.util.LogUtil
+
+/**
+ * A reporter that sends the current value of every metric in `registry` to an actor,
+ * one message per metric, each time [[report]] is called.
+ */
+class AkkaReporter(
+    system: ActorSystem,
+    registry: MetricRegistry)
+  extends ReportTo {
+  private val LOG = LogUtil.getLogger(getClass)
+  LOG.info("Start Metrics AkkaReporter")
+
+  /** Sends the counters, histograms, meters and numeric gauges of `registry` to `to`. */
+  override def report(to: ActorRef): Unit = {
+    val counters = registry.getCounters()
+    val histograms = registry.getHistograms()
+    val meters = registry.getMeters()
+    val gauges = registry.getGauges()
+
+    counters.entrySet().asScala.foreach { pair =>
+      to ! CounterData(pair.getKey, pair.getValue.getCount)
+    }
+
+    histograms.entrySet().asScala.foreach { pair =>
+      val key = pair.getKey
+      val value = pair.getValue
+      val s = value.getSnapshot
+      to ! HistogramData(
+        key, s.getMean, s.getStdDev, s.getMedian,
+        s.get95thPercentile, s.get99thPercentile, s.get999thPercentile)
+    }
+
+    meters.entrySet().asScala.foreach { pair =>
+      val key = pair.getKey
+      val value = pair.getValue
+      to ! MeterData(key,
+        value.getCount,
+        value.getMeanRate,
+        value.getOneMinuteRate,
+        getRateUnit)
+    }
+
+    gauges.entrySet().asScala.foreach { kv =>
+      val gauge: CodaGauge[_] = kv.getValue
+      // GaugeData carries a Long, so only numeric gauges can be reported. A non-numeric or
+      // null value is skipped instead of throwing ClassCastException/NPE, which would abort
+      // the remainder of this report.
+      gauge.getValue match {
+        case number: Number => to ! GaugeData(kv.getKey, number.longValue())
+        case other => LOG.debug(s"Skipping non-numeric gauge ${kv.getKey} with value $other")
+      }
+    }
+  }
+
+  private def getRateUnit: String = {
+    "events/s"
+  }
+}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/metrics/Counter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/metrics/Counter.scala b/core/src/main/scala/org/apache/gearpump/metrics/Counter.scala new file mode 100644 index 0000000..a6ce799 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/metrics/Counter.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
+
+package org.apache.gearpump.metrics
+
+import io.gearpump.codahale.metrics.{Counter => CodaHaleCounter}
+
+/**
+ * Sampling wrapper around a Codahale counter.
+ *
+ * Increments are accumulated locally and flushed to `counter` on every `sampleRate`-th call
+ * to `inc`; increments made since the last flush are not yet visible in `counter`.
+ *
+ * @param name the metric name
+ * @param counter the underlying counter, or `null` to disable reporting
+ * @param sampleRate flush on every `sampleRate`-th call; must be positive
+ * @see io.gearpump.codahale.metrics.Counter
+ */
+class Counter(val name: String, counter: CodaHaleCounter, sampleRate: Int = 1) {
+  // A zero rate would make `sampleCount % sampleRate` throw ArithmeticException on the first
+  // inc(), and a negative rate is a configuration mistake: fail fast at construction instead.
+  require(sampleRate > 0, s"sampleRate must be positive, but was $sampleRate")
+
+  private var sampleCount = 0L
+  private var toBeIncremented = 0L
+
+  def inc(): Unit = inc(1)
+
+  def inc(n: Long): Unit = {
+    toBeIncremented += n
+    sampleCount += 1
+    if (null != counter && sampleCount % sampleRate == 0) {
+      counter.inc(toBeIncremented)
+      toBeIncremented = 0
+    }
+  }
+}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/metrics/Histogram.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/metrics/Histogram.scala b/core/src/main/scala/org/apache/gearpump/metrics/Histogram.scala new file mode 100644 index 0000000..7d7e19b --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/metrics/Histogram.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
+
+package org.apache.gearpump.metrics
+
+import io.gearpump.codahale.metrics.{Histogram => CodaHaleHistogram}
+
+/**
+ * Sampling wrapper around a Codahale histogram: only every `sampleRate`-th value passed to
+ * `update` is recorded.
+ *
+ * @param name the metric name
+ * @param histogram the underlying histogram, or `null` to disable recording
+ * @param sampleRate record one of every `sampleRate` updates; must be positive
+ * @see io.gearpump.codahale.metrics.Histogram
+ */
+class Histogram(val name: String, histogram: CodaHaleHistogram, sampleRate: Int = 1) {
+  // A zero rate would make `sampleCount % sampleRate` throw ArithmeticException on the first
+  // update(), and a negative rate is a configuration mistake: fail fast at construction instead.
+  require(sampleRate > 0, s"sampleRate must be positive, but was $sampleRate")
+
+  private var sampleCount = 0L
+
+  def update(value: Long): Unit = {
+    sampleCount += 1
+    if (null != histogram && sampleCount % sampleRate == 0) {
+      histogram.update(value)
+    }
+  }
+
+  /**
+   * Mean of the recorded values. Returns 0.0 when recording is disabled (`histogram` is null),
+   * the same value an empty Codahale snapshot reports, instead of throwing an NPE.
+   */
+  def getMean(): Double = {
+    if (null == histogram) 0.0 else histogram.getSnapshot.getMean
+  }
+
+  /** Standard deviation of the recorded values; 0.0 when `histogram` is null. */
+  def getStdDev(): Double = {
+    if (null == histogram) 0.0 else histogram.getSnapshot.getStdDev
+  }
+}
\ No newline at end of file
