http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala deleted file mode 100644 index 9a3a119..0000000 --- a/daemon/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala +++ /dev/null @@ -1,355 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gearpump.cluster.master - -import scala.concurrent.Future -import scala.concurrent.duration._ -import scala.util.{Failure, Success} - -import akka.actor._ -import akka.pattern.ask -import org.slf4j.Logger - -import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, SaveAppDataFailed, _} -import org.apache.gearpump.cluster.AppMasterToWorker._ -import org.apache.gearpump.cluster.ClientToMaster._ -import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataRequest, AppMastersDataRequest, _} -import org.apache.gearpump.cluster.MasterToClient._ -import org.apache.gearpump.cluster.WorkerToAppMaster.{ShutdownExecutorFailed, _} -import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeInfo, ApplicationState} -import org.apache.gearpump.cluster.master.AppManager._ -import org.apache.gearpump.cluster.master.InMemoryKVService.{GetKVResult, PutKVResult, PutKVSuccess, _} -import org.apache.gearpump.cluster.master.Master._ -import org.apache.gearpump.util.Constants._ -import org.apache.gearpump.util.{ActorUtil, TimeOutScheduler, Util, _} - -/** - * AppManager is dedicated child of Master to manager all applications. 
- */ -private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLauncherFactory) - extends Actor with Stash with TimeOutScheduler { - - private val LOG: Logger = LogUtil.getLogger(getClass) - - private val EXECUTOR_ID: Int = APPMASTER_DEFAULT_EXECUTOR_ID - private val appMasterMaxRetries: Int = 5 - private val appMasterRetryTimeRange: Duration = 20.seconds - - implicit val timeout = FUTURE_TIMEOUT - implicit val executionContext = context.dispatcher - - // Next available appId - private var nextAppId: Int = 1 - - // From appId to appMaster data - // Applications not in activeAppMasters or deadAppMasters are in pending status - private var appMasterRegistry = Map.empty[Int, (ActorRef, AppMasterRuntimeInfo)] - - // Active appMaster list where applications are in active status - private var activeAppMasters = Set.empty[Int] - - // Dead appMaster list where applications are in inactive status - private var deadAppMasters = Set.empty[Int] - - private var appMasterRestartPolicies = Map.empty[Int, RestartPolicy] - - def receive: Receive = null - - kvService ! GetKV(MASTER_GROUP, MASTER_STATE) - context.become(waitForMasterState) - - def waitForMasterState: Receive = { - case GetKVSuccess(_, result) => - val masterState = result.asInstanceOf[MasterState] - if (masterState != null) { - this.nextAppId = masterState.maxId + 1 - this.activeAppMasters = masterState.activeAppMasters - this.deadAppMasters = masterState.deadAppMasters - this.appMasterRegistry = masterState.appMasterRegistry - } - context.become(receiveHandler) - unstashAll() - case GetKVFailed(ex) => - LOG.error("Failed to get master state, shutting down master to avoid data corruption...") - context.parent ! PoisonPill - case msg => - LOG.info(s"Get message ${msg.getClass.getSimpleName}") - stash() - } - - def receiveHandler: Receive = { - val msg = "Application Manager started. Ready for application submission..." 
- LOG.info(msg) - clientMsgHandler orElse appMasterMessage orElse selfMsgHandler orElse workerMessage orElse - appDataStoreService orElse terminationWatch - } - - def clientMsgHandler: Receive = { - case SubmitApplication(app, jar, username) => - LOG.info(s"Submit Application ${app.name}($nextAppId) by $username...") - val client = sender() - if (applicationNameExist(app.name)) { - client ! SubmitApplicationResult(Failure( - new Exception(s"Application name ${app.name} already existed"))) - } else { - context.actorOf(launcher.props(nextAppId, EXECUTOR_ID, app, jar, username, context.parent, - Some(client)), s"launcher${nextAppId}_${Util.randInt()}") - - val appState = new ApplicationState(nextAppId, app.name, 0, app, jar, username, null) - appMasterRestartPolicies += nextAppId -> - new RestartPolicy(appMasterMaxRetries, appMasterRetryTimeRange) - kvService ! PutKV(nextAppId.toString, APP_STATE, appState) - nextAppId += 1 - } - - case RestartApplication(appId) => - val client = sender() - (kvService ? GetKV(appId.toString, APP_STATE)).asInstanceOf[Future[GetKVResult]].map { - case GetKVSuccess(_, result) => - val appState = result.asInstanceOf[ApplicationState] - if (appState != null) { - LOG.info(s"Shutting down the application (restart), $appId") - self ! ShutdownApplication(appId) - self.tell(SubmitApplication(appState.app, appState.jar, appState.username), client) - } else { - client ! SubmitApplicationResult(Failure( - new Exception(s"Failed to restart, because the application $appId does not exist.") - )) - } - case GetKVFailed(ex) => - client ! SubmitApplicationResult(Failure( - new Exception(s"Unable to obtain the Master State. 
" + - s"Application $appId will not be restarted.") - )) - } - - case ShutdownApplication(appId) => - LOG.info(s"App Manager Shutting down application $appId") - val (_, appInfo) = appMasterRegistry.get(appId) - .filter { case (_, info) => !deadAppMasters.contains(info.appId)} - .getOrElse((null, null)) - Option(appInfo) match { - case Some(info) => - val worker = info.worker - val workerPath = Option(worker).map(_.path).orNull - LOG.info(s"Shutdown AppMaster at $workerPath, appId: $appId, executorId: $EXECUTOR_ID") - cleanApplicationData(appId) - val shutdown = ShutdownExecutor(appId, EXECUTOR_ID, - s"AppMaster $appId shutdown requested by master...") - sendMsgWithTimeOutCallBack(worker, shutdown, 30000, shutDownExecutorTimeOut()) - sender ! ShutdownApplicationResult(Success(appId)) - case None => - val errorMsg = s"Failed to find registration information for appId: $appId" - LOG.error(errorMsg) - sender ! ShutdownApplicationResult(Failure(new Exception(errorMsg))) - } - - case ResolveAppId(appId) => - val (appMaster, _) = appMasterRegistry.getOrElse(appId, (null, null)) - if (null != appMaster) { - sender ! ResolveAppIdResult(Success(appMaster)) - } else { - sender ! ResolveAppIdResult(Failure(new Exception(s"Can not find Application: $appId"))) - } - - case AppMastersDataRequest => - var appMastersData = collection.mutable.ListBuffer[AppMasterData]() - appMasterRegistry.foreach(pair => { - val (id, (appMaster: ActorRef, info: AppMasterRuntimeInfo)) = pair - val appMasterPath = ActorUtil.getFullPath(context.system, appMaster.path) - val workerPath = Option(info.worker).map(worker => - ActorUtil.getFullPath(context.system, worker.path)) - val status = getAppMasterStatus(id) - appMastersData += AppMasterData( - status, id, info.appName, appMasterPath, workerPath.orNull, - info.submissionTime, info.startTime, info.finishTime, info.user) - }) - - sender ! 
AppMastersData(appMastersData.toList) - - case QueryAppMasterConfig(appId) => - val config = - if (appMasterRegistry.contains(appId)) { - val (_, info) = appMasterRegistry(appId) - info.config - } else { - null - } - sender ! AppMasterConfig(config) - - case appMasterDataRequest: AppMasterDataRequest => - val appId = appMasterDataRequest.appId - val appStatus = getAppMasterStatus(appId) - - appStatus match { - case AppMasterNonExist => - sender ! AppMasterData(AppMasterNonExist) - case _ => - val (appMaster, info) = appMasterRegistry(appId) - val appMasterPath = ActorUtil.getFullPath(context.system, appMaster.path) - val workerPath = Option(info.worker).map( - worker => ActorUtil.getFullPath(context.system, worker.path)).orNull - sender ! AppMasterData( - appStatus, appId, info.appName, appMasterPath, workerPath, - info.submissionTime, info.startTime, info.finishTime, info.user) - } - } - - def workerMessage: Receive = { - case ShutdownExecutorSucceed(appId, executorId) => - LOG.info(s"Shut down executor $executorId for application $appId successfully") - case failed: ShutdownExecutorFailed => - LOG.error(failed.reason) - } - - private def getAppMasterStatus(appId: Int): AppMasterStatus = { - if (activeAppMasters.contains(appId)) { - AppMasterActive - } else if (deadAppMasters.contains(appId)) { - AppMasterInActive - } else if (appMasterRegistry.contains(appId)) { - AppMasterPending - } else { - AppMasterNonExist - } - } - - private def shutDownExecutorTimeOut(): Unit = { - LOG.error(s"Shut down executor time out") - } - - def appMasterMessage: Receive = { - case RegisterAppMaster(appMaster, registerBack: AppMasterRuntimeInfo) => - val startTime = System.currentTimeMillis() - val register = registerBack.copy(startTime = startTime) - - LOG.info(s"Register AppMaster for app: ${register.appId}, $register") - context.watch(appMaster) - appMasterRegistry += register.appId -> (appMaster, register) - kvService ! 
PutKV(MASTER_GROUP, MASTER_STATE, - MasterState(nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters)) - sender ! AppMasterRegistered(register.appId) - - case ActivateAppMaster(appId) => - LOG.info(s"Activate AppMaster for app $appId") - activeAppMasters += appId - kvService ! PutKV(MASTER_GROUP, MASTER_STATE, - MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters)) - sender ! AppMasterActivated(appId) - } - - def appDataStoreService: Receive = { - case SaveAppData(appId, key, value) => - val client = sender() - (kvService ? PutKV(appId.toString, key, value)).asInstanceOf[Future[PutKVResult]].map { - case PutKVSuccess => - client ! AppDataSaved - case PutKVFailed(k, ex) => - client ! SaveAppDataFailed - } - case GetAppData(appId, key) => - val client = sender() - (kvService ? GetKV(appId.toString, key)).asInstanceOf[Future[GetKVResult]].map { - case GetKVSuccess(privateKey, value) => - client ! GetAppDataResult(key, value) - case GetKVFailed(ex) => - client ! GetAppDataResult(key, null) - } - } - - def terminationWatch: Receive = { - case terminate: Terminated => - LOG.info(s"AppMaster(${terminate.actor.path}) is terminated, " + - s"network down: ${terminate.getAddressTerminated}") - - // Now we assume that the only normal way to stop the application is submitting a - // ShutdownApplication request - val application = appMasterRegistry.find { appInfo => - val (_, (actorRef, _)) = appInfo - actorRef.compareTo(terminate.actor) == 0 - } - if (application.nonEmpty) { - val appId = application.get._1 - (kvService ? GetKV(appId.toString, APP_STATE)).asInstanceOf[Future[GetKVResult]].map { - case GetKVSuccess(_, result) => - val appState = result.asInstanceOf[ApplicationState] - if (appState != null) { - LOG.info(s"Recovering application, $appId") - self ! 
RecoverApplication(appState) - } else { - LOG.error(s"Cannot find application state for $appId") - } - case GetKVFailed(ex) => - LOG.error(s"Cannot find master state to recover") - } - } - } - - def selfMsgHandler: Receive = { - case RecoverApplication(state) => - val appId = state.appId - if (appMasterRestartPolicies.get(appId).get.allowRestart) { - LOG.info(s"AppManager Recovering Application $appId...") - activeAppMasters -= appId - kvService ! PutKV(MASTER_GROUP, MASTER_STATE, - MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters)) - context.actorOf(launcher.props(appId, EXECUTOR_ID, state.app, state.jar, state.username, - context.parent, None), s"launcher${appId}_${Util.randInt()}") - } else { - LOG.error(s"Application $appId failed too many times") - } - } - - case class RecoverApplication(applicationStatus: ApplicationState) - - private def cleanApplicationData(appId: Int): Unit = { - if (appMasterRegistry.contains(appId)) { - // Add the dead app to dead appMasters - deadAppMasters += appId - // Remove the dead app from active appMasters - activeAppMasters -= appId - - appMasterRegistry += appId -> { - val (ref, info) = appMasterRegistry(appId) - (ref, info.copy(finishTime = System.currentTimeMillis())) - } - kvService ! PutKV(MASTER_GROUP, MASTER_STATE, - MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, deadAppMasters)) - kvService ! DeleteKVGroup(appId.toString) - } - } - - private def applicationNameExist(appName: String): Boolean = { - appMasterRegistry.values.exists { case (_, info) => - info.appName == appName && !deadAppMasters.contains(info.appId) - } - } -} - -object AppManager { - final val APP_STATE = "app_state" - // The id is used in KVStore - final val MASTER_STATE = "master_state" - - case class MasterState( - maxId: Int, - appMasterRegistry: Map[Int, (ActorRef, AppMasterRuntimeInfo)], - activeAppMasters: Set[Int], - deadAppMasters: Set[Int]) -}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala deleted file mode 100644 index 3e54214..0000000 --- a/daemon/src/main/scala/org/apache/gearpump/cluster/master/InMemoryKVService.scala +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.cluster.master - -import java.util.concurrent.TimeUnit -import scala.concurrent.TimeoutException -import scala.concurrent.duration.Duration - -import akka.actor._ -import akka.cluster.Cluster -import akka.cluster.ddata.Replicator._ -import akka.cluster.ddata.{DistributedData, LWWMap, LWWMapKey} -import org.slf4j.Logger - -import org.apache.gearpump.util.LogUtil - -/** - * A replicated simple in-memory KV service. The replications are stored on all masters. 
- */ -class InMemoryKVService extends Actor with Stash { - import org.apache.gearpump.cluster.master.InMemoryKVService._ - - private val KV_SERVICE = "gearpump_kvservice" - - private val LOG: Logger = LogUtil.getLogger(getClass) - private val replicator = DistributedData(context.system).replicator - private implicit val cluster = Cluster(context.system) - - // Optimize write path, we can tolerate one master down for recovery. - private val timeout = Duration(15, TimeUnit.SECONDS) - private val readMajority = ReadMajority(timeout) - private val writeMajority = WriteMajority(timeout) - - private def groupKey(group: String): LWWMapKey[Any] = { - LWWMapKey[Any](KV_SERVICE + "_" + group) - } - - def receive: Receive = kvService - - def kvService: Receive = { - - case GetKV(group: String, key: String) => - val request = Request(sender(), key) - replicator ! Get(groupKey(group), readMajority, Some(request)) - case success@GetSuccess(group: LWWMapKey[Any @unchecked], Some(request: Request)) => - val appData = success.get(group) - LOG.info(s"Successfully retrived group: ${group.id}") - request.client ! GetKVSuccess(request.key, appData.get(request.key).orNull) - case NotFound(group: LWWMapKey[Any @unchecked], Some(request: Request)) => - LOG.info(s"We cannot find group $group") - request.client ! GetKVSuccess(request.key, null) - case GetFailure(group: LWWMapKey[Any @unchecked], Some(request: Request)) => - val error = s"Failed to get application data, the request key is ${request.key}" - LOG.error(error) - request.client ! GetKVFailed(new Exception(error)) - - case PutKV(group: String, key: String, value: Any) => - val request = Request(sender(), key) - val update = Update(groupKey(group), LWWMap(), writeMajority, Some(request)) { map => - map + (key -> value) - } - replicator ! update - case UpdateSuccess(group: LWWMapKey[Any @unchecked], Some(request: Request)) => - request.client ! 
PutKVSuccess - case ModifyFailure(group: LWWMapKey[Any @unchecked], error, cause, Some(request: Request)) => - request.client ! PutKVFailed(request.key, new Exception(error, cause)) - case UpdateTimeout(group: LWWMapKey[Any @unchecked], Some(request: Request)) => - request.client ! PutKVFailed(request.key, new TimeoutException()) - - case delete@DeleteKVGroup(group: String) => - replicator ! Delete(groupKey(group), writeMajority) - case DeleteSuccess(group) => - LOG.info(s"KV Group ${group.id} is deleted") - case ReplicationDeleteFailure(group) => - LOG.error(s"Failed to delete KV Group ${group.id}...") - case DataDeleted(group) => - LOG.error(s"Group ${group.id} is deleted, you can no longer put/get/delete this group...") - } -} - -object InMemoryKVService { - /** - * KV Service related - */ - case class GetKV(group: String, key: String) - - trait GetKVResult - - case class GetKVSuccess(key: String, value: Any) extends GetKVResult - - case class GetKVFailed(ex: Throwable) extends GetKVResult - - case class PutKV(group: String, key: String, value: Any) - - case class DeleteKVGroup(group: String) - - case class GroupDeleted(group: String) extends GetKVResult with PutKVResult - - trait PutKVResult - - case object PutKVSuccess extends PutKVResult - - case class PutKVFailed(key: String, ex: Throwable) extends PutKVResult - - case class Request(client: ActorRef, key: String) -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala deleted file mode 100644 index 6b4df07..0000000 --- a/daemon/src/main/scala/org/apache/gearpump/cluster/master/Master.scala +++ /dev/null @@ -1,311 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more 
contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.cluster.master - -import java.lang.management.ManagementFactory -import org.apache.gearpump.cluster.worker.WorkerId -import org.apache.gearpump.jarstore.JarStoreServer - -import scala.collection.JavaConverters._ -import scala.collection.immutable - -import akka.actor._ -import akka.remote.DisassociatedEvent -import com.typesafe.config.Config -import org.apache.commons.lang.exception.ExceptionUtils -import org.slf4j.Logger - -import org.apache.gearpump.cluster.AppMasterToMaster._ -import org.apache.gearpump.cluster.ClientToMaster._ -import org.apache.gearpump.cluster.ClusterConfig -import org.apache.gearpump.cluster.MasterToAppMaster._ -import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, MasterConfig, ResolveWorkerIdResult} -import org.apache.gearpump.cluster.MasterToWorker._ -import org.apache.gearpump.cluster.WorkerToMaster._ -import org.apache.gearpump.cluster.master.InMemoryKVService._ -import org.apache.gearpump.cluster.master.Master.{MasterInfo, WorkerTerminated, _} -import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished -import org.apache.gearpump.metrics.Metrics.ReportMetrics -import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, 
MetricsReporterService} -import org.apache.gearpump.transport.HostPort -import org.apache.gearpump.util.Constants._ -import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig -import org.apache.gearpump.util._ - -/** - * Master Actor who manages resources of the whole cluster. - * It is like the resource manager of YARN. - */ -private[cluster] class Master extends Actor with Stash { - private val LOG: Logger = LogUtil.getLogger(getClass) - private val systemConfig: Config = context.system.settings.config - private implicit val timeout = Constants.FUTURE_TIMEOUT - private val kvService = context.actorOf(Props(new InMemoryKVService()), "kvService") - // Resources and resourceRequests can be dynamically constructed by - // heartbeat of worker and appmaster when master singleton is migrated. - // We don't need to persist them in cluster - private var appManager: ActorRef = null - - private var scheduler: ActorRef = null - - private var workers = new immutable.HashMap[ActorRef, WorkerId] - - private val birth = System.currentTimeMillis() - - private var nextWorkerId = 0 - - def receive: Receive = null - - // Register jvm metrics - Metrics(context.system).register(new JvmMetricsSet(s"master")) - - LOG.info("master is started at " + ActorUtil.getFullPath(context.system, self.path) + "...") - - val jarStoreRootPath = systemConfig.getString(Constants.GEARPUMP_APP_JAR_STORE_ROOT_PATH) - - private val jarStore = context.actorOf(Props(classOf[JarStoreServer], jarStoreRootPath)) - - private val hostPort = HostPort(ActorUtil.getSystemAddress(context.system).hostPort) - - // Maintain the list of active masters. - private var masters: List[MasterNode] = { - // Add myself into the list of initial masters. 
- List(MasterNode(hostPort.host, hostPort.port)) - } - - val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED) - - val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig) - val historyMetricsService = if (metricsEnabled) { - val historyMetricsService = { - context.actorOf(Props(new HistoryMetricsService("master", getHistoryMetricsConfig))) - } - - val metricsReportService = context.actorOf( - Props(new MetricsReporterService(Metrics(context.system)))) - historyMetricsService.tell(ReportMetrics, metricsReportService) - Some(historyMetricsService) - } else { - None - } - - kvService ! GetKV(MASTER_GROUP, WORKER_ID) - context.become(waitForNextWorkerId) - - def waitForNextWorkerId: Receive = { - case GetKVSuccess(_, result) => - if (result != null) { - this.nextWorkerId = result.asInstanceOf[Int] - } else { - LOG.warn("Cannot find existing state in the distributed cluster...") - } - context.become(receiveHandler) - unstashAll() - case GetKVFailed(ex) => - LOG.error("Failed to get worker id, shutting down master to avoid data corruption...") - context.parent ! PoisonPill - case msg => - LOG.info(s"Get message ${msg.getClass.getSimpleName}") - stash() - } - - def receiveHandler: Receive = workerMsgHandler orElse - appMasterMsgHandler orElse - onMasterListChange orElse - clientMsgHandler orElse - metricsService orElse - jarStoreService orElse - terminationWatch orElse - disassociated orElse - kvServiceMsgHandler orElse - ActorUtil.defaultMsgHandler(self) - - def workerMsgHandler: Receive = { - case RegisterNewWorker => - val workerId = WorkerId(nextWorkerId, System.currentTimeMillis()) - nextWorkerId += 1 - kvService ! PutKV(MASTER_GROUP, WORKER_ID, nextWorkerId) - val workerHostname = ActorUtil.getHostname(sender()) - LOG.info(s"Register new from $workerHostname ....") - self forward RegisterWorker(workerId) - - case RegisterWorker(id) => - context.watch(sender()) - sender ! 
WorkerRegistered(id, MasterInfo(self, birth)) - scheduler forward WorkerRegistered(id, MasterInfo(self, birth)) - workers += (sender() -> id) - val workerHostname = ActorUtil.getHostname(sender()) - LOG.info(s"Register Worker with id $id from $workerHostname ....") - case resourceUpdate: ResourceUpdate => - scheduler forward resourceUpdate - } - - def jarStoreService: Receive = { - case GetJarStoreServer => - jarStore forward GetJarStoreServer - } - - def kvServiceMsgHandler: Receive = { - case PutKVSuccess => - // Skip - case PutKVFailed(key, exception) => - LOG.error(s"Put KV of key $key to InMemoryKVService failed.\n" + - ExceptionUtils.getStackTrace(exception)) - } - - def metricsService: Receive = { - case query: QueryHistoryMetrics => - if (historyMetricsService.isEmpty) { - // Returns empty metrics so that we don't hang the UI - sender ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem]) - } else { - historyMetricsService.get forward query - } - } - - def appMasterMsgHandler: Receive = { - case request: RequestResource => - scheduler forward request - case registerAppMaster: RegisterAppMaster => - appManager forward registerAppMaster - case activateAppMaster: ActivateAppMaster => - appManager forward activateAppMaster - case save: SaveAppData => - appManager forward save - case get: GetAppData => - appManager forward get - case GetAllWorkers => - sender ! WorkerList(workers.values.toList) - case GetMasterData => - val aliveFor = System.currentTimeMillis() - birth - val logFileDir = LogUtil.daemonLogDir(systemConfig).getAbsolutePath - val userDir = System.getProperty("user.dir") - - val masterDescription = - MasterSummary( - MasterNode(hostPort.host, hostPort.port), - masters, - aliveFor, - logFileDir, - jarStoreRootPath, - MasterStatus.Synced, - userDir, - List.empty[MasterActivity], - jvmName = ManagementFactory.getRuntimeMXBean().getName(), - historyMetricsConfig = getHistoryMetricsConfig - ) - - sender ! 
MasterData(masterDescription) - - case invalidAppMaster: InvalidAppMaster => - appManager forward invalidAppMaster - } - - import scala.util.{Failure, Success} - - def onMasterListChange: Receive = { - case MasterListUpdated(masters: List[MasterNode]) => - this.masters = masters - } - - def clientMsgHandler: Receive = { - case app: SubmitApplication => - LOG.debug(s"Receive from client, SubmitApplication $app") - appManager.forward(app) - case app: RestartApplication => - LOG.debug(s"Receive from client, RestartApplication $app") - appManager.forward(app) - case app: ShutdownApplication => - LOG.debug(s"Receive from client, Shutting down Application ${app.appId}") - scheduler ! ApplicationFinished(app.appId) - appManager.forward(app) - case app: ResolveAppId => - LOG.debug(s"Receive from client, resolving appId ${app.appId} to ActorRef") - appManager.forward(app) - case resolve: ResolveWorkerId => - LOG.debug(s"Receive from client, resolving workerId ${resolve.workerId}") - val worker = workers.find(_._2 == resolve.workerId) - worker match { - case Some(worker) => sender ! ResolveWorkerIdResult(Success(worker._1)) - case None => sender ! ResolveWorkerIdResult(Failure( - new Exception(s"cannot find worker ${resolve.workerId}"))) - } - case AppMastersDataRequest => - LOG.debug("Master received AppMastersDataRequest") - appManager forward AppMastersDataRequest - case appMasterDataRequest: AppMasterDataRequest => - LOG.debug("Master received AppMasterDataRequest") - appManager forward appMasterDataRequest - case query: QueryAppMasterConfig => - LOG.debug("Master received QueryAppMasterConfig") - appManager forward query - case QueryMasterConfig => - sender ! 
MasterConfig(ClusterConfig.filterOutDefaultConfig(systemConfig)) - } - - def disassociated: Receive = { - case disassociated: DisassociatedEvent => - LOG.info(s" disassociated ${disassociated.remoteAddress}") - } - - def terminationWatch: Receive = { - case t: Terminated => - val actor = t.actor - LOG.info(s"worker ${actor.path} get terminated, is it due to network reason?" + - t.getAddressTerminated()) - - LOG.info("Let's filter out dead resources...") - // Filters out dead worker resource - if (workers.keySet.contains(actor)) { - scheduler ! WorkerTerminated(workers.get(actor).get) - workers -= actor - } - } - - override def preStart(): Unit = { - val path = ActorUtil.getFullPath(context.system, self.path) - LOG.info(s"master path is $path") - val schedulerClass = Class.forName( - systemConfig.getString(Constants.GEARPUMP_SCHEDULING_SCHEDULER)) - - appManager = context.actorOf(Props(new AppManager(kvService, AppMasterLauncher)), - classOf[AppManager].getSimpleName) - scheduler = context.actorOf(Props(schedulerClass)) - context.system.eventStream.subscribe(self, classOf[DisassociatedEvent]) - } -} - -object Master { - final val MASTER_GROUP = "master_group" - - final val WORKER_ID = "next_worker_id" - - case class WorkerTerminated(workerId: WorkerId) - - case class MasterInfo(master: ActorRef, startTime: Long = 0L) - - /** Notify the subscriber that master actor list has been updated */ - case class MasterListUpdated(masters: List[MasterNode]) - - object MasterInfo { - def empty: MasterInfo = MasterInfo(null) - } - - case class SlotStatus(totalSlots: Int, availableSlots: Int) -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala 
b/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala deleted file mode 100644 index 1429694..0000000 --- a/daemon/src/main/scala/org/apache/gearpump/cluster/scheduler/PriorityScheduler.scala +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
package org.apache.gearpump.cluster.scheduler

import org.apache.gearpump.cluster.worker.WorkerId

import scala.collection.mutable

import akka.actor.ActorRef

import org.apache.gearpump.cluster.AppMasterToMaster.RequestResource
import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated
import org.apache.gearpump.cluster.scheduler.Relaxation._
import org.apache.gearpump.cluster.scheduler.Scheduler.PendingRequest

/**
 * Assigns resource to applications based on the priority of the application.
 *
 * Pending requests sit in a priority queue ordered by application priority
 * (higher first) and, within equal priority, by arrival time (earlier first).
 * Requests that cannot be fully satisfied in one round are re-queued and
 * retried the next time resources change.
 */
class PriorityScheduler extends Scheduler {
  private var resourceRequests = new mutable.PriorityQueue[PendingRequest]()(requestOrdering)

  /** Highest priority first; ties broken FIFO by request timestamp. */
  def requestOrdering: Ordering[PendingRequest] = new Ordering[PendingRequest] {
    override def compare(x: PendingRequest, y: PendingRequest): Int = {
      var res = x.request.priority.id - y.request.priority.id
      if (res == 0) {
        res = y.timeStamp.compareTo(x.timeStamp)
      }
      res
    }
  }

  override def receive: Receive = super.handleScheduleMessage orElse resourceRequestHandler

  /**
   * Satisfies as many pending requests as possible from the currently known
   * worker resources, honouring each request's relaxation policy. Partially
   * satisfied or unsatisfiable requests are re-queued for a later round.
   */
  override def allocateResource(): Unit = {
    var scheduleLater = Array.empty[PendingRequest]
    // Work on a snapshot so bookkeeping errors cannot corrupt the live map.
    val resourcesSnapShot = resources.clone()
    var allocated = Resource.empty
    val totalResource = Resource(resourcesSnapShot.values.map(_._2.slots).sum)

    while (resourceRequests.nonEmpty && (allocated < totalResource)) {
      val PendingRequest(appId, appMaster, request, timeStamp) = resourceRequests.dequeue()
      request.relaxation match {
        case ANY =>
          // Spread the request across workers as evenly as possible.
          val allocations = allocateFairly(resourcesSnapShot, request)
          val newAllocated = Resource(allocations.map(_.resource.slots).sum)
          if (allocations.nonEmpty) {
            appMaster ! ResourceAllocated(allocations.toArray)
          }
          if (newAllocated < request.resource) {
            // Not fully satisfied: re-queue the remainder with the same timestamp
            // so the request keeps its place in the FIFO order.
            val remainingRequest = request.resource - newAllocated
            val remainingExecutors = request.executorNum - allocations.length
            val newResourceRequest = request.copy(resource = remainingRequest,
              executorNum = remainingExecutors)
            scheduleLater = scheduleLater :+
              PendingRequest(appId, appMaster, newResourceRequest, timeStamp)
          }
          allocated = allocated + newAllocated
        case ONEWORKER =>
          // All requested slots must come from one single worker.
          // NOTE(review): the strict '>' skips a worker whose free resource exactly
          // equals the request — confirm whether '>=' was intended by Resource's API.
          val availableResource = resourcesSnapShot.find { params =>
            val (_, (_, resource)) = params
            resource > request.resource
          }
          if (availableResource.nonEmpty) {
            val (workerId, (worker, resource)) = availableResource.get
            allocated = allocated + request.resource
            appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker,
              workerId)))
            resourcesSnapShot.update(workerId, (worker, resource - request.resource))
          } else {
            scheduleLater = scheduleLater :+ PendingRequest(appId, appMaster, request, timeStamp)
          }
        case SPECIFICWORKER =>
          // The request pins one particular worker id.
          val workerAndResource = resourcesSnapShot.get(request.workerId)
          if (workerAndResource.nonEmpty && workerAndResource.get._2 > request.resource) {
            val (worker, availableResource) = workerAndResource.get
            appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker,
              request.workerId)))
            allocated = allocated + request.resource
            resourcesSnapShot.update(request.workerId, (worker,
              availableResource - request.resource))
          } else {
            scheduleLater = scheduleLater :+ PendingRequest(appId, appMaster, request, timeStamp)
          }
      }
    }
    for (request <- scheduleLater)
      resourceRequests.enqueue(request)
  }

  /** Accepts new resource requests from AppMasters and triggers an allocation round. */
  def resourceRequestHandler: Receive = {
    case RequestResource(appId, request) =>
      LOG.info(s"Request resource: appId: $appId, slots: ${request.resource.slots}, " +
        s"relaxation: ${request.relaxation}, executor number: ${request.executorNum}")
      val appMaster = sender()
      resourceRequests.enqueue(new PendingRequest(appId, appMaster, request,
        System.currentTimeMillis()))
      allocateResource()
  }

  /** Drops all pending requests belonging to a finished application. */
  override def doneApplication(appId: Int): Unit = {
    resourceRequests = resourceRequests.filter(_.appId != appId)
  }

  /**
   * Distributes `request` across workers as evenly as possible, preferring
   * workers with more free slots. Mutates `resources` in place to reflect the
   * allocations it makes and returns them.
   */
  private def allocateFairly(
      resources: mutable.HashMap[WorkerId, (ActorRef, Resource)], request: ResourceRequest)
    : List[ResourceAllocation] = {
    val workerNum = resources.size
    var allocations = List.empty[ResourceAllocation]
    var totalAvailable = Resource(resources.values.map(_._2.slots).sum)
    var remainingRequest = request.resource
    var remainingExecutors = Math.min(request.executorNum, request.resource.slots)

    while (remainingExecutors > 0 && !totalAvailable.isEmpty) {
      // Fixed typo: 'exeutorNum' -> 'executorNum'.
      val executorNum = Math.min(workerNum, remainingExecutors)
      val toRequest = Resource(remainingRequest.slots * executorNum / remainingExecutors)

      // Workers with the most free slots are considered first.
      val sortedResources = resources.toArray.sortBy(_._2._2.slots)(Ordering[Int].reverse)
      val pickedResources = sortedResources.take(executorNum)

      // Interleave slots round-robin across the picked workers so the
      // allocation is spread evenly instead of draining one worker first.
      val flattenResource = pickedResources.zipWithIndex.flatMap { workerWithIndex =>
        val ((workerId, (worker, resource)), index) = workerWithIndex
        0.until(resource.slots).map(seq => ((workerId, worker), seq * workerNum + index))
      }.sortBy(_._2).map(_._1)

      if (flattenResource.length < toRequest.slots) {
        // Cannot satisfy the user's requirements; stop trying.
        totalAvailable = Resource.empty
      } else {
        flattenResource.take(toRequest.slots).groupBy(actor => actor).mapValues(_.length).
          toArray.foreach { params =>
            val ((workerId, worker), slots) = params
            // Direct apply instead of Option.get: the key is guaranteed present,
            // it came out of this very map a few lines above.
            resources.update(workerId, (worker, resources(workerId)._2 - Resource(slots)))
            allocations :+= ResourceAllocation(Resource(slots), worker, workerId)
          }
        totalAvailable -= toRequest
        remainingRequest -= toRequest
        remainingExecutors -= executorNum
      }
    }
    allocations
  }
}
package org.apache.gearpump.cluster.scheduler

import org.apache.gearpump.cluster.worker.WorkerId

import scala.collection.mutable

import akka.actor.{Actor, ActorRef}
import org.slf4j.Logger

import org.apache.gearpump.TimeStamp
import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, UpdateResourceSucceed, WorkerRegistered}
import org.apache.gearpump.cluster.WorkerToMaster.ResourceUpdate
import org.apache.gearpump.cluster.master.Master.WorkerTerminated
import org.apache.gearpump.cluster.scheduler.Scheduler.ApplicationFinished
import org.apache.gearpump.util.LogUtil

/**
 * Scheduler schedules resource for different applications.
 *
 * Tracks the free resource of every registered worker and delegates the actual
 * matching of pending requests to subclasses via [[allocateResource]].
 */
abstract class Scheduler extends Actor {
  val LOG: Logger = LogUtil.getLogger(getClass)
  // Free resource per worker: workerId -> (worker actor, free resource).
  protected var resources = new mutable.HashMap[WorkerId, (ActorRef, Resource)]

  /** Bookkeeping for worker registration, resource updates and termination. */
  def handleScheduleMessage: Receive = {
    case WorkerRegistered(id, _) =>
      if (!resources.contains(id)) {
        LOG.info(s"Worker $id added to the scheduler")
        resources.put(id, (sender, Resource.empty))
      }
    case update@ResourceUpdate(worker, workerId, resource) =>
      LOG.info(s"$update...")
      if (resources.contains(workerId)) {
        // Direct apply instead of Option.get: key presence was just checked.
        val resourceReturned = resource > resources(workerId)._2
        resources.update(workerId, (worker, resource))
        if (resourceReturned) {
          // Resource was freed; retry pending requests.
          allocateResource()
        }
        sender ! UpdateResourceSucceed
      }
      else {
        sender ! UpdateResourceFailed(
          s"ResourceUpdate failed! The worker $workerId has not been registered into master")
      }
    case WorkerTerminated(workerId) =>
      if (resources.contains(workerId)) {
        resources -= workerId
      }
    case ApplicationFinished(appId) =>
      doneApplication(appId)
  }

  /** Attempts to match pending requests with the currently free resources. */
  def allocateResource(): Unit

  /** Called when an application finishes; implementations drop its pending requests. */
  def doneApplication(appId: Int): Unit
}

object Scheduler {
  /** A resource request waiting to be satisfied, with its arrival timestamp. */
  case class PendingRequest(
      appId: Int, appMaster: ActorRef, request: ResourceRequest, timeStamp: TimeStamp)

  /** Notification that an application finished and its requests can be discarded. */
  case class ApplicationFinished(appId: Int)
}
- */ -package org.apache.gearpump.cluster.worker - -import java.io.File - -import com.typesafe.config.Config -import org.slf4j.Logger - -import org.apache.gearpump.cluster.scheduler.Resource -import org.apache.gearpump.util.{LogUtil, RichProcess, Util} - -/** Launcher to start an executor process */ -class DefaultExecutorProcessLauncher(val config: Config) extends ExecutorProcessLauncher { - private val LOG: Logger = LogUtil.getLogger(getClass) - - override def createProcess( - appId: Int, executorId: Int, resource: Resource, config: Config, options: Array[String], - classPath: Array[String], mainClass: String, arguments: Array[String]): RichProcess = { - - LOG.info(s"Launch executor $executorId, classpath: ${classPath.mkString(File.pathSeparator)}") - Util.startProcess(options, classPath, mainClass, arguments) - } - - override def cleanProcess(appId: Int, executorId: Int): Unit = {} -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala b/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala deleted file mode 100644 index 1b52e5d..0000000 --- a/daemon/src/main/scala/org/apache/gearpump/cluster/worker/Worker.scala +++ /dev/null @@ -1,581 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
package org.apache.gearpump.cluster.worker

import java.io.File
import java.lang.management.ManagementFactory
import java.net.URL
import java.util.concurrent.{Executors, TimeUnit}
import org.apache.gearpump.cluster.worker.Worker.ExecutorWatcher

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success, Try}

import akka.actor.SupervisorStrategy.Stop
import akka.actor._
import com.typesafe.config.{ConfigValueFactory, Config, ConfigFactory}
import org.slf4j.Logger

import org.apache.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
import org.apache.gearpump.cluster.AppMasterToWorker._
import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig}
import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, WorkerConfig}
import org.apache.gearpump.cluster.MasterToWorker._
import org.apache.gearpump.cluster.WorkerToAppMaster._
import org.apache.gearpump.cluster.WorkerToMaster._
import org.apache.gearpump.cluster.master.Master.MasterInfo
import org.apache.gearpump.cluster.scheduler.Resource
import org.apache.gearpump.cluster.{ClusterConfig, ExecutorJVMConfig}
import org.apache.gearpump.jarstore.{JarStoreClient, JarStoreServer}
import org.apache.gearpump.metrics.Metrics.ReportMetrics
import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
import org.apache.gearpump.util.ActorSystemBooter.Daemon
import org.apache.gearpump.util.Constants._
import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
import org.apache.gearpump.util.{TimeOutScheduler, _}

/**
 * Worker is used to track the resource on a single machine, it is like
 * the node manager of YARN.
 *
 * @param masterProxy masterProxy is used to resolve the master
 */
private[cluster] class Worker(masterProxy: ActorRef) extends Actor with TimeOutScheduler {
  private val systemConfig: Config = context.system.settings.config

  private val address = ActorUtil.getFullPath(context.system, self.path)
  // Free resource on this machine; starts at GEARPUMP_WORKER_SLOTS (see preStart).
  private var resource = Resource.empty
  // Resource currently held by each running executor.
  private var allocatedResources = Map[ActorRef, Resource]()
  private var executorsInfo = Map[ActorRef, ExecutorSlots]()
  private var id: WorkerId = WorkerId.unspecified
  private val createdTime = System.currentTimeMillis()
  // Set on WorkerRegistered; null until the first registration completes.
  private var masterInfo: MasterInfo = null
  private var executorNameToActor = Map.empty[String, ActorRef]
  private val executorProcLauncher: ExecutorProcessLauncher = getExecutorProcLauncher()
  private val jarStoreClient = new JarStoreClient(systemConfig, context.system)

  private val ioPool = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
  private val resourceUpdateTimeoutMs = 30000 // Milliseconds

  private var totalSlots: Int = 0

  val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)
  var historyMetricsService: Option[ActorRef] = None

  // The real behavior is installed by preStart via context.become; the original
  // returned null here, which risks an NPE if anything dispatches against the
  // initial behavior. Actor.emptyBehavior is equivalent but safe.
  override def receive: Receive = Actor.emptyBehavior
  var LOG: Logger = LogUtil.getLogger(getClass)

  /** Behavior once registered with the master. */
  def service: Receive =
    appMasterMsgHandler orElse
      clientMessageHandler orElse
      metricsService orElse
      terminationWatch(masterInfo.master) orElse
      ActorUtil.defaultMsgHandler(self)

  def metricsService: Receive = {
    case query: QueryHistoryMetrics =>
      if (historyMetricsService.isEmpty) {
        // Returns empty metrics so that we don't hang the UI
        sender ! HistoryMetrics(query.path, List.empty[HistoryMetricsItem])
      } else {
        historyMetricsService.get forward query
      }
  }

  private var metricsInitialized = false

  val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig)

  /** Registers JVM metrics and, if enabled, starts the history metrics actors. */
  private def initializeMetrics(): Unit = {
    // Registers jvm metrics
    val metricsSetName = "worker" + WorkerId.render(id)
    Metrics(context.system).register(new JvmMetricsSet(metricsSetName))

    historyMetricsService = if (metricsEnabled) {
      val historyMetricsService = {
        context.actorOf(Props(new HistoryMetricsService(metricsSetName, getHistoryMetricsConfig)))
      }

      val metricsReportService = context.actorOf(Props(
        new MetricsReporterService(Metrics(context.system))))
      historyMetricsService.tell(ReportMetrics, metricsReportService)
      Some(historyMetricsService)
    } else {
      None
    }
  }

  /** Behavior while waiting for the master to confirm registration. */
  def waitForMasterConfirm(timeoutTicker: Cancellable): Receive = {

    // If master get disconnected, the WorkerRegistered may be triggered multiple times.
    case WorkerRegistered(id, masterInfo) =>
      this.id = id

      // Adds the flag check, so that we don't re-initialize the metrics when worker re-register
      // itself.
      if (!metricsInitialized) {
        initializeMetrics()
        metricsInitialized = true
      }

      this.masterInfo = masterInfo
      timeoutTicker.cancel()
      context.watch(masterInfo.master)
      this.LOG = LogUtil.getLogger(getClass, worker = id)
      LOG.info(s"Worker is registered. " +
        s"actor path: ${ActorUtil.getFullPath(context.system, self.path)} ....")
      sendMsgWithTimeOutCallBack(masterInfo.master, ResourceUpdate(self, id, resource),
        resourceUpdateTimeoutMs, updateResourceTimeOut())
      context.become(service)
  }

  private def updateResourceTimeOut(): Unit = {
    LOG.error(s"Update worker resource time out")
  }

  /** Handles executor lifecycle requests coming from AppMasters and the master. */
  def appMasterMsgHandler: Receive = {
    case shutdown@ShutdownExecutor(appId, executorId, reason: String) =>
      val actorName = ActorUtil.actorNameForExecutor(appId, executorId)
      val executorToStop = executorNameToActor.get(actorName)
      if (executorToStop.isDefined) {
        LOG.info(s"Shutdown executor ${actorName}(${executorToStop.get.path.toString}) " +
          s"due to: $reason")
        executorToStop.get.forward(shutdown)
      } else {
        LOG.error(s"Cannot find executor $actorName, ignore this message")
        sender ! ShutdownExecutorFailed(s"Can not find executor $executorId for app $appId")
      }
    case launch: LaunchExecutor =>
      LOG.info(s"$launch")
      if (resource < launch.resource) {
        sender ! ExecutorLaunchRejected("There is no free resource on this machine")
      } else {
        val actorName = ActorUtil.actorNameForExecutor(launch.appId, launch.executorId)

        val executor = context.actorOf(Props(classOf[ExecutorWatcher], launch, masterInfo, ioPool,
          jarStoreClient, executorProcLauncher))
        executorNameToActor += actorName -> executor

        resource = resource - launch.resource
        allocatedResources = allocatedResources + (executor -> launch.resource)

        reportResourceToMaster()
        executorsInfo += executor ->
          ExecutorSlots(launch.appId, launch.executorId, launch.resource.slots)
        context.watch(executor)
      }
    case UpdateResourceFailed(reason, ex) =>
      // Fixed: include the cause in the log instead of discarding `ex`.
      LOG.error(reason, ex)
      context.stop(self)
    case UpdateResourceSucceed =>
      LOG.info(s"Update resource succeed")
    case GetWorkerData(workerId) =>
      val aliveFor = System.currentTimeMillis() - createdTime
      val logDir = LogUtil.daemonLogDir(systemConfig).getAbsolutePath
      val userDir = System.getProperty("user.dir")
      sender ! WorkerData(WorkerSummary(
        id, "active",
        address,
        aliveFor,
        logDir,
        executorsInfo.values.toArray,
        totalSlots,
        resource.slots,
        userDir,
        jvmName = ManagementFactory.getRuntimeMXBean().getName(),
        resourceManagerContainerId = systemConfig.getString(
          GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID),
        historyMetricsConfig = getHistoryMetricsConfig)
      )
    case ChangeExecutorResource(appId, executorId, usedResource) =>
      for (executor <- executorActorRef(appId, executorId);
        allocatedResource <- allocatedResources.get(executor)) {

        allocatedResources += executor -> usedResource
        resource = resource + allocatedResource - usedResource
        reportResourceToMaster()

        if (usedResource == Resource(0)) {
          executorsInfo -= executor
          allocatedResources -= executor
          // Stop executor if there is no resource bound to it.
          LOG.info(s"Shutdown executor $executorId because the resource used is zero")
          executor ! ShutdownExecutor(appId, executorId,
            "Shutdown executor because the resource used is zero")
        }
      }
  }

  private def reportResourceToMaster(): Unit = {
    sendMsgWithTimeOutCallBack(masterInfo.master,
      ResourceUpdate(self, id, resource), resourceUpdateTimeoutMs, updateResourceTimeOut())
  }

  private def executorActorRef(appId: Int, executorId: Int): Option[ActorRef] = {
    val actorName = ActorUtil.actorNameForExecutor(appId, executorId)
    executorNameToActor.get(actorName)
  }

  def clientMessageHandler: Receive = {
    case QueryWorkerConfig(workerId) =>
      if (this.id == workerId) {
        sender ! WorkerConfig(ClusterConfig.filterOutDefaultConfig(systemConfig))
      } else {
        // Not addressed to this worker: answer with an empty config.
        sender ! WorkerConfig(ConfigFactory.empty)
      }
  }

  /** Re-registers with (a possibly new) master, giving up after `timeOutSeconds`. */
  private def retryRegisterWorker(workerId: WorkerId, timeOutSeconds: Int): Cancellable = {
    repeatActionUtil(
      seconds = timeOutSeconds,
      action = () => {
        masterProxy ! RegisterWorker(workerId)
      },
      onTimeout = () => {
        LOG.error(s"Failed to register the worker $workerId after retrying for $timeOutSeconds " +
          s"seconds, abort and kill the worker...")
        self ! PoisonPill
      })
  }

  /** Watches the master and all executor children for termination. */
  def terminationWatch(master: ActorRef): Receive = {
    case Terminated(actor) =>
      if (actor.compareTo(master) == 0) {
        // Parent master is down; try to locate a new master and re-register.
        LOG.info(s"Master cannot be contacted, find a new master ...")
        context.become(waitForMasterConfirm(retryRegisterWorker(id, timeOutSeconds = 30)))
      } else if (ActorUtil.isChildActorPath(self, actor)) {
        // One executor is down; reclaim its resource and tell the master.
        LOG.info(s"Executor is down ${getExecutorName(actor)}")

        val allocated = allocatedResources.get(actor)
        if (allocated.isDefined) {
          resource = resource + allocated.get
          executorsInfo -= actor
          allocatedResources = allocatedResources - actor
          sendMsgWithTimeOutCallBack(master, ResourceUpdate(self, id, resource),
            resourceUpdateTimeoutMs, updateResourceTimeOut())
        }
      }
  }

  private def getExecutorName(actorRef: ActorRef): Option[String] = {
    executorNameToActor.find(_._2 == actorRef).map(_._1)
  }

  /** Instantiates the launcher class named by GEARPUMP_EXECUTOR_PROCESS_LAUNCHER. */
  private def getExecutorProcLauncher(): ExecutorProcessLauncher = {
    val launcherClazz = Class.forName(
      systemConfig.getString(GEARPUMP_EXECUTOR_PROCESS_LAUNCHER))
    launcherClazz.getConstructor(classOf[Config]).newInstance(systemConfig)
      .asInstanceOf[ExecutorProcessLauncher]
  }

  import context.dispatcher
  override def preStart(): Unit = {
    LOG.info(s"RegisterNewWorker")
    totalSlots = systemConfig.getInt(GEARPUMP_WORKER_SLOTS)
    this.resource = Resource(totalSlots)
    masterProxy ! RegisterNewWorker
    context.become(waitForMasterConfirm(registerTimeoutTicker(seconds = 30)))
  }

  private def registerTimeoutTicker(seconds: Int): Cancellable = {
    // Fixed: '() => ()' is the proper no-op; '() => Unit' evaluated the Unit
    // companion object on every tick rather than doing nothing.
    repeatActionUtil(seconds, () => (), () => {
      LOG.error(s"Failed to register new worker to Master after waiting for $seconds seconds, " +
        s"abort and kill the worker...")
      self ! PoisonPill
    })
  }

  /**
   * Runs `action` every 2 seconds and `onTimeout` once after `seconds` seconds;
   * the returned Cancellable cancels both schedules.
   */
  private def repeatActionUtil(seconds: Int, action: () => Unit, onTimeout: () => Unit)
    : Cancellable = {
    val cancelTimeout = context.system.scheduler.schedule(Duration.Zero,
      Duration(2, TimeUnit.SECONDS))(action())
    val cancelSuicide = context.system.scheduler.scheduleOnce(seconds.seconds)(onTimeout())
    new Cancellable {
      def cancel(): Boolean = {
        val result1 = cancelTimeout.cancel()
        val result2 = cancelSuicide.cancel()
        result1 && result2
      }

      def isCancelled: Boolean = {
        cancelTimeout.isCancelled && cancelSuicide.isCancelled
      }
    }
  }

  override def postStop(): Unit = {
    LOG.info(s"Worker is going down....")
    ioPool.shutdown()
    context.system.terminate()
  }
}

private[cluster] object Worker {

  case class ExecutorResult(result: Try[Int])

  /**
   * Supervises one executor: either an in-JVM daemon or a spawned child JVM.
   * Forwards shutdown requests, watches the exit value and stops itself when
   * the executor terminates.
   */
  class ExecutorWatcher(
      launch: LaunchExecutor,
      masterInfo: MasterInfo,
      ioPool: ExecutionContext,
      jarStoreClient: JarStoreClient,
      procLauncher: ExecutorProcessLauncher) extends Actor {
    import launch.{appId, executorId, resource}

    private val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = executorId)

    val executorConfig: Config = {
      val workerConfig = context.system.settings.config

      val submissionConfig = Option(launch.executorJvmConfig).flatMap { jvmConfig =>
        Option(jvmConfig.executorAkkaConfig)
      }.getOrElse(ConfigFactory.empty())

      resolveExecutorConfig(workerConfig, submissionConfig)
    }

    // For some config, worker has priority, for others, user Application submission config
    // have priorities.
    private def resolveExecutorConfig(workerConfig: Config, submissionConfig: Config): Config = {
      val config = submissionConfig.withoutPath(GEARPUMP_HOSTNAME)
        .withoutPath(GEARPUMP_CLUSTER_MASTERS)
        .withoutPath(GEARPUMP_HOME)
        .withoutPath(GEARPUMP_LOG_DAEMON_DIR)
        .withoutPath(GEARPUMP_LOG_APPLICATION_DIR)
        .withoutPath(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS)
        // Falls back to workerConfig
        .withFallback(workerConfig)

      // Minimum supported akka.scheduler.tick-duration on Windows is 10ms
      val duration = config.getInt(AKKA_SCHEDULER_TICK_DURATION)
      val updatedConf = if (akka.util.Helpers.isWindows && duration < 10) {
        LOG.warn(s"$AKKA_SCHEDULER_TICK_DURATION on Windows must be larger than 10ms, set to 10ms")
        config.withValue(AKKA_SCHEDULER_TICK_DURATION, ConfigValueFactory.fromAnyRef(10))
      } else {
        config
      }

      // Excludes reference.conf, and JVM properties..
      ClusterConfig.filterOutDefaultConfig(updatedConf)
    }

    implicit val executorService = ioPool

    private val executorHandler = {
      val ctx = launch.executorJvmConfig

      if (executorConfig.getBoolean(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS)) {
        // Runs the executor inside this JVM instead of spawning a process.
        new ExecutorHandler {
          val exitPromise = Promise[Int]()
          val app = context.actorOf(Props(new InJvmExecutor(launch, exitPromise)))

          override def destroy(): Unit = {
            context.stop(app)
          }
          override def exitValue: Future[Int] = {
            exitPromise.future
          }
        }
      } else {
        createProcess(ctx)
      }
    }

    /** Asynchronously materializes jar + config files and spawns the executor JVM. */
    private def createProcess(ctx: ExecutorJVMConfig): ExecutorHandler = {

      val process = Future {
        val jarPath = ctx.jar.map { appJar =>
          val tempFile = File.createTempFile(appJar.name, ".jar")
          jarStoreClient.copyToLocalFile(tempFile, appJar.filePath)
          val file = new URL("file:" + tempFile)
          file.getFile
        }

        val configFile = {
          val configFile = File.createTempFile("gearpump", ".conf")
          ClusterConfig.saveConfig(executorConfig, configFile)
          val file = new URL("file:" + configFile)
          file.getFile
        }

        val classPath = filterOutDaemonLib(Util.getCurrentClassPath) ++
          ctx.classPath.map(path => expandEnvironment(path)) ++
          jarPath.map(Array(_)).getOrElse(Array.empty[String])

        val appLogDir = executorConfig.getString(GEARPUMP_LOG_APPLICATION_DIR)
        val logArgs = List(
          s"-D${GEARPUMP_APPLICATION_ID}=${launch.appId}",
          s"-D${GEARPUMP_EXECUTOR_ID}=${launch.executorId}",
          s"-D${GEARPUMP_MASTER_STARTTIME}=${getFormattedTime(masterInfo.startTime)}",
          s"-D${GEARPUMP_LOG_APPLICATION_DIR}=${appLogDir}")
        val configArgs = List(s"-D${GEARPUMP_CUSTOM_CONFIG_FILE}=$configFile")

        val username = List(s"-D${GEARPUMP_USERNAME}=${ctx.username}")

        // Remote debug executor process
        val remoteDebugFlag = executorConfig.getBoolean(GEARPUMP_REMOTE_DEBUG_EXECUTOR_JVM)
        val remoteDebugConfig = if (remoteDebugFlag) {
          val availablePort = Util.findFreePort().get
          List(
            "-Xdebug",
            s"-Xrunjdwp:server=y,transport=dt_socket,address=${availablePort},suspend=n",
            s"-D${GEARPUMP_REMOTE_DEBUG_PORT}=$availablePort"
          )
        } else {
          List.empty[String]
        }

        val verboseGCFlag = executorConfig.getBoolean(GEARPUMP_VERBOSE_GC)
        val verboseGCConfig = if (verboseGCFlag) {
          List(
            s"-Xloggc:${appLogDir}/gc-app${launch.appId}-executor-${launch.executorId}.log",
            "-verbose:gc",
            "-XX:+PrintGCDetails",
            "-XX:+PrintGCDateStamps",
            "-XX:+PrintTenuringDistribution",
            "-XX:+PrintGCApplicationConcurrentTime",
            "-XX:+PrintGCApplicationStoppedTime"
          )
        } else {
          List.empty[String]
        }

        val ipv4 = List(s"-D${PREFER_IPV4}=true")

        val options = ctx.jvmArguments ++ username ++
          logArgs ++ remoteDebugConfig ++ verboseGCConfig ++ ipv4 ++ configArgs

        val process = procLauncher.createProcess(appId, executorId, resource, executorConfig,
          options, classPath, ctx.mainClass, ctx.arguments)

        ProcessInfo(process, jarPath, configFile)
      }

      new ExecutorHandler {

        var destroyed = false

        override def destroy(): Unit = {
          LOG.info(s"Destroy executor process ${ctx.mainClass}")
          if (!destroyed) {
            destroyed = true
            process.foreach { info =>
              info.process.destroy()
              info.jarPath.foreach(new File(_).delete())
              new File(info.configFile).delete()
            }
          }
        }

        override def exitValue: Future[Int] = {
          process.flatMap { info =>
            val exit = info.process.exitValue()
            if (exit == 0) {
              Future.successful(0)
            } else {
              Future.failed[Int](new Exception(s"Executor exit with failure, exit value: $exit, " +
                s"error summary: ${info.process.logger.error}"))
            }
          }
        }
      }
    }

    // Fixed name typo: 'expandEnviroment' -> 'expandEnvironment' (private, single caller).
    private def expandEnvironment(path: String): String = {
      // TODO: extend this to support more environment.
      path.replace(s"<${GEARPUMP_HOME}>", executorConfig.getString(GEARPUMP_HOME))
    }

    override def preStart(): Unit = {
      executorHandler.exitValue.onComplete { value =>
        procLauncher.cleanProcess(appId, executorId)
        val result = ExecutorResult(value)
        self ! result
      }
    }

    override def postStop(): Unit = {
      executorHandler.destroy()
    }

    // The folders are under ${GEARPUMP_HOME}
    val daemonPathPattern = List("lib" + File.separator + "daemon", "lib" +
      File.separator + "yarn")

    override def receive: Receive = {
      case ShutdownExecutor(appId, executorId, reason: String) =>
        executorHandler.destroy()
        sender ! ShutdownExecutorSucceed(appId, executorId)
        context.stop(self)
      case ExecutorResult(executorResult) =>
        executorResult match {
          case Success(exit) => LOG.info("Executor exit normally with exit value " + exit)
          case Failure(e) => LOG.error("Executor exit with errors", e)
        }
        context.stop(self)
    }

    // Fixed name typo: 'getFormatedTime' -> 'getFormattedTime' (private, single caller).
    private def getFormattedTime(timestamp: Long): String = {
      val datePattern = "yyyy-MM-dd-HH-mm"
      val format = new java.text.SimpleDateFormat(datePattern)
      format.format(timestamp)
    }

    private def filterOutDaemonLib(classPath: Array[String]): Array[String] = {
      classPath.filterNot(matchDaemonPattern(_))
    }

    private def matchDaemonPattern(path: String): Boolean = {
      daemonPathPattern.exists(path.contains(_))
    }
  }

  /** Uniform handle over an in-JVM executor or a spawned executor process. */
  trait ExecutorHandler {
    def destroy(): Unit
    def exitValue: Future[Int]
  }

  case class ProcessInfo(process: RichProcess, jarPath: Option[String], configFile: String)

  /**
   * Starts the executor in the same JVM as worker.
   */
  class InJvmExecutor(launch: LaunchExecutor, exit: Promise[Int])
    extends Daemon(launch.executorJvmConfig.arguments(0), launch.executorJvmConfig.arguments(1)) {
    private val exitCode = 0

    override val supervisorStrategy =
      OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
        case ex: Throwable =>
          LOG.error(s"system $name stopped ", ex)
          exit.failure(ex)
          Stop
      }

    override def postStop(): Unit = {
      if (!exit.isCompleted) {
        exit.success(exitCode)
      }
    }
  }
}
package org.apache.gearpump.cluster

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.testkit.TestActorRef
import com.typesafe.config.ConfigValueFactory

import org.apache.gearpump.cluster.AppMasterToMaster.GetAllWorkers
import org.apache.gearpump.cluster.MasterToAppMaster.WorkerList
import org.apache.gearpump.cluster.master.Master
import org.apache.gearpump.cluster.worker.Worker
import org.apache.gearpump.util.Constants

/**
 * In-process single-master, single-worker cluster for tests. Construction
 * blocks until the worker has registered itself with the master.
 */
class MiniCluster {
  private val mockMasterIP = "127.0.0.1"

  implicit val system = ActorSystem("system", TestUtil.MASTER_CONFIG.
    withValue(Constants.NETTY_TCP_HOSTNAME, ConfigValueFactory.fromAnyRef(mockMasterIP)))

  val (mockMaster, worker) = {
    val master = system.actorOf(Props(classOf[Master]), "master")
    val worker = system.actorOf(Props(classOf[Worker], master), "worker")

    // Wait until worker register itself to master
    waitUntilWorkerIsRegistered(master)
    (master, worker)
  }

  /** Creates a TestActorRef inside this cluster's actor system. */
  def launchActor(props: Props): TestActorRef[Actor] = {
    TestActorRef(props)
  }

  // Fixed name typo: 'waitUtil...' -> 'waitUntil...' (private, single caller).
  // The spin is acceptable in tests: each probe below blocks up to 15 seconds.
  private def waitUntilWorkerIsRegistered(master: ActorRef): Unit = {
    while (!isWorkerRegistered(master)) {}
  }

  private def isWorkerRegistered(master: ActorRef): Boolean = {
    import scala.concurrent.duration._
    implicit val dispatcher = system.dispatcher

    implicit val futureTimeout = Constants.FUTURE_TIMEOUT

    val workerListFuture = (master ? GetAllWorkers).asInstanceOf[Future[WorkerList]]

    // Waits until the worker is registered.
    val workers = Await.result[WorkerList](workerListFuture, 15.seconds)
    workers.workers.nonEmpty
  }

  /** Terminates the actor system and blocks until shutdown completes. */
  def shutDown(): Unit = {
    system.terminate()
    Await.result(system.whenTerminated, Duration.Inf)
  }
}
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.cluster.main - -import java.util.Properties - -import akka.testkit.TestProbe -import org.apache.gearpump.cluster.MasterToWorker.WorkerRegistered -import org.apache.gearpump.cluster.master.MasterProxy -import org.apache.gearpump.transport.HostPort - -import scala.concurrent.Future -import scala.util.{Success, Try} - -import com.typesafe.config.{ConfigFactory, Config} -import org.scalatest._ - -import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication} -import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersDataRequest, ReplayFromTimestampWindowTrailingEdge, _} -import org.apache.gearpump.cluster.MasterToClient.{ReplayApplicationResult, ResolveAppIdResult, ShutdownApplicationResult} -import org.apache.gearpump.cluster.WorkerToMaster.RegisterNewWorker -import org.apache.gearpump.cluster.{MasterHarness, TestUtil} -import org.apache.gearpump.util.Constants._ -import org.apache.gearpump.util.{Constants, LogUtil, Util} - -class MainSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness { - - private val LOG = LogUtil.getLogger(getClass) - - override def config: Config = TestUtil.DEFAULT_CONFIG - - override def beforeEach(): Unit = { - startActorSystem() - } - - override def afterEach(): Unit = { - shutdownActorSystem() - } - - "Worker" should "register worker address to master when started." 
in { - - val masterReceiver = createMockMaster() - - val tempTestConf = convertTestConf(getHost, getPort) - - val options = Array( - s"-D$GEARPUMP_CUSTOM_CONFIG_FILE=${tempTestConf.toString}", - s"-D${PREFER_IPV4}=true" - ) ++ getMasterListOption() - - val worker = Util.startProcess(options, - getContextClassPath, - getMainClassName(Worker), - Array.empty) - - try { - masterReceiver.expectMsg(PROCESS_BOOT_TIME, RegisterNewWorker) - - tempTestConf.delete() - } finally { - worker.destroy() - } - } - - "Master" should "accept worker RegisterNewWorker when started" in { - val worker = TestProbe()(getActorSystem) - - val host = "127.0.0.1" - val port = Util.findFreePort().get - - val properties = new Properties() - properties.put(s"${GEARPUMP_CLUSTER_MASTERS}.0", s"$host:$port") - properties.put(s"${GEARPUMP_HOSTNAME}", s"$host") - val masterConfig = ConfigFactory.parseProperties(properties) - .withFallback(TestUtil.MASTER_CONFIG) - Future { - Master.main(masterConfig, Array("-ip", "127.0.0.1", "-port", port.toString)) - } - - val masterProxy = getActorSystem.actorOf( - MasterProxy.props(List(HostPort("127.0.0.1", port))), "mainSpec") - - worker.send(masterProxy, RegisterNewWorker) - worker.expectMsgType[WorkerRegistered](PROCESS_BOOT_TIME) - } - - "Info" should "be started without exception" in { - - val masterReceiver = createMockMaster() - - Future { - org.apache.gearpump.cluster.main.Info.main(masterConfig, Array.empty) - } - - masterReceiver.expectMsg(PROCESS_BOOT_TIME, AppMastersDataRequest) - masterReceiver.reply(AppMastersData(List(AppMasterData(AppMasterActive, 0, "appName")))) - } - - "Kill" should "be started without exception" in { - - val masterReceiver = createMockMaster() - - Future { - Kill.main(masterConfig, Array("-appid", "0")) - } - - masterReceiver.expectMsg(PROCESS_BOOT_TIME, ShutdownApplication(0)) - masterReceiver.reply(ShutdownApplicationResult(Success(0))) - } - - "Replay" should "be started without exception" in { - - val masterReceiver = 
createMockMaster() - - Future { - Replay.main(masterConfig, Array("-appid", "0")) - } - - masterReceiver.expectMsgType[ResolveAppId](PROCESS_BOOT_TIME) - masterReceiver.reply(ResolveAppIdResult(Success(masterReceiver.ref))) - masterReceiver.expectMsgType[ReplayFromTimestampWindowTrailingEdge](PROCESS_BOOT_TIME) - masterReceiver.reply(ReplayApplicationResult(Success(0))) - } - - "Local" should "be started without exception" in { - val port = Util.findFreePort().get - val options = Array(s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.0=$getHost:$port", - s"-D${Constants.GEARPUMP_HOSTNAME}=$getHost", - s"-D${PREFER_IPV4}=true") - - val local = Util.startProcess(options, - getContextClassPath, - getMainClassName(Local), - Array.empty) - - def retry(times: Int)(fn: => Boolean): Boolean = { - - LOG.info(s"Local Test: Checking whether local port is available, remain times $times ..") - - val result = fn - if (result || times <= 0) { - result - } else { - Thread.sleep(1000) - retry(times - 1)(fn) - } - } - - try { - assert(retry(10)(isPortUsed("127.0.0.1", port)), - "local is not started successfully, as port is not used " + port) - } finally { - local.destroy() - } - } - - "Gear" should "support app|info|kill|shell|replay" in { - - val commands = Array("app", "info", "kill", "shell", "replay") - - assert(Try(Gear.main(Array.empty)).isSuccess, "print help, no throw") - - for (command <- commands) { - assert(Try(Gear.main(Array("-noexist"))).isFailure, - "pass unknown option, throw, command: " + command) - } - - assert(Try(Gear.main(Array("unknownCommand"))).isFailure, "unknown command, throw ") - - val tryThis = Try(Gear.main(Array("unknownCommand", "-noexist"))) - assert(tryThis.isFailure, "unknown command, throw") - } -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala ---------------------------------------------------------------------- diff --git 
a/daemon/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala deleted file mode 100644 index e1ba8f6..0000000 --- a/daemon/src/test/scala/org/apache/gearpump/cluster/main/MasterWatcherSpec.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.gearpump.cluster.main - -import scala.concurrent.Await -import scala.concurrent.duration._ - -import akka.actor.{ActorSystem, Props} -import akka.testkit.TestProbe -import com.typesafe.config.Config -import org.scalatest.{FlatSpec, Matchers} - -import org.apache.gearpump.cluster.TestUtil - -class MasterWatcherSpec extends FlatSpec with Matchers { - def config: Config = TestUtil.MASTER_CONFIG - - "MasterWatcher" should "kill itself when can not get a quorum" in { - val system = ActorSystem("ForMasterWatcher", config) - - val actorWatcher = TestProbe()(system) - - val masterWatcher = system.actorOf(Props(classOf[MasterWatcher], "watcher")) - actorWatcher watch masterWatcher - actorWatcher.expectTerminated(masterWatcher, 5.seconds) - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c3d5eb63/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala ---------------------------------------------------------------------- diff --git a/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala b/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala deleted file mode 100644 index 58e3593..0000000 --- a/daemon/src/test/scala/org/apache/gearpump/cluster/master/AppManagerSpec.scala +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.cluster.master - -import scala.util.Success - -import akka.actor.{Actor, ActorRef, Props} -import akka.testkit.TestProbe -import com.typesafe.config.Config -import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} - -import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, _} -import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication, SubmitApplication} -import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterRegistered, AppMastersData, AppMastersDataRequest, _} -import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult, SubmitApplicationResult} -import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeInfo, ApplicationState} -import org.apache.gearpump.cluster.master.AppManager._ -import org.apache.gearpump.cluster.master.InMemoryKVService.{GetKV, GetKVSuccess, PutKV, PutKVSuccess} -import org.apache.gearpump.cluster.{TestUtil, _} -import org.apache.gearpump.util.LogUtil - -class AppManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach with MasterHarness { - var kvService: TestProbe = null - var haService: TestProbe = null - var appLauncher: TestProbe = null - var appManager: ActorRef = null - private val LOG = LogUtil.getLogger(getClass) - - override def config: Config = TestUtil.DEFAULT_CONFIG - - override def beforeEach(): Unit = { - startActorSystem() - kvService = TestProbe()(getActorSystem) - appLauncher = TestProbe()(getActorSystem) - - appManager = getActorSystem.actorOf(Props(new 
AppManager(kvService.ref, - new DummyAppMasterLauncherFactory(appLauncher)))) - kvService.expectMsgType[GetKV] - kvService.reply(GetKVSuccess(MASTER_STATE, MasterState(0, Map.empty, Set.empty, Set.empty))) - } - - override def afterEach(): Unit = { - shutdownActorSystem() - } - - "AppManager" should "handle AppMaster message correctly" in { - val appMaster = TestProbe()(getActorSystem) - val appId = 1 - - val register = RegisterAppMaster(appMaster.ref, AppMasterRuntimeInfo(appId, "appName")) - appMaster.send(appManager, register) - appMaster.expectMsgType[AppMasterRegistered] - - appMaster.send(appManager, ActivateAppMaster(appId)) - appMaster.expectMsgType[AppMasterActivated] - } - - "DataStoreService" should "support Put and Get" in { - val appMaster = TestProbe()(getActorSystem) - appMaster.send(appManager, SaveAppData(0, "key", 1)) - kvService.expectMsgType[PutKV] - kvService.reply(PutKVSuccess) - appMaster.expectMsg(AppDataSaved) - - appMaster.send(appManager, GetAppData(0, "key")) - kvService.expectMsgType[GetKV] - kvService.reply(GetKVSuccess("key", 1)) - appMaster.expectMsg(GetAppDataResult("key", 1)) - } - - "AppManager" should "support application submission and shutdown" in { - testClientSubmission(withRecover = false) - } - - "AppManager" should "support application submission and recover if appmaster dies" in { - LOG.info("=================testing recover==============") - testClientSubmission(withRecover = true) - } - - "AppManager" should "handle client message correctly" in { - val mockClient = TestProbe()(getActorSystem) - mockClient.send(appManager, ShutdownApplication(1)) - assert(mockClient.receiveN(1).head.asInstanceOf[ShutdownApplicationResult].appId.isFailure) - - mockClient.send(appManager, ResolveAppId(1)) - assert(mockClient.receiveN(1).head.asInstanceOf[ResolveAppIdResult].appMaster.isFailure) - - mockClient.send(appManager, AppMasterDataRequest(1)) - mockClient.expectMsg(AppMasterData(AppMasterNonExist)) - } - - "AppManager" should 
"reject the application submission if the app name already existed" in { - val app = TestUtil.dummyApp - val submit = SubmitApplication(app, None, "username") - val client = TestProbe()(getActorSystem) - val appMaster = TestProbe()(getActorSystem) - val worker = TestProbe()(getActorSystem) - val appId = 1 - - client.send(appManager, submit) - - kvService.expectMsgType[PutKV] - appLauncher.expectMsg(LauncherStarted(appId)) - appMaster.send(appManager, RegisterAppMaster(appMaster.ref, - AppMasterRuntimeInfo(appId, app.name))) - appMaster.expectMsgType[AppMasterRegistered] - - client.send(appManager, submit) - assert(client.receiveN(1).head.asInstanceOf[SubmitApplicationResult].appId.isFailure) - } - - def testClientSubmission(withRecover: Boolean): Unit = { - val app = TestUtil.dummyApp - val submit = SubmitApplication(app, None, "username") - val client = TestProbe()(getActorSystem) - val appMaster = TestProbe()(getActorSystem) - val worker = TestProbe()(getActorSystem) - val appId = 1 - - client.send(appManager, submit) - - kvService.expectMsgType[PutKV] - appLauncher.expectMsg(LauncherStarted(appId)) - appMaster.send(appManager, RegisterAppMaster(appMaster.ref, - AppMasterRuntimeInfo(appId, app.name))) - kvService.expectMsgType[PutKV] - appMaster.expectMsgType[AppMasterRegistered] - - client.send(appManager, ResolveAppId(appId)) - client.expectMsg(ResolveAppIdResult(Success(appMaster.ref))) - - client.send(appManager, AppMastersDataRequest) - client.expectMsgType[AppMastersData] - - client.send(appManager, AppMasterDataRequest(appId, false)) - client.expectMsgType[AppMasterData] - - if (!withRecover) { - client.send(appManager, ShutdownApplication(appId)) - client.expectMsg(ShutdownApplicationResult(Success(appId))) - } else { - // Do recovery - getActorSystem.stop(appMaster.ref) - kvService.expectMsgType[GetKV] - val appState = ApplicationState(appId, "application1", 1, app, None, "username", null) - kvService.reply(GetKVSuccess(APP_STATE, appState)) - 
appLauncher.expectMsg(LauncherStarted(appId)) - } - } -} - -class DummyAppMasterLauncherFactory(test: TestProbe) extends AppMasterLauncherFactory { - - override def props(appId: Int, executorId: Int, app: AppDescription, jar: Option[AppJar], - username: String, master: ActorRef, client: Option[ActorRef]): Props = { - Props(new DummyAppMasterLauncher(test, appId)) - } -} - -class DummyAppMasterLauncher(test: TestProbe, appId: Int) extends Actor { - - test.ref ! LauncherStarted(appId) - override def receive: Receive = { - case any: Any => test.ref forward any - } -} - -case class LauncherStarted(appId: Int)
