http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/appmaster/ApplicationState.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/ApplicationState.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/ApplicationState.scala deleted file mode 100644 index 3b967f4..0000000 --- a/core/src/main/scala/io/gearpump/cluster/appmaster/ApplicationState.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster.appmaster - -import io.gearpump.cluster.{AppDescription, AppJar} - -/** - * This state for single application, it is be distributed across the masters. 
- */ -case class ApplicationState( - appId: Int, appName: String, attemptId: Int, app: AppDescription, jar: Option[AppJar], - username: String, state: Any) extends Serializable { - - override def equals(other: Any): Boolean = { - other match { - case that: ApplicationState => - if (appId == that.appId && attemptId == that.attemptId) { - true - } else { - false - } - case _ => - false - } - } - - override def hashCode: Int = { - import akka.routing.MurmurHash._ - extendHash(appId, attemptId, startMagicA, startMagicB) - } -}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala deleted file mode 100644 index 6fcb5e7..0000000 --- a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster.appmaster - -import akka.actor.{ActorRef, Address, PoisonPill} - -import io.gearpump.cluster.scheduler.Resource -import io.gearpump.cluster.worker.WorkerId -import io.gearpump.util.ActorSystemBooter.BindLifeCycle - -case class WorkerInfo(workerId: WorkerId, ref: ActorRef) - -/** - * Configurations to start an executor system on remote machine - * - * @param address Remote address where we start an Actor System. - */ -case class ExecutorSystem(executorSystemId: Int, address: Address, daemon: - ActorRef, resource: Resource, worker: WorkerInfo) { - def bindLifeCycleWith(actor: ActorRef): Unit = { - daemon ! 
BindLifeCycle(actor) - } - - def shutdown(): Unit = { - daemon ! PoisonPill - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala deleted file mode 100644 index 78432f4..0000000 --- a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncher.scala +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.cluster.appmaster - -import scala.concurrent.duration._ - -import akka.actor._ -import org.slf4j.Logger - -import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor -import io.gearpump.cluster.ExecutorJVMConfig -import io.gearpump.cluster.WorkerToAppMaster._ -import io.gearpump.cluster.appmaster.ExecutorSystemLauncher._ -import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, Session} -import io.gearpump.cluster.scheduler.Resource -import io.gearpump.util.ActorSystemBooter.{ActorSystemRegistered, RegisterActorSystem} -import io.gearpump.util.{ActorSystemBooter, ActorUtil, Constants, LogUtil} - -/** - * This launches single executor system on target worker. - * - * Please use ExecutorSystemLauncher.props() to construct this actor - * - * @param session The session that request to launch executor system - */ -private[appmaster] -class ExecutorSystemLauncher(appId: Int, session: Session) extends Actor { - - private val LOG: Logger = LogUtil.getLogger(getClass) - - val scheduler = context.system.scheduler - implicit val executionContext = context.dispatcher - - private val systemConfig = context.system.settings.config - val timeoutSetting = systemConfig.getInt(Constants.GEARPUMP_START_EXECUTOR_SYSTEM_TIMEOUT_MS) - - val timeout = scheduler.scheduleOnce(timeoutSetting.milliseconds, - self, LaunchExecutorSystemTimeout(session)) - - def receive: Receive = waitForLaunchCommand - - def waitForLaunchCommand: Receive = { - case LaunchExecutorSystem(worker, executorSystemId, resource) => - val launcherPath = ActorUtil.getFullPath(context.system, self.path) - val jvmConfig = Option(session.executorSystemJvmConfig) - .map(getExecutorJvmConfig(_, s"app${appId}system${executorSystemId}", launcherPath)).orNull - - val launch = LaunchExecutor(appId, executorSystemId, resource, jvmConfig) - LOG.info(s"Launching Executor ...appId: $appId, executorSystemId: $executorSystemId, " + - s"slots: ${resource.slots} on worker 
$worker") - - worker.ref ! launch - context.become(waitForActorSystemToStart(sender, launch, worker, executorSystemId)) - } - - def waitForActorSystemToStart( - replyTo: ActorRef, launch: LaunchExecutor, worker: WorkerInfo, executorSystemId: Int) - : Receive = { - case RegisterActorSystem(systemPath) => - import launch._ - timeout.cancel() - LOG.info(s"Received RegisterActorSystem $systemPath for session ${session.requestor}") - sender ! ActorSystemRegistered(worker.ref) - val system = - ExecutorSystem(executorId, AddressFromURIString(systemPath), sender, resource, worker) - replyTo ! LaunchExecutorSystemSuccess(system, session) - context.stop(self) - case reject@ExecutorLaunchRejected(reason, ex) => - LOG.error(s"Executor Launch ${launch.resource} failed reason: $reason", ex) - replyTo ! LaunchExecutorSystemRejected(launch.resource, reason, session) - context.stop(self) - case timeout: LaunchExecutorSystemTimeout => - LOG.error(s"The Executor ActorSystem $executorSystemId has not been started in time") - replyTo ! 
timeout - context.stop(self) - } -} - -private[appmaster] -object ExecutorSystemLauncher { - - case class LaunchExecutorSystem(worker: WorkerInfo, systemId: Int, resource: Resource) - - case class LaunchExecutorSystemSuccess(system: ExecutorSystem, session: Session) - - case class LaunchExecutorSystemRejected(resource: Resource, reason: Any, session: Session) - - case class LaunchExecutorSystemTimeout(session: Session) - - private def getExecutorJvmConfig(conf: ExecutorSystemJvmConfig, systemName: String, - reportBack: String): ExecutorJVMConfig = { - Option(conf).map { conf => - import conf._ - ExecutorJVMConfig(classPath, jvmArguments, classOf[ActorSystemBooter].getName, - Array(systemName, reportBack), jar, username, executorAkkaConfig) - }.getOrElse(null) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala deleted file mode 100644 index c5ec600..0000000 --- a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster.appmaster - -import scala.concurrent.duration._ - -import akka.actor._ -import com.typesafe.config.Config - -import io.gearpump.cluster.AppMasterToMaster.RequestResource -import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated -import io.gearpump.cluster._ -import io.gearpump.cluster.appmaster.ExecutorSystemLauncher._ -import io.gearpump.cluster.appmaster.ExecutorSystemScheduler._ -import io.gearpump.cluster.scheduler.{ResourceAllocation, ResourceRequest} -import io.gearpump.cluster.worker.WorkerId -import io.gearpump.util.{Constants, LogUtil} - -/** - * ExecutorSystem is also a type of resource, this class schedules ExecutorSystem for AppMaster. - * AppMaster can use this class to directly request a live executor actor systems. The communication - * in the background with Master and Worker is hidden from AppMaster. 
- * - * Please use ExecutorSystemScheduler.props() to construct this actor - */ -private[appmaster] -class ExecutorSystemScheduler(appId: Int, masterProxy: ActorRef, - executorSystemLauncher: (Int, Session) => Props) extends Actor { - - private val LOG = LogUtil.getLogger(getClass, app = appId) - implicit val timeout = Constants.FUTURE_TIMEOUT - implicit val actorSystem = context.system - var currentSystemId = 0 - - var resourceAgents = Map.empty[Session, ActorRef] - - def receive: Receive = { - clientCommands orElse resourceAllocationMessageHandler orElse executorSystemMessageHandler - } - - def clientCommands: Receive = { - case start: StartExecutorSystems => - LOG.info(s"starting executor systems (ExecutorSystemConfig(${start.executorSystemConfig}), " + - s"Resources(${start.resources.mkString(",")}))") - val requestor = sender() - val executorSystemConfig = start.executorSystemConfig - val session = Session(requestor, executorSystemConfig) - val agent = resourceAgents.getOrElse(session, - context.actorOf(Props(new ResourceAgent(masterProxy, session)))) - resourceAgents = resourceAgents + (session -> agent) - - start.resources.foreach { resource => - agent ! RequestResource(appId, resource) - } - - case StopExecutorSystem(executorSystem) => - executorSystem.shutdown - } - - def resourceAllocationMessageHandler: Receive = { - case ResourceAllocatedForSession(allocations, session) => - if (isSessionAlive(session)) { - allocations.foreach { resourceAllocation => - val ResourceAllocation(resource, worker, workerId) = resourceAllocation - - val launcher = context.actorOf(executorSystemLauncher(appId, session)) - launcher ! LaunchExecutorSystem(WorkerInfo(workerId, worker), currentSystemId, resource) - currentSystemId = currentSystemId + 1 - } - } - case ResourceAllocationTimeOut(session) => - if (isSessionAlive(session)) { - resourceAgents = resourceAgents - session - session.requestor ! 
StartExecutorSystemTimeout - } - } - - def executorSystemMessageHandler: Receive = { - case LaunchExecutorSystemSuccess(system, session) => - if (isSessionAlive(session)) { - LOG.info("LaunchExecutorSystemSuccess, send back to " + session.requestor) - system.bindLifeCycleWith(self) - session.requestor ! ExecutorSystemStarted(system, session.executorSystemJvmConfig.jar) - } else { - LOG.error("We get a ExecutorSystem back, but resource requestor is no longer valid. " + - "Will shutdown the allocated system") - system.shutdown - } - case LaunchExecutorSystemTimeout(session) => - if (isSessionAlive(session)) { - LOG.error(s"Failed to launch executor system for ${session.requestor} due to timeout") - session.requestor ! StartExecutorSystemTimeout - } - - case LaunchExecutorSystemRejected(resource, reason, session) => - if (isSessionAlive(session)) { - LOG.error(s"Failed to launch executor system, due to $reason, " + - s"will ask master to allocate new resources $resource") - resourceAgents.get(session).map { resourceAgent: ActorRef => - resourceAgent ! RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified)) - } - } - } - - private def isSessionAlive(session: Session): Boolean = { - Option(session).flatMap(session => resourceAgents.get(session)).nonEmpty - } -} - -object ExecutorSystemScheduler { - - case class StartExecutorSystems( - resources: Array[ResourceRequest], executorSystemConfig: ExecutorSystemJvmConfig) - - case class ExecutorSystemStarted(system: ExecutorSystem, boundedJar: Option[AppJar]) - - case class StopExecutorSystem(system: ExecutorSystem) - - case object StartExecutorSystemTimeout - - case class ExecutorSystemJvmConfig(classPath: Array[String], jvmArguments: Array[String], - jar: Option[AppJar], username: String, executorAkkaConfig: Config = null) - - /** - * For each client which ask for an executor system, the scheduler will create a session for it. 
- * - */ - private[appmaster] - case class Session(requestor: ActorRef, executorSystemJvmConfig: ExecutorSystemJvmConfig) - - /** - * This is a agent for session to request resource - * - * @param session the original requester of the resource requests - */ - private[appmaster] - class ResourceAgent(master: ActorRef, session: Session) extends Actor { - private var resourceRequestor: ActorRef = null - var timeOutClock: Cancellable = null - private var unallocatedResource: Int = 0 - - import context.dispatcher - - import io.gearpump.util.Constants._ - - val timeout = context.system.settings.config.getInt(GEARPUMP_RESOURCE_ALLOCATION_TIMEOUT) - - def receive: Receive = { - case request: RequestResource => - unallocatedResource += request.request.resource.slots - Option(timeOutClock).map(_.cancel) - timeOutClock = context.system.scheduler.scheduleOnce( - timeout.seconds, self, ResourceAllocationTimeOut(session)) - resourceRequestor = sender - master ! request - case ResourceAllocated(allocations) => - unallocatedResource -= allocations.map(_.resource.slots).sum - resourceRequestor forward ResourceAllocatedForSession(allocations, session) - case timeout: ResourceAllocationTimeOut => - if (unallocatedResource > 0) { - resourceRequestor ! 
ResourceAllocationTimeOut(session) - // We will not receive any ResourceAllocation after timeout - context.stop(self) - } - } - } - - private[ExecutorSystemScheduler] - case class ResourceAllocatedForSession(resource: Array[ResourceAllocation], session: Session) - - private[ExecutorSystemScheduler] - case class ResourceAllocationTimeOut(session: Session) - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeper.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeper.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeper.scala deleted file mode 100644 index f8c8503..0000000 --- a/core/src/main/scala/io/gearpump/cluster/appmaster/MasterConnectionKeeper.scala +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.cluster.appmaster - -import java.util.concurrent.TimeUnit -import scala.concurrent.duration.FiniteDuration - -import akka.actor._ - -import io.gearpump.cluster.AppMasterToMaster.RegisterAppMaster -import io.gearpump.cluster.MasterToAppMaster.AppMasterRegistered -import io.gearpump.cluster.appmaster.MasterConnectionKeeper.AppMasterRegisterTimeout -import io.gearpump.cluster.appmaster.MasterConnectionKeeper.MasterConnectionStatus.{MasterConnected, MasterStopped} -import io.gearpump.cluster.master.MasterProxy.{MasterRestarted, WatchMaster} -import io.gearpump.util.LogUtil - -/** - * Watches the liveness of Master. - * - * When Master is restarted, it sends RegisterAppMaster to the new Master instance. - * If Master is stopped, it sends the MasterConnectionStatus to listener - * - * please use MasterConnectionKeeper.props() to construct this actor - */ -private[appmaster] -class MasterConnectionKeeper( - register: RegisterAppMaster, masterProxy: ActorRef, masterStatusListener: ActorRef) - extends Actor { - - import context.dispatcher - - private val LOG = LogUtil.getLogger(getClass) - private var master: ActorRef = null - - // Subscribe self to masterProxy, - masterProxy ! WatchMaster(self) - - def registerAppMaster: Cancellable = { - masterProxy ! register - context.system.scheduler.scheduleOnce(FiniteDuration(30, TimeUnit.SECONDS), - self, AppMasterRegisterTimeout) - } - - context.become(waitMasterToConfirm(registerAppMaster)) - - def waitMasterToConfirm(cancelRegister: Cancellable): Receive = { - case AppMasterRegistered(appId) => - cancelRegister.cancel() - masterStatusListener ! MasterConnected - context.become(masterLivenessListener) - case AppMasterRegisterTimeout => - cancelRegister.cancel() - masterStatusListener ! 
MasterStopped - context.stop(self) - } - - def masterLivenessListener: Receive = { - case MasterRestarted => - LOG.info("Master restarted, re-registering appmaster....") - context.become(waitMasterToConfirm(registerAppMaster)) - case MasterStopped => - LOG.info("Master is dead, killing this AppMaster....") - masterStatusListener ! MasterStopped - context.stop(self) - } - - def receive: Receive = null -} - -private[appmaster] object MasterConnectionKeeper { - - case object AppMasterRegisterTimeout - - object MasterConnectionStatus { - - case object MasterConnected - - case object MasterStopped - - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala b/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala deleted file mode 100644 index 41c01d8..0000000 --- a/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.cluster.client - -import java.util.concurrent.TimeUnit -import scala.collection.JavaConverters._ -import scala.concurrent.duration.Duration -import scala.concurrent.{Await, Future} -import scala.util.Try - -import akka.actor.{ActorRef, ActorSystem} -import akka.util.Timeout -import com.typesafe.config.{Config, ConfigValueFactory} -import org.slf4j.Logger - -import io.gearpump.cluster.MasterToAppMaster.{AppMastersData, ReplayFromTimestampWindowTrailingEdge} -import io.gearpump.cluster.MasterToClient.ReplayApplicationResult -import io.gearpump.cluster._ -import io.gearpump.cluster.master.MasterProxy -import io.gearpump.jarstore.JarStoreService -import io.gearpump.util.Constants._ -import io.gearpump.util.{ActorUtil, Constants, LogUtil, Util} - -/** - * ClientContext is a user facing util to submit/manage an application. - * - * TODO: add interface to query master here - */ -class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) { - - def this(system: ActorSystem) = { - this(system.settings.config, system, null) - } - - def this(config: Config) = { - this(config, null, null) - } - - private val LOG: Logger = LogUtil.getLogger(getClass) - private implicit val timeout = Timeout(5, TimeUnit.SECONDS) - - implicit val system = Option(sys).getOrElse(ActorSystem(s"client${Util.randInt()}", config)) - LOG.info(s"Starting system ${system.name}") - val shouldCleanupSystem = Option(sys).isEmpty - - private val jarStoreService = JarStoreService.get(config) - jarStoreService.init(config, system) - - private lazy val master: ActorRef = { - val masters = config.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala - .flatMap(Util.parseHostList) - val master = Option(_master).getOrElse(system.actorOf(MasterProxy.props(masters), - s"masterproxy${system.name}")) - LOG.info(s"Creating master proxy ${master} for master list: $masters") - master - } - - /** - * Submits an application with default jar setting. 
Use java property "gearpump.app.jar" if - * defined. Otherwise, it assumes the jar is on the target runtime classpath, thus will - * not send the jar across the wire. - */ - def submit(app: Application): Int = { - submit(app, System.getProperty(GEARPUMP_APP_JAR)) - } - - def submit(app: Application, jar: String): Int = { - submit(app, jar, getExecutorNum()) - } - - def submit(app: Application, jar: String, executorNum: Int): Int = { - val client = getMasterClient - val appName = checkAndAddNamePrefix(app.name, System.getProperty(GEARPUMP_APP_NAME_PREFIX)) - val submissionConfig = getSubmissionConfig(config) - .withValue(APPLICATION_EXECUTOR_NUMBER, ConfigValueFactory.fromAnyRef(executorNum)) - val appDescription = - AppDescription(appName, app.appMaster.getName, app.userConfig, submissionConfig) - val appJar = Option(jar).map(loadFile) - client.submitApplication(appDescription, appJar) - } - - private def getExecutorNum(): Int = { - Try(System.getProperty(APPLICATION_EXECUTOR_NUMBER).toInt).getOrElse(1) - } - - private def getSubmissionConfig(config: Config): Config = { - ClusterConfig.filterOutDefaultConfig(config) - } - - def replayFromTimestampWindowTrailingEdge(appId: Int): ReplayApplicationResult = { - import scala.concurrent.ExecutionContext.Implicits.global - val result = Await.result( - ActorUtil.askAppMaster[ReplayApplicationResult](master, - appId, ReplayFromTimestampWindowTrailingEdge(appId)), Duration.Inf) - result - } - - def askAppMaster[T](appId: Int, msg: Any): Future[T] = { - import scala.concurrent.ExecutionContext.Implicits.global - ActorUtil.askAppMaster[T](master, appId, msg) - } - - def listApps: AppMastersData = { - val client = getMasterClient - client.listApplications - } - - def shutdown(appId: Int): Unit = { - val client = getMasterClient - client.shutdownApplication(appId) - } - - def resolveAppID(appId: Int): ActorRef = { - val client = getMasterClient - client.resolveAppId(appId) - } - - def close(): Unit = { - if (shouldCleanupSystem) 
{ - LOG.info(s"Shutting down system ${system.name}") - system.terminate() - } - } - - private def loadFile(jarPath: String): AppJar = { - val jarFile = new java.io.File(jarPath) - val path = jarStoreService.copyFromLocal(jarFile) - AppJar(jarFile.getName, path) - } - - private def checkAndAddNamePrefix(appName: String, namePrefix: String): String = { - val fullName = if (namePrefix != null && namePrefix != "") { - namePrefix + "_" + appName - } else { - appName - } - if (!Util.validApplicationName(fullName)) { - close() - val error = s"The application name $appName is not a proper name. An app name can " + - "be a sequence of letters, numbers or underscore character \"_\"" - throw new Exception(error) - } - fullName - } - - private def getMasterClient: MasterClient = { - val timeout = Try(config.getInt(Constants.GEARPUMP_MASTERCLIENT_TIMEOUT)).getOrElse(90) - new MasterClient(master, akka.util.Timeout(timeout, TimeUnit.SECONDS)) - } -} - -object ClientContext { - - def apply(): ClientContext = new ClientContext(ClusterConfig.default(), null, null) - - def apply(system: ActorSystem): ClientContext = { - new ClientContext(ClusterConfig.default(), system, null) - } - - def apply(system: ActorSystem, master: ActorRef): ClientContext = { - new ClientContext(ClusterConfig.default(), system, master) - } - - def apply(config: Config): ClientContext = new ClientContext(config, null, null) - - def apply(config: Config, system: ActorSystem, master: ActorRef): ClientContext = { - new ClientContext(config, system, master) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala b/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala deleted file mode 100644 index 9edaf46..0000000 --- 
a/core/src/main/scala/io/gearpump/cluster/client/MasterClient.scala +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster.client - -import scala.concurrent.duration.Duration -import scala.concurrent.{Await, Future} -import scala.util.{Failure, Success} - -import akka.actor.ActorRef -import akka.pattern.ask -import akka.util.Timeout - -import io.gearpump.cluster.ClientToMaster._ -import io.gearpump.cluster.MasterToAppMaster.{AppMastersData, AppMastersDataRequest} -import io.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult, SubmitApplicationResult} -import io.gearpump.cluster.{AppDescription, AppJar} - -/** - * Client to inter-operate with Master node. - * - * NOTE: Stateless, thread safe - */ -class MasterClient(master: ActorRef, timeout: Timeout) { - implicit val masterClientTimeout = timeout - - def submitApplication(app: AppDescription, appJar: Option[AppJar]): Int = { - val result = Await.result( - (master ? 
SubmitApplication(app, appJar)).asInstanceOf[Future[SubmitApplicationResult]], - Duration.Inf) - val appId = result.appId match { - case Success(appId) => - // scalastyle:off println - Console.println(s"Submit application succeed. The application id is $appId") - // scalastyle:on println - appId - case Failure(ex) => throw ex - } - appId - } - - def resolveAppId(appId: Int): ActorRef = { - val result = Await.result( - (master ? ResolveAppId(appId)).asInstanceOf[Future[ResolveAppIdResult]], Duration.Inf) - result.appMaster match { - case Success(appMaster) => appMaster - case Failure(ex) => throw ex - } - } - - def shutdownApplication(appId: Int): Unit = { - val result = Await.result( - (master ? ShutdownApplication(appId)).asInstanceOf[Future[ShutdownApplicationResult]], - Duration.Inf) - result.appId match { - case Success(_) => - case Failure(ex) => throw ex - } - } - - def listApplications: AppMastersData = { - val result = Await.result( - (master ? AppMastersDataRequest).asInstanceOf[Future[AppMastersData]], Duration.Inf) - result - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/main/ArgumentsParser.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/main/ArgumentsParser.scala b/core/src/main/scala/io/gearpump/cluster/main/ArgumentsParser.scala deleted file mode 100644 index 209f831..0000000 --- a/core/src/main/scala/io/gearpump/cluster/main/ArgumentsParser.scala +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster.main - -import io.gearpump.cluster.main.ArgumentsParser.Syntax - -case class CLIOption[+T]( - description: String = "", required: Boolean = false, defaultValue: Option[T] = None) - -class ParseResult(optionMap: Map[String, String], remainArguments: Array[String]) { - def getInt(key: String): Int = optionMap.get(key).get.toInt - - def getString(key: String): String = optionMap.get(key).get - - def getBoolean(key: String): Boolean = optionMap.get(key).get.toBoolean - - def exists(key: String): Boolean = !(optionMap.getOrElse(key, "").isEmpty) - - def remainArgs: Array[String] = this.remainArguments -} - -/** - * Parser for command line arguments - * - * Grammar: -option1 value1 -option2 value3 -flag1 -flag2 remainArg1 remainArg2... 
- */ -trait ArgumentsParser { - - val ignoreUnknownArgument = false - - // scalastyle:off println - def help(): Unit = { - Console.println(s"\nHelp: $description") - var usage = List.empty[String] - options.map(kv => if (kv._2.required) { - usage = usage :+ s"-${kv._1} (required:${kv._2.required})${kv._2.description}" - } else { - usage = usage :+ s"-${kv._1} (required:${kv._2.required}, " + - s"default:${kv._2.defaultValue.getOrElse("")})${kv._2.description}" - }) - usage :+= remainArgs.map(k => s"<$k>").mkString(" ") - usage.foreach(Console.println(_)) - } - // scalastyle:on println - - def parse(args: Array[String]): ParseResult = { - val syntax = Syntax(options, remainArgs, ignoreUnknownArgument) - ArgumentsParser.parse(syntax, args) - } - - val description: String = "" - val options: Array[(String, CLIOption[Any])] = Array.empty[(String, CLIOption[Any])] - val remainArgs: Array[String] = Array.empty[String] -} - -object ArgumentsParser { - - case class Syntax( - val options: Array[(String, CLIOption[Any])], val remainArgs: Array[String], - val ignoreUnknownArgument: Boolean) - - def parse(syntax: Syntax, args: Array[String]): ParseResult = { - import syntax.{ignoreUnknownArgument, options, remainArgs} - var config = Map.empty[String, String] - var remain = Array.empty[String] - - def doParse(argument: List[String]): Unit = { - argument match { - case Nil => Unit // true if everything processed successfully - - case key :: value :: rest if key.startsWith("-") && !value.startsWith("-") => - val fixedKey = key.substring(1) - if (!options.map(_._1).contains(fixedKey)) { - if (!ignoreUnknownArgument) { - throw new Exception(s"found unknown option $fixedKey") - } else { - remain ++= Array(key, value) - } - } else { - config += fixedKey -> value - } - doParse(rest) - - case key :: rest if key.startsWith("-") => - val fixedKey = key.substring(1) - if (!options.map(_._1).contains(fixedKey)) { - throw new Exception(s"found unknown option $fixedKey") - } else { - config 
+= fixedKey -> "true" - } - doParse(rest) - - case value :: rest => - // scalastyle:off println - Console.err.println(s"Warning: get unknown argument $value, maybe it is a main class") - // scalastyle:on println - remain ++= value :: rest - doParse(Nil) - } - } - doParse(args.toList) - - options.foreach { pair => - val (key, option) = pair - if (!config.contains(key) && !option.required) { - config += key -> option.defaultValue.getOrElse("").toString - } - } - - options.foreach { pair => - val (key, value) = pair - if (config.get(key).isEmpty) { - throw new Exception(s"Missing option ${key}...") - } - } - - if (remain.length < remainArgs.length) { - throw new Exception(s"Missing arguments ...") - } - - new ParseResult(config, remain) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala b/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala deleted file mode 100644 index fb3e5c4..0000000 --- a/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster.master - -import java.util.concurrent.{TimeUnit, TimeoutException} -import scala.collection.JavaConverters._ -import scala.concurrent.duration.Duration -import scala.util.{Failure, Success} - -import akka.actor.{Actor, ActorRef, Props, _} -import com.typesafe.config.Config -import org.slf4j.Logger - -import io.gearpump.cluster.AppMasterToMaster.RequestResource -import io.gearpump.cluster.AppMasterToWorker.{LaunchExecutor, ShutdownExecutor} -import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated -import io.gearpump.cluster.MasterToClient.SubmitApplicationResult -import io.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected -import io.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, AppMasterRuntimeInfo, WorkerInfo} -import io.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest} -import io.gearpump.cluster.worker.WorkerId -import io.gearpump.cluster.{AppDescription, AppJar, _} -import io.gearpump.transport.HostPort -import io.gearpump.util.ActorSystemBooter._ -import io.gearpump.util.Constants._ -import io.gearpump.util.{ActorSystemBooter, ActorUtil, LogUtil, Util} - -/** - * - * AppMasterLauncher is a child Actor of AppManager, it is responsible - * to launch the AppMaster on the cluster. 
- */ -class AppMasterLauncher( - appId: Int, executorId: Int, app: AppDescription, - jar: Option[AppJar], username: String, master: ActorRef, client: Option[ActorRef]) - extends Actor { - private val LOG: Logger = LogUtil.getLogger(getClass, app = appId) - - val scheduler = context.system.scheduler - val systemConfig = context.system.settings.config - val TIMEOUT = Duration(15, TimeUnit.SECONDS) - - val appMasterAkkaConfig: Config = app.clusterConfig - - LOG.info(s"Ask Master resource to start AppMaster $appId...") - master ! RequestResource(appId, ResourceRequest(Resource(1), WorkerId.unspecified)) - - def receive: Receive = waitForResourceAllocation - - def waitForResourceAllocation: Receive = { - case ResourceAllocated(allocations) => - - val ResourceAllocation(resource, worker, workerId) = allocations(0) - LOG.info(s"Resource allocated for appMaster $appId on worker ${workerId}(${worker.path})") - - val submissionTime = System.currentTimeMillis() - - val appMasterInfo = AppMasterRuntimeInfo(appId, app.name, worker, username, - submissionTime, config = appMasterAkkaConfig) - val workerInfo = WorkerInfo(workerId, worker) - val appMasterContext = - AppMasterContext(appId, username, resource, workerInfo, jar, null, appMasterInfo) - LOG.info(s"Try to launch a executor for AppMaster on worker ${workerId} for app $appId") - val name = ActorUtil.actorNameForExecutor(appId, executorId) - val selfPath = ActorUtil.getFullPath(context.system, self.path) - - val jvmSetting = - Util.resolveJvmSetting(appMasterAkkaConfig.withFallback(systemConfig)).appMater - val executorJVM = ExecutorJVMConfig(jvmSetting.classPath, jvmSetting.vmargs, - classOf[ActorSystemBooter].getName, Array(name, selfPath), jar, - username, appMasterAkkaConfig) - - worker ! 
LaunchExecutor(appId, executorId, resource, executorJVM) - context.become(waitForActorSystemToStart(worker, appMasterContext, app.userConfig, resource)) - } - - def waitForActorSystemToStart( - worker: ActorRef, appContext: AppMasterContext, user: UserConfig, resource: Resource) - : Receive = { - case ExecutorLaunchRejected(reason, ex) => - LOG.error(s"Executor Launch failed reason: $reason", ex) - LOG.info(s"reallocate resource $resource to start appmaster") - master ! RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified)) - context.become(waitForResourceAllocation) - case RegisterActorSystem(systemPath) => - LOG.info(s"Received RegisterActorSystem $systemPath for AppMaster") - sender ! ActorSystemRegistered(worker) - - val masterAddress = systemConfig.getStringList(GEARPUMP_CLUSTER_MASTERS) - .asScala.map(HostPort(_)).map(ActorUtil.getMasterActorPath) - sender ! CreateActor( - AppMasterRuntimeEnvironment.props(masterAddress, app, appContext), s"appdaemon$appId") - - import context.dispatcher - val appMasterTimeout = scheduler.scheduleOnce(TIMEOUT, self, - CreateActorFailed(app.appMaster, new TimeoutException)) - context.become(waitForAppMasterToStart(worker, appMasterTimeout)) - } - - def waitForAppMasterToStart(worker: ActorRef, cancel: Cancellable): Receive = { - case ActorCreated(appMaster, _) => - cancel.cancel() - sender ! BindLifeCycle(appMaster) - LOG.info(s"AppMaster is created, mission complete...") - replyToClient(SubmitApplicationResult(Success(appId))) - context.stop(self) - case CreateActorFailed(name, reason) => - cancel.cancel() - worker ! 
ShutdownExecutor(appId, executorId, reason.getMessage) - replyToClient(SubmitApplicationResult(Failure(reason))) - context.stop(self) - } - - def replyToClient(result: SubmitApplicationResult): Unit = { - if (client.isDefined) { - client.get.tell(result, master) - } - } -} - -object AppMasterLauncher extends AppMasterLauncherFactory { - def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar], - username: String, master: ActorRef, client: Option[ActorRef]): Props = { - Props(new AppMasterLauncher(appId, executorId, app, jar, username, master, client)) - } -} - -trait AppMasterLauncherFactory { - def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar], - username: String, master: ActorRef, client: Option[ActorRef]): Props -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/master/MasterProxy.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/master/MasterProxy.scala b/core/src/main/scala/io/gearpump/cluster/master/MasterProxy.scala deleted file mode 100644 index 61d95dc..0000000 --- a/core/src/main/scala/io/gearpump/cluster/master/MasterProxy.scala +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster.master - -import scala.concurrent.duration.FiniteDuration - -import akka.actor._ -import org.slf4j.Logger - -import io.gearpump.transport.HostPort -import io.gearpump.util.{ActorUtil, LogUtil} - -/** - * This works with Master HA. When there are multiple Master nodes, - * This will find a active one. - */ -class MasterProxy(masters: Iterable[ActorPath], timeout: FiniteDuration) - extends Actor with Stash { - import io.gearpump.cluster.master.MasterProxy._ - - val LOG: Logger = LogUtil.getLogger(getClass, name = self.path.name) - - val contacts = masters.map { url => - LOG.info(s"Contacts point URL: $url") - context.actorSelection(url) - } - - var watchers: List[ActorRef] = List.empty[ActorRef] - - import context.dispatcher - - def findMaster(): Cancellable = { - repeatActionUtil(timeout) { - contacts foreach { contact => - LOG.info(s"sending identity to $contact") - contact ! Identify(None) - } - } - } - - context.become(establishing(findMaster())) - - LOG.info("Master Proxy is started...") - - override def postStop(): Unit = { - watchers.foreach(_ ! MasterStopped) - super.postStop() - } - - override def receive: Receive = { - case _ => - } - - def establishing(findMaster: Cancellable): Actor.Receive = { - case ActorIdentity(_, Some(receptionist)) => - context watch receptionist - LOG.info("Connected to [{}]", receptionist.path) - context.watch(receptionist) - - watchers.foreach(_ ! 
MasterRestarted) - unstashAll() - findMaster.cancel() - context.become(active(receptionist) orElse messageHandler(receptionist)) - case ActorIdentity(_, None) => // ok, use another instead - case msg => - LOG.info(s"Stashing ${msg.getClass.getSimpleName}") - stash() - } - - def active(receptionist: ActorRef): Actor.Receive = { - case Terminated(receptionist) => - LOG.info("Lost contact with [{}], restablishing connection", receptionist) - context.become(establishing(findMaster)) - case _: ActorIdentity => // ok, from previous establish, already handled - case WatchMaster(watcher) => - watchers = watchers :+ watcher - } - - def messageHandler(master: ActorRef): Receive = { - case msg => - LOG.debug(s"Get msg ${msg.getClass.getSimpleName}, forwarding to ${master.path}") - master forward msg - } - - def scheduler: Scheduler = context.system.scheduler - import scala.concurrent.duration._ - private def repeatActionUtil(timeout: FiniteDuration)(action: => Unit): Cancellable = { - val send = scheduler.schedule(0.seconds, 2.seconds)(action) - val suicide = scheduler.scheduleOnce(timeout) { - send.cancel() - self ! 
PoisonPill - } - - new Cancellable { - def cancel(): Boolean = { - val result1 = send.cancel() - val result2 = suicide.cancel() - result1 && result2 - } - - def isCancelled: Boolean = { - send.isCancelled && suicide.isCancelled - } - } - } -} - -object MasterProxy { - case object MasterRestarted - case object MasterStopped - case class WatchMaster(watcher: ActorRef) - - import scala.concurrent.duration._ - def props(masters: Iterable[HostPort], duration: FiniteDuration = 30.seconds): Props = { - val contacts = masters.map(ActorUtil.getMasterActorPath(_)) - Props(new MasterProxy(contacts, duration)) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala b/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala deleted file mode 100644 index 0996381..0000000 --- a/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.cluster.master - -import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig - -/** Master status. Synced means all masters are live and synced. */ -object MasterStatus { - type Type = String - val Synced = "synced" - val UnSynced = "unsynced" -} - -case class MasterNode(host: String, port: Int) { - def toTuple: (String, Int) = { - (host, port) - } -} - -/** - * Master information returned for REST API call - */ -case class MasterSummary( - leader: MasterNode, - cluster: List[MasterNode], - aliveFor: Long, - logFile: String, - jarStore: String, - masterStatus: MasterStatus.Type, - homeDirectory: String, - activities: List[MasterActivity], - jvmName: String, - historyMetricsConfig: HistoryMetricsConfig = null) - -case class MasterActivity(time: Long, event: String) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala b/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala deleted file mode 100644 index b25162e..0000000 --- a/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster.scheduler - -import akka.actor.ActorRef - -import io.gearpump.cluster.worker.WorkerId - -case class Resource(slots: Int) { - - // scalastyle:off spaces.after.plus - def +(other: Resource): Resource = Resource(slots + other.slots) - // scalastyle:on spaces.after.plus - - def -(other: Resource): Resource = Resource(slots - other.slots) - - def >(other: Resource): Boolean = slots > other.slots - - def >=(other: Resource): Boolean = !(this < other) - - def <(other: Resource): Boolean = slots < other.slots - - def <=(other: Resource): Boolean = !(this > other) - - def isEmpty: Boolean = { - slots == 0 - } -} - -/** - * Each streaming job can have a priority, the job with higher priority - * will get scheduled resource earlier than those with lower priority. - */ -object Priority extends Enumeration { - type Priority = Value - val LOW, NORMAL, HIGH = Value -} - -/** - * Relaxation.ONEWORKER means only resource (slot) from that worker will be accepted by - * the requestor application job. - */ -object Relaxation extends Enumeration { - type Relaxation = Value - - // Option ONEWORKER allow user to schedule a task on specific worker. 
- val ANY, ONEWORKER, SPECIFICWORKER = Value -} - -import io.gearpump.cluster.scheduler.Priority._ -import io.gearpump.cluster.scheduler.Relaxation._ - -case class ResourceRequest( - resource: Resource, workerId: WorkerId, priority: Priority = NORMAL, - relaxation: Relaxation = ANY, executorNum: Int = 1) - -case class ResourceAllocation(resource: Resource, worker: ActorRef, workerId: WorkerId) - -object Resource { - def empty: Resource = new Resource(0) - - def min(res1: Resource, res2: Resource): Resource = if (res1.slots < res2.slots) res1 else res2 -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/worker/ExecutorProcessLauncher.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/worker/ExecutorProcessLauncher.scala b/core/src/main/scala/io/gearpump/cluster/worker/ExecutorProcessLauncher.scala deleted file mode 100644 index 8581467..0000000 --- a/core/src/main/scala/io/gearpump/cluster/worker/ExecutorProcessLauncher.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.cluster.worker - -import com.typesafe.config.Config - -import io.gearpump.cluster.scheduler.Resource -import io.gearpump.util.RichProcess - -/** - * ExecutorProcessLauncher is used to launch a process for Executor using given parameters. - * - * User can implement this interface to decide the behavior of launching a process. - * Set "gearpump.worker.executor-process-launcher" to your implemented class name. - */ -trait ExecutorProcessLauncher { - val config: Config - - /** - * This function launches a process for Executor using given parameters. - * - * @param appId The appId of the executor to be launched - * @param executorId The executorId of the executor to be launched - * @param resource The resource allocated for that executor - * @param options The command options - * @param classPath The classpath of the process - * @param mainClass The main class of the process - * @param arguments The rest arguments - */ - def createProcess( - appId: Int, executorId: Int, resource: Resource, config: Config, options: Array[String], - classPath: Array[String], mainClass: String, arguments: Array[String]): RichProcess - - /** - * This function will clean resources for a launched process. - * @param appId The appId of the launched executor - * @param executorId The executorId of launched executor - */ - def cleanProcess(appId: Int, executorId: Int): Unit -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/worker/WorkerId.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/worker/WorkerId.scala b/core/src/main/scala/io/gearpump/cluster/worker/WorkerId.scala deleted file mode 100644 index 24c6ad2..0000000 --- a/core/src/main/scala/io/gearpump/cluster/worker/WorkerId.scala +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster.worker - -/** - * WorkerId is used to uniquely track a worker machine. - * - * @param sessionId sessionId is assigned by Master node for easy tracking. It is possible that - * sessionId is **NOT** unique, so always use WorkerId for comparison. - * @param registerTime the timestamp when a worker node register itself to master node - */ -case class WorkerId(sessionId: Int, registerTime: Long) - -object WorkerId { - val unspecified: WorkerId = new WorkerId(-1, 0L) - - def render(workerId: WorkerId): String = { - workerId.registerTime + "_" + workerId.sessionId - } - - def parse(str: String): WorkerId = { - val pair = str.split("_") - new WorkerId(pair(1).toInt, pair(0).toLong) - } - - implicit val workerIdOrdering: Ordering[WorkerId] = { - new Ordering[WorkerId] { - - /** Compare timestamp first, then id */ - override def compare(x: WorkerId, y: WorkerId): Int = { - if (x.registerTime < y.registerTime) { - -1 - } else if (x.registerTime == y.registerTime) { - if (x.sessionId < y.sessionId) { - -1 - } else if (x.sessionId == y.sessionId) { - 0 - } else { - 1 - } - } else { - 1 - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala 
---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala b/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala deleted file mode 100644 index cdf2d03..0000000 --- a/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster.worker -import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig - -/** - * Worker summary information for REST API. - */ -case class WorkerSummary( - workerId: WorkerId, - state: String, - actorPath: String, - aliveFor: Long, - logFile: String, - executors: Array[ExecutorSlots], - totalSlots: Int, - availableSlots: Int, - homeDirectory: String, - jvmName: String, - // Id used to uniquely identity this worker process in low level resource manager like YARN. 
- resourceManagerContainerId: String, - historyMetricsConfig: HistoryMetricsConfig = null) - -object WorkerSummary { - def empty: WorkerSummary = { - WorkerSummary(WorkerId.unspecified, "", "", 0L, "", - Array.empty[ExecutorSlots], 0, 0, "", jvmName = "", resourceManagerContainerId = "") - } -} - -case class ExecutorSlots(appId: Int, executorId: Int, slots: Int) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/jarstore/JarStoreService.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/jarstore/JarStoreService.scala b/core/src/main/scala/io/gearpump/jarstore/JarStoreService.scala deleted file mode 100644 index 54d5431..0000000 --- a/core/src/main/scala/io/gearpump/jarstore/JarStoreService.scala +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.jarstore - -import java.io.File -import java.net.URI -import java.util.ServiceLoader -import scala.collection.JavaConverters._ - -import akka.actor.ActorSystem -import com.typesafe.config.Config - -import io.gearpump.util.{Constants, Util} - -case class FilePath(path: String) - -/** - * JarStoreService is used to manage the upload/download of binary files, - * like user submitted application jar. - */ -trait JarStoreService { - /** - * The scheme of the JarStoreService. - * Like "hdfs" for HDFS file system, and "file" for a local - * file system. - */ - val scheme: String - - /** - * Init the Jar Store. - */ - def init(config: Config, system: ActorSystem) - - /** - * This function will copy the local file to the remote JarStore, called from client side. - * @param localFile The local file - */ - def copyFromLocal(localFile: File): FilePath - - /** - * This function will copy the remote file to local file system, called from client side. - * - * @param localFile The destination of file path - * @param remotePath The remote file path from JarStore - */ - def copyToLocalFile(localFile: File, remotePath: FilePath) -} - -object JarStoreService { - - /** - * Get a active JarStoreService by specifying a scheme. - * - * Please see config [[io.gearpump.util.Constants#GEARPUMP_APP_JAR_STORE_ROOT_PATH]] for more - * information. 
- */ - def get(config: Config): JarStoreService = { - val jarStoreRootPath = config.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH) - get(jarStoreRootPath) - } - - private lazy val jarstoreServices: List[JarStoreService] = { - ServiceLoader.load(classOf[JarStoreService]).asScala.toList - } - - private def get(rootPath: String): JarStoreService = { - val scheme = new URI(Util.resolvePath(rootPath)).getScheme - jarstoreServices.find(_.scheme == scheme).get - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/io/gearpump/metrics/AkkaReporter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/metrics/AkkaReporter.scala b/core/src/main/scala/io/gearpump/metrics/AkkaReporter.scala deleted file mode 100644 index 3a581fb..0000000 --- a/core/src/main/scala/io/gearpump/metrics/AkkaReporter.scala +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/**
 * A [[ReportTo]] that delivers the current contents of a metric registry to an actor.
 *
 * Every call to `report` sends one message per registered counter, histogram, meter
 * and gauge.
 *
 * @param system actor system handed in by the caller (not referenced by this class body)
 * @param registry registry whose metrics are reported
 */
class AkkaReporter(
    system: ActorSystem,
    registry: MetricRegistry)
  extends ReportTo {
  private val LOG = LogUtil.getLogger(getClass)
  LOG.info("Start Metrics AkkaReporter")

  /** Unit label attached to every meter message. */
  private def getRateUnit: String = "events/s"

  /**
   * Sends all metrics currently held by the registry to the given actor.
   *
   * @param to actor that receives the metric messages
   */
  override def report(to: ActorRef): Unit = {
    // All four views are read from the registry before the first message is sent.
    val counterView = registry.getCounters()
    val histogramView = registry.getHistograms()
    val meterView = registry.getMeters()
    val gaugeView = registry.getGauges()

    for (entry <- counterView.entrySet().asScala) {
      to ! CounterData(entry.getKey, entry.getValue.getCount)
    }

    for (entry <- histogramView.entrySet().asScala) {
      val metricName = entry.getKey
      val snapshot = entry.getValue.getSnapshot
      to ! HistogramData(
        metricName, snapshot.getMean, snapshot.getStdDev, snapshot.getMedian,
        snapshot.get95thPercentile, snapshot.get99thPercentile, snapshot.get999thPercentile)
    }

    for (entry <- meterView.entrySet().asScala) {
      val metricName = entry.getKey
      val meter = entry.getValue
      to ! MeterData(
        metricName,
        meter.getCount,
        meter.getMeanRate,
        meter.getOneMinuteRate,
        getRateUnit)
    }

    for (entry <- gaugeView.entrySet().asScala) {
      // The gauge reading is treated as a Number and converted with longValue().
      val reading = entry.getValue.asInstanceOf[CodaGauge[Number]].getValue.longValue()
      to ! GaugeData(entry.getKey, reading)
    }
  }
}
/**
 * Sampling wrapper around a Codahale counter.
 *
 * Increments are accumulated locally; on every `sampleRate`-th call to `inc` the
 * accumulated amount is forwarded to the wrapped counter in a single call.
 *
 * @see io.gearpump.codahale.metrics.Counter
 * @param name metric name
 * @param counter wrapped counter; when it is null nothing is ever forwarded
 * @param sampleRate number of `inc` calls between two forwards to the wrapped counter
 */
class Counter(val name: String, counter: CodaHaleCounter, sampleRate: Int = 1) {
  // Number of inc calls seen so far.
  private var calls = 0L
  // Amount accumulated since the last forward.
  private var pending = 0L

  /** Increments by one. */
  def inc(): Unit = inc(1)

  /**
   * Increments by `n`.
   *
   * @param n amount to add
   */
  def inc(n: Long): Unit = {
    pending += n
    calls += 1
    // The null test comes first so the modulo is skipped when there is no counter.
    val flushDue = counter != null && calls % sampleRate == 0
    if (flushDue) {
      counter.inc(pending)
      pending = 0
    }
  }
}
/**
 * Sampling wrapper around a Codahale histogram.
 *
 * Only every `sampleRate`-th value passed to `update` is recorded in the wrapped
 * histogram; the other values are dropped.
 *
 * @see io.gearpump.codahale.metrics.Histogram
 * @param name metric name
 * @param histogram wrapped histogram; `update` tolerates null, the getters do not
 * @param sampleRate number of `update` calls per recorded value
 */
class Histogram(val name: String, histogram: CodaHaleHistogram, sampleRate: Int = 1) {
  // Number of update calls seen so far.
  private var seen = 0L

  /**
   * Offers a value to the histogram; it is recorded only on sampled calls.
   *
   * @param value observed value
   */
  def update(value: Long): Unit = {
    seen += 1
    // The null test comes first so the modulo is skipped when there is no histogram.
    val sampled = histogram != null && seen % sampleRate == 0
    if (sampled) {
      histogram.update(value)
    }
  }

  /** Mean of the wrapped histogram's current snapshot. */
  def getMean(): Double = histogram.getSnapshot.getMean

  /** Standard deviation of the wrapped histogram's current snapshot. */
  def getStdDev(): Double = histogram.getSnapshot.getStdDev
}
/**
 * Metric set exposing a fixed selection of JVM memory and thread gauges.
 *
 * Each exported metric is named `<name>:memory.<key>` or `<name>:thread.<key>`.
 *
 * @param name prefix put in front of every exported metric name
 */
class JvmMetricsSet(name: String) extends MetricSet {

  override def getMetrics: util.Map[String, Metric] = {
    val memoryGauges = new MemoryUsageGaugeSet().getMetrics.asScala
    val threadGauges = new ThreadStatesGaugeSet().getMetrics.asScala

    // Keys picked from the two Codahale gauge sets; a key missing from a set fails the lookup.
    val memoryKeys = List(
      "total.used", "total.committed", "total.max",
      "heap.used", "heap.committed", "heap.max")
    val threadKeys = List("count", "daemon.count")

    val memoryEntries = memoryKeys.map { key => s"$name:memory.$key" -> memoryGauges(key) }
    val threadEntries = threadKeys.map { key => s"$name:thread.$key" -> threadGauges(key) }

    val exported: Map[String, Metric] = (memoryEntries ++ threadEntries).toMap
    exported.asJava
  }
}
/**
 * Sampling wrapper around a Codahale meter.
 *
 * Marks are accumulated locally; on every `sampleRate`-th call the accumulated amount
 * is forwarded to the wrapped meter in a single call.
 *
 * See io.gearpump.codahale.metrics.Meter
 *
 * @param name metric name
 * @param meter wrapped meter; when it is null no mark is ever forwarded
 * @param sampleRate number of mark calls between two forwards to the wrapped meter
 */
class Meter(val name: String, meter: CodaHaleMeter, sampleRate: Int = 1) {
  private var sampleCount = 0L
  private var toBeMarked = 0L

  /**
   * Marks a single event.
   *
   * Delegates to `mark(1)` so that it goes through the same null guard and sampling as
   * the bulk variant (previously it called the wrapped meter directly, which threw on
   * a null meter and ignored `sampleRate`).
   */
  def mark(): Unit = mark(1)

  /**
   * Marks `n` events.
   *
   * @param n number of events to record
   */
  def mark(n: Long): Unit = {
    toBeMarked += n
    sampleCount += 1
    // The null test comes first so the modulo is skipped when there is no meter.
    if (null != meter && sampleCount % sampleRate == 0) {
      meter.mark(toBeMarked)
      toBeMarked = 0
    }
  }

  /** One-minute rate of the wrapped meter; the meter is dereferenced without a null check. */
  def getOneMinuteRate(): Double = {
    meter.getOneMinuteRate
  }
}
/**
 * Metric objects registry.
 *
 * Creates sampled metric wrappers that are all backed by one [[MetricRegistry]].
 *
 * @param sampleRate sampling rate given to every wrapper created without an explicit rate
 */
class Metrics(sampleRate: Int) extends Extension {

  val registry = new MetricRegistry()

  /** Returns a sampled wrapper around the registry meter called `name`. */
  def meter(name: String): metrics.Meter = {
    new metrics.Meter(name, registry.meter(name), sampleRate)
  }

  /** Returns a sampled wrapper around the registry histogram called `name`. */
  def histogram(name: String): Histogram = {
    new Histogram(name, registry.histogram(name), sampleRate)
  }

  /** Same as `histogram(name)` but with a caller-chosen sampling rate. */
  def histogram(name: String, sampleRate: Int): Histogram = {
    new Histogram(name, registry.histogram(name), sampleRate)
  }

  /** Returns a sampled wrapper around the registry counter called `name`. */
  def counter(name: String): Counter = {
    new Counter(name, registry.counter(name), sampleRate)
  }

  /**
   * Registers the metrics of `set` whose names are not yet present in the registry.
   *
   * @param set metrics to add; entries whose name is already registered are skipped
   */
  def register(set: MetricSet): Unit = {
    val names = registry.getNames
    val unregistered = set.getMetrics.asScala.filterKeys { key => !names.contains(key) }
    unregistered.foreach { kv =>
      registry.register(kv._1, kv._2)
    }
  }
}

/**
 * Akka extension id for [[Metrics]]; also declares the metric message types.
 */
object Metrics extends ExtensionId[Metrics] with ExtensionIdProvider {

  val LOG: Logger = LogUtil.getLogger(getClass)
  import io.gearpump.util.Constants._

  /** Common parent of all metric messages. */
  sealed trait MetricType {
    def name: String
  }

  object MetricType {

    /**
     * Expands a metric into a 5-tuple that holds the metric in the slot of its own
     * type and null in the four other slots.
     */
    def unapply(obj: MetricType): Option[(Histogram, Counter, Meter, Timer, Gauge)] = {
      obj match {
        case x: Histogram => Some((x, null, null, null, null))
        case x: Counter => Some((null, x, null, null, null))
        case x: Meter => Some((null, null, x, null, null))
        case x: Timer => Some((null, null, null, x, null))
        case g: Gauge => Some((null, null, null, null, g))
      }
    }

    /**
     * Inverse of `unapply`: returns the first non-null argument, checked in parameter
     * order, or null when all arguments are null.
     */
    def apply(h: Histogram, c: Counter, m: Meter, t: Timer, g: Gauge): MetricType = {
      if (h != null) h
      else if (c != null) c
      else if (m != null) m
      else if (t != null) t
      else if (g != null) g
      else null
    }
  }

  case class Histogram (
      name: String, mean: Double,
      stddev: Double, median: Double,
      p95: Double, p99: Double, p999: Double)
    extends MetricType

  case class Counter(name: String, value: Long) extends MetricType

  case class Meter(
      name: String, count: Long, meanRate: Double,
      m1: Double, rateUnit: String)
    extends MetricType

  case class Timer(
      name: String, count: Long, min: Double, max: Double,
      mean: Double, stddev: Double, median: Double,
      p75: Double, p95: Double, p98: Double,
      p99: Double, p999: Double, meanRate: Double,
      m1: Double, m5: Double, m15: Double,
      rateUnit: String, durationUnit: String)
    extends MetricType

  case class Gauge(name: String, value: Long) extends MetricType

  case object ReportMetrics

  case class DemandMoreMetrics(subscriber: ActorRef)

  override def get(system: ActorSystem): Metrics = super.get(system)

  override def lookup: ExtensionId[Metrics] = Metrics

  /**
   * Builds the extension from the actor system configuration: a real [[Metrics]] when
   * GEARPUMP_METRIC_ENABLED is true, otherwise a [[DummyMetrics]].
   */
  override def createExtension(system: ExtendedActorSystem): Metrics = {
    val metricsEnabled = system.settings.config.getBoolean(GEARPUMP_METRIC_ENABLED)
    LOG.info(s"Metrics is enabled..., $metricsEnabled")
    // The sample rate is read from the configuration even when metrics are disabled.
    val sampleRate = system.settings.config.getInt(GEARPUMP_METRIC_SAMPLE_RATE)
    if (metricsEnabled) {
      new Metrics(sampleRate)
    } else {
      new DummyMetrics
    }
  }

  /**
   * Metrics implementation used when metrics are disabled: registration does nothing
   * and every factory method returns a shared no-op wrapper.
   */
  class DummyMetrics extends Metrics(1) {
    override def register(set: MetricSet): Unit = ()

    private val noopMeter = new metrics.Meter("", null) {
      override def mark(): Unit = ()
      override def mark(n: Long): Unit = ()
      override def getOneMinuteRate(): Double = 0
    }

    private val noopHistogram = new metrics.Histogram("", null) {
      override def update(value: Long): Unit = ()
      override def getMean(): Double = 0
      override def getStdDev(): Double = 0
    }

    private val noopCounter = new metrics.Counter("", null) {
      override def inc(): Unit = ()
      override def inc(n: Long): Unit = ()
    }

    override def meter(name: String): metrics.Meter = noopMeter
    override def histogram(name: String): metrics.Histogram = noopHistogram
    override def counter(name: String): metrics.Counter = noopCounter
  }
}
/**
 * Reduces a larger collection of history metrics to a smaller one.
 *
 * Implementations are expected to provide a constructor shaped like
 * `MetricsAggregator(config: Config)`.
 */
trait MetricsAggregator {

  /**
   * Aggregates the given metrics.
   *
   * @param options aggregator-specific options, keyed by option name
   * @param inputs metrics to aggregate
   * @return the aggregated metrics
   */
  def aggregate(
      options: Map[String, String],
      inputs: Iterator[HistoryMetricsItem]): List[HistoryMetricsItem]
}
/**
 * Actor that reports metrics data to a configured destination: Graphite, a log file
 * through Slf4j, or a remote Akka actor.
 *
 * The destination is chosen once, at construction time, from the
 * GEARPUMP_METRIC_REPORTER configuration value.
 *
 * @param metrics Holds a list of metrics object.
 */
class MetricsReporterService(metrics: Metrics) extends Actor {

  private val LOG = LogUtil.getLogger(getClass)
  private implicit val system = context.system

  private val reportInterval = system.settings.config.getInt(GEARPUMP_METRIC_REPORT_INTERVAL)
  private val reporter = getReporter
  implicit val dispatcher = context.dispatcher

  def receive: Receive = {
    // The subscriber is demanding more messages: report now, then schedule a
    // ReportMetrics message to the subscriber after the report interval (milliseconds).
    case DemandMoreMetrics(subscriber) => {
      reporter.report(subscriber)
      context.system.scheduler.scheduleOnce(reportInterval.milliseconds,
        subscriber, ReportMetrics)
    }
  }

  /** Builds a reporter that pushes the registry to the configured Graphite host and port. */
  def startGraphiteReporter(): ReportTo = {
    val graphiteHost = system.settings.config.getString(GEARPUMP_METRIC_GRAPHITE_HOST)
    val graphitePort = system.settings.config.getInt(GEARPUMP_METRIC_GRAPHITE_PORT)

    val graphite = new Graphite(new InetSocketAddress(graphiteHost, graphitePort))
    LOG.info(s"reporting to $graphiteHost, $graphitePort")
    new ReportTo {
      private val reporter = GraphiteReporter.forRegistry(metrics.registry)
        .convertRatesTo(TimeUnit.SECONDS)
        .convertDurationsTo(TimeUnit.MILLISECONDS)
        .filter(MetricFilter.ALL)
        .build(graphite)

      // The target actor is ignored; the data goes to Graphite.
      override def report(to: ActorRef): Unit = reporter.report()
    }
  }

  /** Builds a reporter that writes the registry to this actor's logger. */
  def startSlf4jReporter(): ReportTo = {
    new ReportTo {
      val reporter = Slf4jReporter.forRegistry(metrics.registry)
        .convertRatesTo(TimeUnit.SECONDS)
        .convertDurationsTo(TimeUnit.MILLISECONDS)
        .filter(MetricFilter.ALL)
        .outputTo(LOG)
        .build()

      // The target actor is ignored; the data goes to the log.
      override def report(to: ActorRef): Unit = reporter.report()
    }
  }

  /** Builds a reporter that sends the registry contents to the requesting actor. */
  def startAkkaReporter(): ReportTo = {
    new AkkaReporter(system, metrics.registry)
  }

  /**
   * Creates the reporter named by the GEARPUMP_METRIC_REPORTER configuration value.
   *
   * @return the reporter for "graphite", "logfile" or "akka"
   * @throws IllegalArgumentException when the configured value is none of the above
   */
  def getReporter: ReportTo = {
    val reporterType = system.settings.config.getString(GEARPUMP_METRIC_REPORTER)
    LOG.info(s"Metrics reporter is enabled, using $reporterType reporter")
    reporterType match {
      case "graphite" => startGraphiteReporter()
      case "logfile" => startSlf4jReporter()
      case "akka" => startAkkaReporter()
      case unsupported =>
        // Fail with a descriptive error instead of a bare scala.MatchError.
        throw new IllegalArgumentException(
          s"Unsupported metrics reporter type '$unsupported', " +
            "expected one of: graphite, logfile, akka")
    }
  }
}

object MetricsReporterService {

  /** Target where user want to report the metrics data to */
  trait ReportTo {
    def report(to: ActorRef): Unit
  }
}
