GEARPUMP-8, fix "two machines can possibly have same worker id for single-master cluster"
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/e7a7f542 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/e7a7f542 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/e7a7f542 Branch: refs/heads/master Commit: e7a7f54272a263eebb156510abf88616088dff34 Parents: 02597f7 Author: Sean Zhong <[email protected]> Authored: Sat Apr 2 12:34:19 2016 +0800 Committer: manuzhang <[email protected]> Committed: Tue Apr 26 14:24:43 2016 +0800 ---------------------------------------------------------------------- .gitignore | 1 + .sbtopts | 1 + conf/gear.conf | 2 +- .../io/gearpump/cluster/ClusterMessage.scala | 10 ++--- .../cluster/appmaster/ExecutorSystem.scala | 3 +- .../appmaster/ExecutorSystemScheduler.scala | 3 +- .../cluster/master/AppMasterLauncher.scala | 6 +-- .../gearpump/cluster/master/MasterSummary.scala | 6 ++- .../gearpump/cluster/scheduler/Resource.scala | 5 ++- .../gearpump/cluster/worker/WorkerSummary.scala | 5 ++- core/src/main/scala/io/gearpump/package.scala | 44 ++++++++++++++++++++ .../main/scala/io/gearpump/util/ActorUtil.scala | 3 +- .../appmaster/ExecutorSystemLauncherSpec.scala | 3 +- .../appmaster/ExecutorSystemSchedulerSpec.scala | 5 ++- .../cluster/master/AppMasterLauncherSpec.scala | 9 ++-- .../io/gearpump/cluster/DaemonMessage.scala | 7 ++-- .../io/gearpump/cluster/master/Master.scala | 14 +++++-- .../cluster/scheduler/PriorityScheduler.scala | 3 +- .../gearpump/cluster/scheduler/Scheduler.scala | 4 +- .../io/gearpump/cluster/worker/Worker.scala | 3 +- .../io/gearpump/cluster/main/MainSpec.scala | 11 ----- .../scheduler/PrioritySchedulerSpec.scala | 29 ++++++------- .../io/gearpump/cluster/worker/WorkerSpec.scala | 3 +- docs/dev-rest-api.md | 14 +++---- .../DistShellAppMasterSpec.scala | 5 ++- .../distributedshell/ShellExecutorSpec.scala | 3 +- .../DistServiceAppMasterSpec.scala | 5 ++- .../checklist/RestServiceSpec.scala | 2 +- .../minicluster/BaseContainer.scala | 2 +- .../minicluster/MiniCluster.scala | 17 ++++---- .../minicluster/RestClient.scala | 10 +++-- .../integrationtest/storm/StormClient.scala | 2 +- project/Build.scala | 4 +- .../io/gearpump/services/AdminService.scala | 2 +- .../io/gearpump/services/AppMasterService.scala | 2 +- .../io/gearpump/services/MasterService.scala | 2 +- .../io/gearpump/services/SecurityService.scala | 2 + .../io/gearpump/services/StaticService.scala | 1 + .../gearpump/services/SupervisorService.scala | 6 ++- .../io/gearpump/services/WorkerService.scala | 7 +++- .../io/gearpump/services/util/UpickleUtil.scala | 11 +++++ .../io/gearpump/services/AdminServiceSpec.scala | 2 +- .../services/AppMasterServiceSpec.scala | 2 +- .../gearpump/services/MasterServiceSpec.scala | 6 ++- .../gearpump/services/SecurityServiceSpec.scala | 1 + .../gearpump/services/StaticServiceSpec.scala | 1 + .../gearpump/services/WorkerServiceSpec.scala | 12 +++--- .../streaming/appmaster/AppMaster.scala | 7 ++-- .../streaming/appmaster/ExecutorManager.scala | 3 +- .../streaming/appmaster/JarScheduler.scala | 6 +-- .../streaming/appmaster/TaskLocator.scala | 9 ++-- .../streaming/appmaster/TaskSchedulerImpl.scala | 20 ++++----- .../gearpump/streaming/executor/Executor.scala | 5 ++- .../streaming/appmaster/AppMasterSpec.scala | 8 ++-- .../appmaster/ExecutorManagerSpec.scala | 6 +-- .../streaming/appmaster/JarSchedulerSpec.scala | 5 ++- .../streaming/appmaster/TaskLocatorSpec.scala | 3 +- .../streaming/appmaster/TaskManagerSpec.scala | 6 +-- .../streaming/appmaster/TaskSchedulerSpec.scala | 26 ++++++------ .../streaming/executor/ExecutorSpec.scala | 3 +- 60 files changed, 251 insertions(+), 157 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 0932561..1efaafc 100644 --- a/.gitignore +++ b/.gitignore @@ -33,6 +33,7 @@ out/ .idea_modules/ atlassian-ide-plugin.xml com_crashlytics_export_strings.xml +intellij # Eclipse .metadata http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/.sbtopts ---------------------------------------------------------------------- diff --git a/.sbtopts b/.sbtopts index 161b349..f6f24bf 100644 --- a/.sbtopts +++ b/.sbtopts @@ -1,3 +1,4 @@ -J-XX:+CMSClassUnloadingEnabled -J-XX:+UseConcMarkSweepGC -J-Xss6M +-J-XX:MaxMetaspaceSize=512m http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/conf/gear.conf ---------------------------------------------------------------------- diff --git a/conf/gear.conf b/conf/gear.conf index e7f14af..c2352fe 100644 --- a/conf/gear.conf +++ b/conf/gear.conf @@ -547,4 +547,4 @@ gearpump-linux { ### On windows, the value must be larger than 10ms, check ### https://github.com/akka/akka/blob/master/akka-actor/src/main/scala/akka/actor/Scheduler.scala#L204 akka.scheduler.tick-duration = 1 -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala b/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala index 2280920..aac45ab 100644 --- a/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala +++ b/core/src/main/scala/io/gearpump/cluster/ClusterMessage.scala @@ -20,7 +20,7 @@ package io.gearpump.cluster import akka.actor.ActorRef import com.typesafe.config.Config -import io.gearpump.TimeStamp +import io.gearpump.{WorkerId, TimeStamp} import io.gearpump.cluster.MasterToAppMaster.AppMasterStatus import io.gearpump.cluster.master.MasterSummary import io.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest} @@ -53,7 +53,7 @@ object ClientToMaster { case class ShutdownApplication(appId: Int) case class ResolveAppId(appId: Int) - case class ResolveWorkerId(workerId: Int) + case class ResolveWorkerId(workerId: WorkerId) case object GetJarStoreServer @@ -61,7 +61,7 @@ object ClientToMaster { case class QueryAppMasterConfig(appId: Int) - case class QueryWorkerConfig(workerId: Int) + case class QueryWorkerConfig(workerId: WorkerId) case object QueryMasterConfig @@ -144,7 +144,7 @@ object AppMasterToMaster { extends AppMasterSummary case object GetAllWorkers - case class GetWorkerData(workerId: Int) + case class GetWorkerData(workerId: WorkerId) case class WorkerData(workerDescription: WorkerSummary) case object GetMasterData @@ -181,7 +181,7 @@ object MasterToAppMaster { case class ReplayFromTimestampWindowTrailingEdge(appId: Int) - case class WorkerList(workers: List[Int]) + case class WorkerList(workers: List[WorkerId]) } object AppMasterToWorker { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala index 94f7aa4..400b61c 100644 --- a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala +++ b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystem.scala @@ -19,10 +19,11 @@ package io.gearpump.cluster.appmaster import akka.actor.{ActorRef, Address, PoisonPill} +import io.gearpump.WorkerId import io.gearpump.cluster.scheduler.Resource import io.gearpump.util.ActorSystemBooter.BindLifeCycle -case class WorkerInfo(workerId: Int, ref: ActorRef) +case class WorkerInfo(workerId: WorkerId, ref: ActorRef) /** * This contains JVM configurations to start an executor system http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala index 6af68eb..aa980b5 100644 --- a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala +++ b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala @@ -20,6 +20,7 @@ package io.gearpump.cluster.appmaster import akka.actor._ import com.typesafe.config.Config +import io.gearpump.WorkerId import io.gearpump.cluster.AppMasterToMaster.RequestResource import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated import io.gearpump.cluster._ @@ -105,7 +106,7 @@ class ExecutorSystemScheduler (appId: Int, masterProxy: ActorRef, case LaunchExecutorSystemRejected(resource, reason, session) => if (isSessionAlive(session)) { LOG.error(s"Failed to launch executor system, due to $reason, will ask master to allocate new resources $resource") - resourceAgents.get(session).map(_ ! RequestResource(appId, ResourceRequest(resource))) + resourceAgents.get(session).map(_ ! RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified))) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala b/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala index ffd1835..d391bba 100644 --- a/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala +++ b/core/src/main/scala/io/gearpump/cluster/master/AppMasterLauncher.scala @@ -40,7 +40,7 @@ import org.slf4j.Logger import scala.collection.JavaConverters._ import scala.concurrent.duration.Duration import scala.util.{Failure, Success} - +import io.gearpump.WorkerId /** * * AppMasterLauncher is a child Actor of AppManager, it is responsible @@ -59,7 +59,7 @@ class AppMasterLauncher( val appMasterAkkaConfig: Config = app.clusterConfig LOG.info(s"Ask Master resource to start AppMaster $appId...") - master ! RequestResource(appId, ResourceRequest(Resource(1))) + master ! RequestResource(appId, ResourceRequest(Resource(1), WorkerId.unspecified)) def receive : Receive = waitForResourceAllocation @@ -91,7 +91,7 @@ class AppMasterLauncher( case ExecutorLaunchRejected(reason, ex) => LOG.error(s"Executor Launch failed reasonï¼$reason", ex) LOG.info(s"reallocate resource $resource to start appmaster") - master ! RequestResource(appId, ResourceRequest(resource)) + master ! RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified)) context.become(waitForResourceAllocation) case RegisterActorSystem(systemPath) => LOG.info(s"Received RegisterActorSystem $systemPath for AppMaster") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala b/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala index 39e4a41..8847df2 100644 --- a/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala +++ b/core/src/main/scala/io/gearpump/cluster/master/MasterSummary.scala @@ -26,7 +26,11 @@ object MasterStatus { } -case class MasterNode(host: String, port: Int) +case class MasterNode(host: String, port: Int) { + def toTuple: (String, Int) = { + (host, port) + } +} /** * Master information for REST API call http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala b/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala index 94c7532..da17829 100644 --- a/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala +++ b/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala @@ -18,6 +18,7 @@ package io.gearpump.cluster.scheduler import akka.actor.ActorRef +import io.gearpump.WorkerId case class Resource(slots : Int) { def +(other : Resource): Resource = Resource(slots + other.slots) @@ -51,9 +52,9 @@ object Relaxation extends Enumeration{ import Relaxation._ import Priority._ -case class ResourceRequest(resource: Resource, workerId: Int = 0, priority: Priority = NORMAL, relaxation: Relaxation = ANY, executorNum: Int = 1) +case class ResourceRequest(resource: Resource, workerId: WorkerId, priority: Priority = NORMAL, relaxation: Relaxation = ANY, executorNum: Int = 1) -case class ResourceAllocation(resource : Resource, worker : ActorRef, workerId : Int) +case class ResourceAllocation(resource : Resource, worker : ActorRef, workerId : WorkerId) object Resource { def empty = new Resource(0) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala b/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala index b1f9e6f..ca700dc 100644 --- a/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala +++ b/core/src/main/scala/io/gearpump/cluster/worker/WorkerSummary.scala @@ -18,13 +18,14 @@ package io.gearpump.cluster.worker import akka.actor.ActorRef +import io.gearpump.WorkerId import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig /** * Worker summary information for REST API. */ case class WorkerSummary( - workerId: Int, + workerId: WorkerId, state: String, actorPath: String, aliveFor: Long, @@ -39,7 +40,7 @@ case class WorkerSummary( historyMetricsConfig: HistoryMetricsConfig = null) object WorkerSummary{ - def empty = WorkerSummary(-1, "", "", 0L, "", Array.empty[ExecutorSlots], 0, 0, "", jvmName = "", resourceManagerContainerId = "") + def empty = WorkerSummary(WorkerId.unspecified, "", "", 0L, "", Array.empty[ExecutorSlots], 0, 0, "", jvmName = "", resourceManagerContainerId = "") } case class ExecutorSlots(appId: Int, executorId: Int, slots: Int) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/core/src/main/scala/io/gearpump/package.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/package.scala b/core/src/main/scala/io/gearpump/package.scala index 05130a9..1ed94a7 100644 --- a/core/src/main/scala/io/gearpump/package.scala +++ b/core/src/main/scala/io/gearpump/package.scala @@ -3,4 +3,48 @@ package io package object gearpump { type TimeStamp = Long val LatestTime = -1 + + /** + * WorkerId is used to uniquely track a worker machine. + * + * @param sessionId sessionId is assigned by Master node for easy tracking. It is possible that + * sessionId is **NOT** unique, so always use WorkerId for comparison. + * @param registerTime the timestamp when a worker node register itself to master node + */ + case class WorkerId(sessionId: Int, registerTime: Long) + + object WorkerId { + val unspecified: WorkerId = new WorkerId(-1, 0L) + + def render(workerId: WorkerId): String = { + workerId.registerTime + "_" + workerId.sessionId + } + + def parse(str: String): WorkerId = { + val pair = str.split("_") + new WorkerId(pair(1).toInt, pair(0).toLong) + } + + implicit val workerIdOrdering: Ordering[WorkerId] = { + new Ordering[WorkerId] { + + /** Compare timestamp first, then id */ + override def compare(x: WorkerId, y: WorkerId): Int = { + if (x.registerTime < y.registerTime) { + -1 + } else if (x.registerTime == y.registerTime) { + if (x.sessionId < y.sessionId) { + -1 + } else if (x.sessionId == y.sessionId) { + 0 + } else { + 1 + } + } else { + 1 + } + } + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/core/src/main/scala/io/gearpump/util/ActorUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/util/ActorUtil.scala b/core/src/main/scala/io/gearpump/util/ActorUtil.scala index 8233b28..cc701d8 100644 --- a/core/src/main/scala/io/gearpump/util/ActorUtil.scala +++ b/core/src/main/scala/io/gearpump/util/ActorUtil.scala @@ -21,6 +21,7 @@ package io.gearpump.util import akka.actor.Actor.Receive import akka.actor._ import akka.pattern.ask +import io.gearpump.WorkerId import io.gearpump.cluster.AppMasterToMaster.GetAllWorkers import io.gearpump.cluster.ClientToMaster.{ResolveAppId, ResolveWorkerId} import io.gearpump.cluster.MasterToAppMaster.WorkerList @@ -110,7 +111,7 @@ object ActorUtil { appmaster.flatMap(askActor[T](_, msg)) } - def askWorker[T](master: ActorRef, workerId: Int, msg: Any)(implicit ex: ExecutionContext): Future[T] = { + def askWorker[T](master: ActorRef, workerId: WorkerId, msg: Any)(implicit ex: ExecutionContext): Future[T] = { implicit val timeout = Constants.FUTURE_TIMEOUT val worker = askActor[ResolveWorkerIdResult](master, ResolveWorkerId(workerId)).flatMap { result => if (result.worker.isSuccess) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala b/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala index d0fded1..dc8e624 100644 --- a/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala +++ b/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemLauncherSpec.scala @@ -21,6 +21,7 @@ package io.gearpump.cluster.appmaster import akka.actor.{ActorSystem, Props} import akka.testkit.TestProbe import com.typesafe.config.ConfigValueFactory +import io.gearpump.WorkerId import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor import io.gearpump.cluster.TestUtil @@ -36,7 +37,7 @@ import scala.concurrent.duration._ class ExecutorSystemLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterAll { implicit var system: ActorSystem = null - val workerId = 0 + val workerId: WorkerId = WorkerId(0, 0L) val appId = 0 val executorId = 0 val url = "akka.tcp://[email protected]:3000" http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala b/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala index d487582..106d389 100644 --- a/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala +++ b/core/src/test/scala/io/gearpump/cluster/appmaster/ExecutorSystemSchedulerSpec.scala @@ -20,6 +20,7 @@ package io.gearpump.cluster.appmaster import akka.actor.{Actor, ActorSystem, Props} import akka.testkit.TestProbe +import io.gearpump.WorkerId import io.gearpump.cluster.AppMasterToMaster.RequestResource import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated import io.gearpump.cluster.{ExecutorJVMConfig, AppJar, TestUtil} @@ -34,9 +35,9 @@ import scala.concurrent.duration._ class ExecutorSystemSchedulerSpec extends FlatSpec with Matchers with BeforeAndAfterEach { val appId = 0 - val workerId = 0 + val workerId = WorkerId(0, 0L) val resource = Resource(1) - val resourceRequest = ResourceRequest(resource) + val resourceRequest = ResourceRequest(resource, WorkerId.unspecified) val mockJar = AppJar("for_test", FilePath("PATH")) val emptyJvmConfig = ExecutorSystemJvmConfig(Array.empty, Array.empty, Some(mockJar), "") val start = StartExecutorSystems(Array(resourceRequest), emptyJvmConfig) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala b/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala index 8db0e0b..8ff2ea1 100644 --- a/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala +++ b/core/src/test/scala/io/gearpump/cluster/master/AppMasterLauncherSpec.scala @@ -20,6 +20,7 @@ package io.gearpump.cluster.master import akka.actor._ import akka.testkit.TestProbe +import io.gearpump.WorkerId import io.gearpump.cluster.AppMasterToMaster.RequestResource import io.gearpump.cluster.AppMasterToWorker.{ShutdownExecutor, LaunchExecutor} import io.gearpump.cluster.MasterToClient.SubmitApplicationResult @@ -53,8 +54,8 @@ class AppMasterLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterEa appMasterLauncher = getActorSystem.actorOf(AppMasterLauncher.props(appId, executorId, TestUtil.dummyApp, None, "username", master.ref, Some(client.ref))) watcher watch appMasterLauncher - master.expectMsg(RequestResource(appId, ResourceRequest(Resource(1)))) - val resource = ResourceAllocated(Array(ResourceAllocation(Resource(1), worker.ref, 0))) + master.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), WorkerId.unspecified))) + val resource = ResourceAllocated(Array(ResourceAllocation(Resource(1), worker.ref, WorkerId(0, 0L)))) master.reply(resource) worker.expectMsgType[LaunchExecutor] } @@ -76,9 +77,9 @@ class AppMasterLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterEa "AppMasterLauncher" should "reallocate resource if executor launch rejected" in { worker.reply(ExecutorLaunchRejected("")) - master.expectMsg(RequestResource(appId, ResourceRequest(Resource(1)))) + master.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), WorkerId.unspecified))) - val resource = ResourceAllocated(Array(ResourceAllocation(Resource(1), worker.ref, 0))) + val resource = ResourceAllocated(Array(ResourceAllocation(Resource(1), worker.ref, WorkerId(0, 0L)))) master.reply(resource) worker.expectMsgType[LaunchExecutor] http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala b/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala index 19ac620..ac942ed 100644 --- a/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala +++ b/daemon/src/main/scala/io/gearpump/cluster/DaemonMessage.scala @@ -18,6 +18,7 @@ package io.gearpump.cluster import akka.actor.ActorRef +import io.gearpump.WorkerId import io.gearpump.cluster.master.Master.MasterInfo import io.gearpump.cluster.scheduler.Resource @@ -26,12 +27,12 @@ import io.gearpump.cluster.scheduler.Resource */ object WorkerToMaster { case object RegisterNewWorker - case class RegisterWorker(workerId: Int) - case class ResourceUpdate(worker: ActorRef, workerId: Int, resource: Resource) + case class RegisterWorker(workerId: WorkerId) + case class ResourceUpdate(worker: ActorRef, workerId: WorkerId, resource: Resource) } object MasterToWorker { - case class WorkerRegistered(workerId : Int, masterInfo: MasterInfo) + case class WorkerRegistered(workerId : WorkerId, masterInfo: MasterInfo) case class UpdateResourceFailed(reason : String = null, ex: Throwable = null) case object UpdateResourceSucceed } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala b/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala index 8db6758..fdef42e 100644 --- a/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala +++ b/daemon/src/main/scala/io/gearpump/cluster/master/Master.scala @@ -40,6 +40,8 @@ import io.gearpump.transport.HostPort import io.gearpump.util.Constants._ import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig import io.gearpump.util._ +import io.gearpump.WorkerId + import org.apache.commons.lang.exception.ExceptionUtils import org.slf4j.Logger @@ -64,7 +66,7 @@ private[cluster] class Master extends Actor with Stash { private var scheduler : ActorRef = null - private var workers = new immutable.HashMap[ActorRef, Int] + private var workers = new immutable.HashMap[ActorRef, WorkerId] private val birth = System.currentTimeMillis() @@ -109,6 +111,8 @@ private[cluster] class Master extends Actor with Stash { case GetKVSuccess(_, result) => if(result != null) { this.nextWorkerId = result.asInstanceOf[Int] + } else { + LOG.warn("Cannot find existing state in the distributed cluster...") } context.become(receiveHandler) unstashAll() @@ -132,9 +136,11 @@ private[cluster] class Master extends Actor with Stash { def workerMsgHandler : Receive = { case RegisterNewWorker => - val workerId = nextWorkerId + val workerId = WorkerId(nextWorkerId, System.currentTimeMillis()) nextWorkerId += 1 kvService ! PutKV(MASTER_GROUP, WORKER_ID, nextWorkerId) + val workerHostname = ActorUtil.getHostname(sender()) + LOG.info(s"Register new from $workerHostname ....") self forward RegisterWorker(workerId) case RegisterWorker(id) => @@ -143,7 +149,7 @@ private[cluster] class Master extends Actor with Stash { scheduler forward WorkerRegistered(id, MasterInfo(self, birth)) workers += (sender() -> id) val workerHostname = ActorUtil.getHostname(sender()) - LOG.info(s"Register Worker $id from $workerHostname ....") + LOG.info(s"Register Worker with id $id from $workerHostname ....") case resourceUpdate : ResourceUpdate => scheduler forward resourceUpdate } @@ -290,7 +296,7 @@ object Master { final val WORKER_ID = "next_worker_id" - case class WorkerTerminated(workerId: Int) + case class WorkerTerminated(workerId: WorkerId) case class MasterInfo(master: ActorRef, startTime : Long = 0L) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala b/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala index 10e9dcb..3b1bd9f 100644 --- a/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala +++ b/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala @@ -19,6 +19,7 @@ package io.gearpump.cluster.scheduler import akka.actor.ActorRef +import io.gearpump.WorkerId import io.gearpump.cluster.AppMasterToMaster.RequestResource import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated import io.gearpump.cluster.scheduler.Relaxation._ @@ -104,7 +105,7 @@ class PriorityScheduler extends Scheduler { resourceRequests = resourceRequests.filter(_.appId != appId) } - private def allocateFairly(resources: mutable.HashMap[Int, (ActorRef, Resource)], request: ResourceRequest): List[ResourceAllocation] = { + private def allocateFairly(resources: mutable.HashMap[WorkerId, (ActorRef, Resource)], request: ResourceRequest): List[ResourceAllocation] = { val workerNum = resources.size var allocations = List.empty[ResourceAllocation] var totalAvailable = Resource(resources.values.map(_._2.slots).sum) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/daemon/src/main/scala/io/gearpump/cluster/scheduler/Scheduler.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/scheduler/Scheduler.scala b/daemon/src/main/scala/io/gearpump/cluster/scheduler/Scheduler.scala index 36b900f..8ccf1fb 100644 --- a/daemon/src/main/scala/io/gearpump/cluster/scheduler/Scheduler.scala +++ b/daemon/src/main/scala/io/gearpump/cluster/scheduler/Scheduler.scala @@ -20,7 +20,7 @@ package io.gearpump.cluster.scheduler import akka.actor.{Actor, ActorRef} import io.gearpump.cluster.MasterToWorker.UpdateResourceSucceed import io.gearpump.util.LogUtil -import io.gearpump.TimeStamp +import io.gearpump.{WorkerId, TimeStamp} import io.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, UpdateResourceSucceed, WorkerRegistered} import io.gearpump.cluster.WorkerToMaster.ResourceUpdate import io.gearpump.cluster.master.Master.WorkerTerminated @@ -35,7 +35,7 @@ import scala.collection.mutable */ abstract class Scheduler extends Actor{ val LOG: Logger = LogUtil.getLogger(getClass) - protected var resources = new mutable.HashMap[Int, (ActorRef, Resource)] + protected var resources = new mutable.HashMap[WorkerId, (ActorRef, Resource)] def handleScheduleMessage : Receive = { case WorkerRegistered(id, _) => http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala b/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala index 8957dff..a746b39 100644 --- a/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala +++ b/daemon/src/main/scala/io/gearpump/cluster/worker/Worker.scala @@ -26,6 +26,7 @@ import java.util.concurrent.{Executors, TimeUnit} import akka.actor.SupervisorStrategy.Stop import akka.actor._ import com.typesafe.config.{Config, ConfigFactory} +import io.gearpump.WorkerId import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData} import io.gearpump.cluster.AppMasterToWorker._ import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig} @@ -63,7 +64,7 @@ private[cluster] class Worker(masterProxy : ActorRef) extends Actor with TimeOut private var resource = Resource.empty private var allocatedResources = Map[ActorRef, Resource]() private var executorsInfo = Map[ActorRef, ExecutorSlots]() - private var id = -1 + private var id: WorkerId = WorkerId.unspecified private val createdTime = System.currentTimeMillis() private var masterInfo: MasterInfo = null private var executorNameToActor = Map.empty[String, ActorRef] http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/daemon/src/test/scala/io/gearpump/cluster/main/MainSpec.scala ---------------------------------------------------------------------- diff --git a/daemon/src/test/scala/io/gearpump/cluster/main/MainSpec.scala b/daemon/src/test/scala/io/gearpump/cluster/main/MainSpec.scala index d412020..9dc0289 100644 --- a/daemon/src/test/scala/io/gearpump/cluster/main/MainSpec.scala +++ b/daemon/src/test/scala/io/gearpump/cluster/main/MainSpec.scala @@ -17,23 +17,12 @@ */ package io.gearpump.cluster.main -import java.util.concurrent.TimeUnit - -import akka.testkit.TestProbe -import io.gearpump.cluster.ClientToMaster.ResolveAppId import io.gearpump.cluster.MasterToAppMaster.{ReplayFromTimestampWindowTrailingEdge, AppMastersDataRequest} -import io.gearpump.cluster.TestUtil -import io.gearpump.cluster.main.{Local, Replay, Kill, Worker} -import io.gearpump.cluster.master.MasterProxy -import io.gearpump.util.{Constants, Util} import io.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication} import io.gearpump.cluster.MasterToAppMaster._ import io.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ReplayApplicationResult, ShutdownApplicationResult} -import io.gearpump.cluster.MasterToWorker.WorkerRegistered import io.gearpump.cluster.WorkerToMaster.RegisterNewWorker -import io.gearpump.cluster.master.MasterProxy import io.gearpump.cluster.{MasterHarness, TestUtil} -import io.gearpump.transport.HostPort import io.gearpump.util.Constants._ import io.gearpump.util.{Constants, LogUtil, Util} import org.scalatest._ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala ---------------------------------------------------------------------- diff --git a/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala b/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala index 0a7e1f8..aa2028b 100644 --- a/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala +++ b/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala @@ -19,6 +19,7 @@ package io.gearpump.cluster.scheduler import akka.actor.{ActorSystem, Props} import akka.testkit.{ImplicitSender, TestKit, TestProbe} +import io.gearpump.WorkerId import io.gearpump.cluster.AppMasterToMaster.RequestResource import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated import io.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered} @@ -36,8 +37,8 @@ class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with def this() = this(ActorSystem("PrioritySchedulerSpec", TestUtil.DEFAULT_CONFIG)) val appId = 0 - val workerId1 = 1 - val workerId2 = 2 + val workerId1: WorkerId = WorkerId(1, 0L) + val workerId2: WorkerId = WorkerId(2, 0L) val mockAppMaster = TestProbe() val mockWorker1 = TestProbe() val mockWorker2 = TestProbe() @@ -55,8 +56,8 @@ class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with "drop application's resource requests when the application is removed" in { val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) - val request1 = ResourceRequest(Resource(40), 0, Priority.HIGH, Relaxation.ANY) - val request2 = ResourceRequest(Resource(20), 0, Priority.HIGH, Relaxation.ANY) + val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, Priority.HIGH, Relaxation.ANY) + val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, Priority.HIGH, Relaxation.ANY) scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref) scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref) scheduler.tell(ApplicationFinished(appId), mockAppMaster.ref) @@ -69,9 +70,9 @@ class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with "The resource request with higher priority" should { "be handled first" in { val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) - val request1 = ResourceRequest(Resource(40), 0, Priority.LOW, Relaxation.ANY) - val request2 = ResourceRequest(Resource(20), 0, Priority.NORMAL, Relaxation.ANY) - val request3 = ResourceRequest(Resource(30), 0, Priority.HIGH, Relaxation.ANY) + val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, Priority.LOW, Relaxation.ANY) + val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, Priority.NORMAL, Relaxation.ANY) + val request3 = ResourceRequest(Resource(30), WorkerId.unspecified, Priority.HIGH, Relaxation.ANY) scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref) scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref) @@ -94,8 +95,8 @@ class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with "The resource request which delivered earlier" should { "be handled first if the priorities are the same" in { val scheduler = system.actorOf(Props(classOf[PriorityScheduler])) - val request1 = ResourceRequest(Resource(40), 0, Priority.HIGH, Relaxation.ANY) - val request2 = ResourceRequest(Resource(20), 0, Priority.HIGH, Relaxation.ANY) + val request1 = ResourceRequest(Resource(40), WorkerId.unspecified, Priority.HIGH, Relaxation.ANY) + val request2 = ResourceRequest(Resource(20), WorkerId.unspecified, Priority.HIGH, Relaxation.ANY) scheduler.tell(RequestResource(appId, request1), mockAppMaster.ref) scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref) scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref) @@ -122,14 +123,14 @@ class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref) mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2)))) - val request3 = ResourceRequest(Resource(30), 0, Priority.NORMAL, Relaxation.ANY, executorNum = 2) + val request3 = ResourceRequest(Resource(30), WorkerId.unspecified, Priority.NORMAL, Relaxation.ANY, executorNum = 2) scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref) mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(15), mockWorker1.ref, workerId1), ResourceAllocation(Resource(15), mockWorker2.ref, workerId2)))) //we have to manually update the resource on each worker scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(65)), mockWorker1.ref) scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(45)), mockWorker2.ref) - val request4 = ResourceRequest(Resource(60), 0, Priority.NORMAL, Relaxation.ONEWORKER) + val request4 = ResourceRequest(Resource(60), WorkerId(0, 0L), Priority.NORMAL, Relaxation.ONEWORKER) scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref) mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(60), mockWorker1.ref, workerId1)))) } @@ -144,13 +145,13 @@ class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref) //By default, the request requires only one executor - val request2 = ResourceRequest(Resource(20)) + val request2 = ResourceRequest(Resource(20), WorkerId.unspecified) scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref) val allocations2 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated] assert(allocations2.allocations.length == 1) assert(allocations2.allocations.head.resource == Resource(20)) - val request3 = ResourceRequest(Resource(24), executorNum = 3) + val request3 = ResourceRequest(Resource(24), WorkerId.unspecified, executorNum = 3) scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref) val allocations3 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated] assert(allocations3.allocations.length == 3) @@ -159,7 +160,7 @@ class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with //The total available resource can not satisfy the requirements with executor number scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(30)), mockWorker1.ref) scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(30)), mockWorker2.ref) - val request4 = ResourceRequest(Resource(60), executorNum = 3) + val request4 = ResourceRequest(Resource(60), WorkerId.unspecified, executorNum = 3) scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref) val allocations4 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated] assert(allocations4.allocations.length == 2) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/daemon/src/test/scala/io/gearpump/cluster/worker/WorkerSpec.scala ---------------------------------------------------------------------- diff --git a/daemon/src/test/scala/io/gearpump/cluster/worker/WorkerSpec.scala b/daemon/src/test/scala/io/gearpump/cluster/worker/WorkerSpec.scala index ae73a3b..9fc4096 100644 --- a/daemon/src/test/scala/io/gearpump/cluster/worker/WorkerSpec.scala +++ b/daemon/src/test/scala/io/gearpump/cluster/worker/WorkerSpec.scala @@ -20,6 +20,7 @@ package io.gearpump.cluster.worker import akka.actor.{ActorSystem, PoisonPill, Props} import akka.testkit.TestProbe import com.typesafe.config.ConfigFactory +import io.gearpump.WorkerId import io.gearpump.cluster.WorkerToMaster.RegisterNewWorker import io.gearpump.cluster.{TestUtil, MasterHarness} import io.gearpump.util.{ActorSystemBooter, ActorUtil, Constants} @@ -40,7 +41,7 @@ class WorkerSpec extends WordSpec with Matchers with BeforeAndAfterEach with Mas override def config = TestUtil.DEFAULT_CONFIG val appId = 1 - val workerId = 1 + val workerId: WorkerId = WorkerId(1, 0L) val executorId = 1 var masterProxy: TestProbe = null var mockMaster: TestProbe = null http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/docs/dev-rest-api.md ---------------------------------------------------------------------- diff --git a/docs/dev-rest-api.md b/docs/dev-rest-api.md index 7fc718a..7b2bb6f 100644 --- a/docs/dev-rest-api.md +++ b/docs/dev-rest-api.md @@ -165,7 +165,7 @@ Sample Response: ``` [ { - "workerId": 1, + "workerId": "1", "state": "active", "actorPath": "akka.tcp://[email protected]:3000/user/Worker0", "aliveFor": "431565", @@ -188,7 +188,7 @@ Sample Response: "jvmName": "11788@lisa" }, { - "workerId": 0, + "workerId": "0", "state": "active", "actorPath": "akka.tcp://[email protected]:3000/user/Worker1", "aliveFor": "431546", @@ -397,7 +397,7 @@ Sample Response: ``` { - "workerId": 0, + "workerId": "0", "state": "active", "actorPath": "akka.tcp://[email protected]:3000/user/Worker1", "aliveFor": "831069", @@ -738,19 +738,19 @@ Sample Response: { "executorId": 0, "executor": "akka.tcp://[email protected]:52240/remote/akka.tcp/[email protected]:52212/user/daemon/appdaemon1/$c/appmaster/executors/0#-1554950276", - "workerId": 1, + "workerId": "1", "status": "active" }, { "executorId": 1, "executor": "akka.tcp://[email protected]:52241/remote/akka.tcp/[email protected]:52212/user/daemon/appdaemon1/$c/appmaster/executors/1#928082134", - "workerId": 0, + "workerId": "0", "status": "active" }, { "executorId": -1, "executor": "akka://app1-executor-1/user/daemon/appdaemon1/$c/appmaster", - "workerId": 1, + "workerId": "1", "status": "active" } ], @@ -1087,7 +1087,7 @@ Sample Response: ``` { "id": 1, - "workerId": 0, + "workerId": "0", "actorPath": "akka.tcp://[email protected]:52241/remote/akka.tcp/[email protected]:52212/user/daemon/appdaemon1/$c/appmaster/executors/1", "logFile": "logs/", "status": "active", http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala ---------------------------------------------------------------------- diff --git a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala b/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala index 2a19bd2..ec6bba1 100644 --- a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala +++ b/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala @@ -19,6 +19,7 @@ package io.gearpump.examples.distributedshell import akka.actor.ActorSystem import akka.testkit.{TestActorRef, TestProbe} +import io.gearpump.WorkerId import io.gearpump.cluster.AppMasterToMaster.{RequestResource, GetAllWorkers, RegisterAppMaster} import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor import io.gearpump.cluster.MasterToAppMaster.{ResourceAllocated, WorkerList, AppMasterRegistered} @@ -37,7 +38,7 @@ class DistShellAppMasterSpec extends WordSpec with Matchers with BeforeAndAfter{ val appId = 0 val userName = "test" val masterExecutorId = 0 - val workerList = List(1, 2, 3) + val workerList = List(WorkerId(1, 0L), WorkerId(2, 0L), WorkerId(3, 0L)) val resource = Resource(1) val appJar = None val appDescription = AppDescription("app0", classOf[DistShellAppMaster].getName, UserConfig.empty) @@ -57,7 +58,7 @@ class DistShellAppMasterSpec extends WordSpec with Matchers with BeforeAndAfter{ workerList.foreach { workerId => mockMaster.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), workerId, relaxation = Relaxation.SPECIFICWORKER))) } - mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource, mockWorker1.ref, 1)))) + mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource, mockWorker1.ref, WorkerId(1, 0L))))) mockWorker1.expectMsgClass(classOf[LaunchExecutor]) mockWorker1.reply(RegisterActorSystem(ActorUtil.getSystemAddress(system).toString)) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellExecutorSpec.scala ---------------------------------------------------------------------- diff --git a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellExecutorSpec.scala b/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellExecutorSpec.scala index 6eceb16..d51880b 100644 --- a/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellExecutorSpec.scala +++ b/examples/distributedshell/src/test/scala/io/gearpump/examples/distributedshell/ShellExecutorSpec.scala @@ -19,6 +19,7 @@ package io.gearpump.examples.distributedshell import akka.actor.{ActorSystem, Props} import akka.testkit.TestProbe +import io.gearpump.WorkerId import io.gearpump.examples.distributedshell.DistShellAppMaster.ShellCommandResult import io.gearpump.cluster.appmaster.WorkerInfo import io.gearpump.cluster.scheduler.Resource @@ -34,7 +35,7 @@ class ShellExecutorSpec extends WordSpec with Matchers { "ShellExecutor" should { "execute the shell command and return the result" in { val executorId = 1 - val workerId = 2 + val workerId = WorkerId(2, 0L) val appId = 0 val appName = "app" val resource = Resource(1) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala ---------------------------------------------------------------------- diff --git a/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala b/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala index da5001e..fcdbf14 100644 --- a/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala +++ b/examples/distributeservice/src/test/scala/io/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala @@ -19,6 +19,7 @@ package io.gearpump.experiments.distributeservice import akka.actor.ActorSystem import akka.testkit.{TestActorRef, TestProbe} +import io.gearpump.WorkerId import io.gearpump.cluster.AppMasterToMaster.{RequestResource, GetAllWorkers, RegisterAppMaster} import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor import io.gearpump.cluster.MasterToAppMaster.{ResourceAllocated, WorkerList, AppMasterRegistered} @@ -41,7 +42,7 @@ class DistServiceAppMasterSpec extends WordSpec with Matchers with BeforeAndAfte val appId = 0 val userName = "test" val masterExecutorId = 0 - val workerList = List(1, 2, 3) + val workerList = List(WorkerId(1, 0L), WorkerId(2, 0L), WorkerId(3, 0L)) val resource = Resource(1) val appJar = None val appDescription = AppDescription("app0", classOf[DistServiceAppMaster].getName, UserConfig.empty) @@ -64,7 +65,7 @@ class DistServiceAppMasterSpec extends WordSpec with Matchers with BeforeAndAfte workerList.foreach { workerId => mockMaster.expectMsg(RequestResource(appId, ResourceRequest(Resource(1), workerId, relaxation = Relaxation.SPECIFICWORKER))) } - mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource, mockWorker1.ref, 1)))) + mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource, mockWorker1.ref, WorkerId(1, 0L))))) mockWorker1.expectMsgClass(classOf[LaunchExecutor]) mockWorker1.reply(RegisterActorSystem(ActorUtil.getSystemAddress(system).toString)) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala index 2830390..92ed3ec 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala @@ -183,7 +183,7 @@ class RestServiceSpec extends TestSpecBase { "retrieve 1 master for a non-HA cluster" in { // exercise val masterSummary = restClient.queryMaster() - masterSummary.cluster shouldEqual cluster.getMastersAddresses + masterSummary.cluster.map(_.toTuple) shouldEqual cluster.getMastersAddresses masterSummary.aliveFor should be > 0L masterSummary.masterStatus shouldEqual MasterStatus.Synced } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala index 1d7ddd9..9d7e39f 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala @@ -25,7 +25,7 @@ import scala.sys.process._ * A helper to instantiate the base image for different usage. */ class BaseContainer(val host: String, command: String, - masterAddrs: Seq[(String, Int)], + masterAddrs: List[(String, Int)], tunnelPorts: Set[Int] = Set.empty) { private val IMAGE_NAME = "stanleyxu2005/gearpump-launcher" http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala index 7c9a69a..c17a20d 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala @@ -17,6 +17,7 @@ */ package io.gearpump.integrationtest.minicluster +import io.gearpump.cluster.master.MasterNode import io.gearpump.integrationtest.{Docker, Util} import org.apache.log4j.Logger @@ -32,15 +33,15 @@ class MiniCluster { private val REST_SERVICE_PORT = 8090 private val MASTER_PORT = 3000 - private val MASTER_ADDRS = { + private val MASTER_ADDRS: List[(String, Int)] = { (0 to 0).map(index => ("master" + index, MASTER_PORT) - ) + ).toList } - lazy val commandLineClient = new CommandLineClient(getMasterHosts.head) + lazy val commandLineClient: CommandLineClient = new CommandLineClient(getMasterHosts.head) - lazy val restClient = { + lazy val restClient: RestClient = { val client = new RestClient(getMasterHosts.head, REST_SERVICE_PORT) client } @@ -129,16 +130,16 @@ class MiniCluster { start() } - def getMastersAddresses = { + def getMastersAddresses: List[(String, Int)] = { MASTER_ADDRS } - def getMasterHosts = { + def getMasterHosts: List[String] = { MASTER_ADDRS.map({ case (host, port) => host }) } - def getWorkerHosts = { - workers + def getWorkerHosts: List[String] = { + workers.toList } def nodeIsOnline(host: String): Boolean = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala index f11ff9c..7d83c93 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala @@ -18,6 +18,7 @@ package io.gearpump.integrationtest.minicluster import com.typesafe.config.{Config, ConfigFactory} +import io.gearpump.WorkerId import io.gearpump.cluster.{AppJar, MasterToAppMaster} import io.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersData} import io.gearpump.cluster.MasterToClient.HistoryMetrics @@ -36,6 +37,7 @@ import io.gearpump.util.{Constants, Graph} import org.apache.log4j.Logger import upickle.Js import upickle.default._ +import io.gearpump.services.util.UpickleUtil._ /** * A REST client to operate a Gearpump cluster @@ -168,14 +170,14 @@ class RestClient(host: String, port: Int) { ConfigFactory.parseString(resp) } - def queryWorkerMetrics(workerId: Int, current: Boolean): HistoryMetrics = { + def queryWorkerMetrics(workerId: WorkerId, current: Boolean): HistoryMetrics = { val args = if (current) "?readLatest=true" else "" - val resp = callApi(s"worker/$workerId/metrics/worker$workerId?$args") + val resp = callApi(s"worker/${WorkerId.render(workerId)}/metrics/worker$workerId?$args") decodeAs[HistoryMetrics](resp) } - def queryWorkerConfig(workerId: Int): Config = { - val resp = callApi(s"worker/$workerId/config") + def queryWorkerConfig(workerId: WorkerId): Config = { + val resp = callApi(s"worker/${WorkerId.render(workerId)}/config") ConfigFactory.parseString(resp) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala index a018280..dbcc53d 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala @@ -22,7 +22,7 @@ import backtype.storm.utils.{Utils, DRPCClient} import io.gearpump.integrationtest.{Util, Docker} import io.gearpump.integrationtest.minicluster.{RestClient, BaseContainer} -class StormClient(masterAddrs: Seq[(String, Int)], restClient: RestClient) { +class StormClient(masterAddrs: List[(String, Int)], restClient: RestClient) { private val CONFIG_FILE = "/opt/gearpump/storm.yaml" private val DRPC_HOST = "storm0" http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/project/Build.scala ---------------------------------------------------------------------- diff --git a/project/Build.scala b/project/Build.scala index a99530f..b521e53 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -317,8 +317,8 @@ object Build extends sbt.Build { checksums := Seq(""), requiresDOM := true, libraryDependencies ++= Seq( - "com.lihaoyi" %%% "upickle" % upickleVersion, - "com.lihaoyi" %%% "utest" % "0.3.1" +// "com.lihaoyi" %%% "upickle" % upickleVersion, +// "com.lihaoyi" %%% "utest" % "0.3.1" ), scalaJSStage in Global := FastOptStage, testFrameworks += new TestFramework("utest.runner.Framework"), http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/main/scala/io/gearpump/services/AdminService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/AdminService.scala b/services/jvm/src/main/scala/io/gearpump/services/AdminService.scala index b159a19..aa88d13 100644 --- a/services/jvm/src/main/scala/io/gearpump/services/AdminService.scala +++ b/services/jvm/src/main/scala/io/gearpump/services/AdminService.scala @@ -23,7 +23,7 @@ import akka.http.scaladsl.model._ import akka.http.scaladsl.server.Directives._ import akka.stream.{Materializer} import io.gearpump.util.{Constants, Util} - +import io.gearpump.services.util.UpickleUtil._ /** * AdminService is for cluster-wide managements. it is not related with * specific application. http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala b/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala index 98ce0b1..30ec061 100644 --- a/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala +++ b/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala @@ -39,7 +39,7 @@ import io.gearpump.util.ActorUtil.{askActor, askAppMaster} import io.gearpump.util.FileDirective._ import io.gearpump.util.{Constants, Util} import upickle.default.{read, write} - +import io.gearpump.services.util.UpickleUtil._ import scala.util.{Failure, Success, Try} /** http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala index 3e2b9ca..91f25fe 100644 --- a/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala +++ b/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala @@ -44,6 +44,7 @@ import io.gearpump.streaming.{ProcessorDescription, ProcessorId, StreamApplicati import io.gearpump.util.ActorUtil._ import io.gearpump.util.FileDirective._ import io.gearpump.util.{Constants, Graph, Util} +import io.gearpump.services.util.UpickleUtil._ import scala.collection.JavaConversions._ import scala.concurrent.Future @@ -154,7 +155,6 @@ class MasterService(val master: ActorRef, path("submitdag") { post { entity(as[String]) { request => - import io.gearpump.services.util.UpickleUtil._ val msg = java.net.URLDecoder.decode(request, "UTF-8") val submitApplicationRequest = read[SubmitApplicationRequest](msg) import submitApplicationRequest.{appName, dag, processors, userconfig} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala b/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala index 8e03c43..a5bbe2f 100644 --- a/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala +++ b/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala @@ -34,10 +34,12 @@ import io.gearpump.services.SecurityService.{User, UserSession} import io.gearpump.services.security.oauth2.OAuth2Authenticator import io.gearpump.util.{Constants, LogUtil} import upickle.default.{write} +import io.gearpump.services.util.UpickleUtil._ import io.gearpump.security.{Authenticator => BaseAuthenticator} import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} + /** * When user cannot be authenticated, will reject with 401 AuthenticationFailedRejection * When user can be authenticated, but are not authorized to access certail resource, will http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala b/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala index b38e5cb..09180c3 100644 --- a/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala +++ b/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala @@ -23,6 +23,7 @@ import akka.http.scaladsl.model._ import akka.http.scaladsl.server.Directives._ import akka.stream.{Materializer} import io.gearpump.util.{Constants, Util} +import io.gearpump.services.util.UpickleUtil._ /** * static resource files. http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala b/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala index 9ecd078..5d552d6 100644 --- a/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala +++ b/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala @@ -22,6 +22,7 @@ import akka.actor.{ActorRef, ActorSystem} import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server.Route import akka.stream.Materializer +import io.gearpump.WorkerId import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData, GetAllWorkers} import io.gearpump.cluster.ClientToMaster._ import io.gearpump.services.SupervisorService.{Path, Status} @@ -30,6 +31,7 @@ import io.gearpump.util.ActorUtil._ import scala.concurrent.Future import scala.util.{Failure, Success} import upickle.default.{read, write} +import io.gearpump.services.util.UpickleUtil._ class SupervisorService(val master: ActorRef, val supervisor: ActorRef, override val system: ActorSystem) extends BasicService { @@ -80,10 +82,10 @@ class SupervisorService(val master: ActorRef, val supervisor: ActorRef, override } } } ~ - path("removeworker" / IntNumber) { workerId => + path("removeworker" / Segment) { workerIdString => post { authorize { - + val workerId = WorkerId.parse(workerIdString) def future(): Future[CommandResult] = { askWorker[WorkerData](master, workerId, GetWorkerData(workerId)).flatMap{workerData => val containerId = workerData.workerDescription.resourceManagerContainerId http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala b/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala index 5efce44..a1edbe4 100644 --- a/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala +++ b/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala @@ -21,12 +21,14 @@ package io.gearpump.services import akka.actor.{ActorSystem, ActorRef} import akka.http.scaladsl.server.Directives._ import akka.stream.{Materializer} +import io.gearpump.WorkerId import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData} import io.gearpump.cluster.ClientToMaster.{ReadOption, QueryHistoryMetrics, QueryWorkerConfig} import io.gearpump.cluster.ClusterConfig import io.gearpump.cluster.MasterToClient.{HistoryMetrics, WorkerConfig} import io.gearpump.util.ActorUtil._ import io.gearpump.util.Constants +import io.gearpump.services.util.UpickleUtil._ import scala.util.{Failure, Success} @@ -37,8 +39,9 @@ class WorkerService(val master: ActorRef, override val system: ActorSystem) private val systemConfig = system.settings.config private val concise = systemConfig.getBoolean(Constants.GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE) - override def doRoute(implicit mat: Materializer) = pathPrefix("worker" / IntNumber) { workerId => + override def doRoute(implicit mat: Materializer) = pathPrefix("worker" / Segment) { workerIdString => pathEnd { + val workerId = WorkerId.parse(workerIdString) onComplete(askWorker[WorkerData](master, workerId, GetWorkerData(workerId))) { case Success(value: WorkerData) => complete(write(value.workerDescription)) @@ -46,6 +49,7 @@ class WorkerService(val master: ActorRef, override val system: ActorSystem) } } ~ path("config") { + val workerId = WorkerId.parse(workerIdString) onComplete(askWorker[WorkerConfig](master, workerId, QueryWorkerConfig(workerId))) { case Success(value: WorkerConfig) => val config = Option(value.config).map(ClusterConfig.render(_, concise)).getOrElse("{}") @@ -55,6 +59,7 @@ class WorkerService(val master: ActorRef, override val system: ActorSystem) } } ~ path("metrics" / RestPath ) { path => + val workerId = WorkerId.parse(workerIdString) parameter(ReadOption.Key ? ReadOption.ReadLatest) { readOption => val query = QueryHistoryMetrics(path.head.toString, readOption) onComplete(askWorker[HistoryMetrics](master, workerId, query)) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala b/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala index 8ca2c65..0ae4505 100644 --- a/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala +++ b/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala @@ -18,6 +18,7 @@ package io.gearpump.services.util +import io.gearpump.WorkerId import io.gearpump.util.Graph import upickle.Js @@ -31,4 +32,14 @@ object UpickleUtil { val edgeList = upickle.default.readJs[List[(Int, String, Int)]](edges._2) Graph(vertexList, edgeList) } + + implicit val workerIdReader: upickle.default.Reader[WorkerId] = upickle.default.Reader[WorkerId] { + case Js.Str(str) => + WorkerId.parse(str) + } + + implicit val workerIdWriter: upickle.default.Writer[WorkerId] = upickle.default.Writer[WorkerId] { + case workerId: WorkerId => + Js.Str(WorkerId.render(workerId)) + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala index d4ea2b7..738bbad 100644 --- a/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala +++ b/services/jvm/src/test/scala/io/gearpump/services/AdminServiceSpec.scala @@ -23,7 +23,7 @@ import io.gearpump.cluster.TestUtil import akka.http.scaladsl.testkit.{ScalatestRouteTest, RouteTestTimeout} import com.typesafe.config.{Config, ConfigFactory} import org.scalatest.{BeforeAndAfterAll, Matchers, FlatSpec} - +import io.gearpump.services.util.UpickleUtil._ import scala.concurrent.duration._ import scala.util.Try http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala index 3cfe162..7b98cb6 100644 --- a/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala +++ b/services/jvm/src/test/scala/io/gearpump/services/AppMasterServiceSpec.scala @@ -37,7 +37,7 @@ import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} import org.slf4j.Logger import upickle.default.read import akka.http.scaladsl.testkit.ScalatestRouteTest - +import io.gearpump.services.util.UpickleUtil._ import scala.concurrent.duration._ import scala.util.{Success, Try} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala index 3a1c4fe..1cfc01a 100644 --- a/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala +++ b/services/jvm/src/test/scala/io/gearpump/services/MasterServiceSpec.scala @@ -28,6 +28,7 @@ import akka.stream.scaladsl.Source import akka.testkit.TestActor.{AutoPilot, KeepRunning} import akka.testkit.TestProbe import com.typesafe.config.{Config, ConfigFactory} +import io.gearpump.WorkerId import io.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, GetMasterData, GetWorkerData, MasterData, WorkerData} import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryMasterConfig, ResolveWorkerId, SubmitApplication} import io.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersData, AppMastersDataRequest, WorkerList} @@ -44,6 +45,7 @@ import scala.concurrent.{Future, ExecutionContext} import scala.concurrent.duration._ import scala.util.{Success, Try} import akka.stream.scaladsl.FileIO +import io.gearpump.services.util.UpickleUtil._ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with Matchers with BeforeAndAfterAll { @@ -87,9 +89,9 @@ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest with sender ! AppMastersData(List.empty[AppMasterData]) KeepRunning case GetAllWorkers => - sender ! WorkerList(List(0)) + sender ! WorkerList(List(WorkerId(0, 0L))) KeepRunning - case ResolveWorkerId(0) => + case ResolveWorkerId(WorkerId(0, 0L)) => sender ! ResolveWorkerIdResult(Success(mockWorker.ref)) KeepRunning case QueryHistoryMetrics(path, _, _, _) => http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala index c276286..10b45b9 100644 --- a/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala +++ b/services/jvm/src/test/scala/io/gearpump/services/SecurityServiceSpec.scala @@ -21,6 +21,7 @@ package io.gearpump.services import akka.http.scaladsl.testkit.{RouteTestTimeout} import com.typesafe.config.Config import io.gearpump.cluster.TestUtil +import io.gearpump.services.util.UpickleUtil._ import org.scalatest.{BeforeAndAfterAll, Matchers, FlatSpec} import akka.actor.{ActorSystem} import akka.http.scaladsl.server._ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala index 2ab62e8..501776e 100644 --- a/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala +++ b/services/jvm/src/test/scala/io/gearpump/services/StaticServiceSpec.scala @@ -24,6 +24,7 @@ import akka.testkit.TestProbe import com.typesafe.config.{Config, ConfigFactory} import io.gearpump.cluster.TestUtil import io.gearpump.util.Constants +import io.gearpump.services.util.UpickleUtil._ import org.scalatest.{BeforeAndAfterAll, Matchers, FlatSpec} import scala.util.Try http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala b/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala index e9256a4..dc8d5a7 100644 --- a/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala +++ b/services/jvm/src/test/scala/io/gearpump/services/WorkerServiceSpec.scala @@ -23,12 +23,14 @@ import akka.http.scaladsl.model.headers.`Cache-Control` import akka.testkit.TestActor.{AutoPilot, KeepRunning} import akka.testkit.{TestKit, TestProbe} import com.typesafe.config.{Config, ConfigFactory} +import io.gearpump.WorkerId import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData} import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryWorkerConfig, ResolveWorkerId} import io.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, ResolveWorkerIdResult, WorkerConfig} import io.gearpump.cluster.TestUtil import io.gearpump.cluster.worker.WorkerSummary import io.gearpump.jarstore.JarStoreService +import io.gearpump.services.util.UpickleUtil._ import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} import scala.concurrent.duration._ @@ -52,10 +54,10 @@ class WorkerServiceSpec extends FlatSpec with ScalatestRouteTest with Matchers mockWorker.setAutoPilot { new AutoPilot { def run(sender: ActorRef, msg: Any) = msg match { - case GetWorkerData(appId) => + case GetWorkerData(workerId) => sender ! WorkerData(WorkerSummary.empty) KeepRunning - case QueryWorkerConfig(appId) => + case QueryWorkerConfig(workerId) => sender ! WorkerConfig(null) KeepRunning case QueryHistoryMetrics(path, _, _, _) => @@ -79,7 +81,7 @@ class WorkerServiceSpec extends FlatSpec with ScalatestRouteTest with Matchers "ConfigQueryService" should "return config for worker" in { implicit val customTimeout = RouteTestTimeout(15.seconds) - (Get(s"/api/$REST_VERSION/worker/1/config") ~> workerRoute) ~> check{ + (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(0, 0L))}/config") ~> workerRoute) ~> check{ val responseBody = responseAs[String] val config = Try(ConfigFactory.parseString(responseBody)) assert(config.isSuccess) @@ -88,7 +90,7 @@ class WorkerServiceSpec extends FlatSpec with ScalatestRouteTest with Matchers it should "return WorkerData" in { implicit val customTimeout = RouteTestTimeout(15.seconds) - (Get(s"/api/$REST_VERSION/worker/1") ~> workerRoute) ~> check{ + (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(1, 0L))}") ~> workerRoute) ~> check{ val responseBody = responseAs[String] val config = Try(ConfigFactory.parseString(responseBody)) assert(config.isSuccess) @@ -102,7 +104,7 @@ class WorkerServiceSpec extends FlatSpec with ScalatestRouteTest with Matchers "MetricsQueryService" should "return history metrics" in { implicit val customTimeout = RouteTestTimeout(15.seconds) - (Get(s"/api/$REST_VERSION/worker/0/metrics/worker") ~> workerRoute) ~> check { + (Get(s"/api/$REST_VERSION/worker/${WorkerId.render(WorkerId(0, 0L))}/metrics/worker") ~> workerRoute) ~> check { val responseBody = responseAs[String] val config = Try(ConfigFactory.parseString(responseBody)) assert(config.isSuccess) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala index fe90c6a..6fcc47c 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/AppMaster.scala @@ -82,7 +82,8 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription) extends Ap private val systemConfig = context.system.settings.config private var lastFailure = LastFailure(0L, null) - private val appMasterBrief = ExecutorBrief(APPMASTER_DEFAULT_EXECUTOR_ID, self.path.toString, Option(appContext.workerInfo).map(_.workerId).getOrElse(-1), "active") + private val appMasterBrief = ExecutorBrief(APPMASTER_DEFAULT_EXECUTOR_ID, + self.path.toString, Option(appContext.workerInfo).map(_.workerId).getOrElse(WorkerId.unspecified), "active") private val getHistoryMetricsConfig = HistoryMetricsConfig(systemConfig) @@ -93,7 +94,7 @@ class AppMaster(appContext : AppMasterContext, app : AppDescription) extends Ap private val appMasterExecutorSummary = ExecutorSummary( APPMASTER_DEFAULT_EXECUTOR_ID, - Option(appContext.workerInfo).map(_.workerId).getOrElse(-1), + Option(appContext.workerInfo).map(_.workerId).getOrElse(WorkerId.unspecified), self.path.toString, logFile.getAbsolutePath, status = "Active", @@ -334,6 +335,6 @@ object AppMaster { class ServiceNotAvailableException(reason: String) extends Exception(reason) - case class ExecutorBrief(executorId: ExecutorId, executor: String, workerId: Int, status: String) + case class ExecutorBrief(executorId: ExecutorId, executor: String, workerId: WorkerId, status: String) } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala index 526fbc3..ebe6c11 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala @@ -22,6 +22,7 @@ import akka.actor.SupervisorStrategy.Stop import akka.actor._ import akka.remote.RemoteScope import com.typesafe.config.Config +import io.gearpump.WorkerId import io.gearpump.cluster.AppMasterToWorker.ChangeExecutorResource import io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, ExecutorSystemStarted, StartExecutorSystemTimeout, StartExecutorSystems} import io.gearpump.cluster.appmaster.WorkerInfo @@ -167,7 +168,7 @@ private[appmaster] object ExecutorManager { case object GetExecutorInfo - case class ExecutorStarted(executorId: Int, resource: Resource, workerId: Int, boundedJar: Option[AppJar]) + case class ExecutorStarted(executorId: Int, resource: Resource, workerId: WorkerId, boundedJar: Option[AppJar]) case class ExecutorStopped(executorId: Int) case class SetTaskManager(taskManager: ActorRef) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala index fa2e444..e769a0d 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala @@ -19,7 +19,7 @@ package io.gearpump.streaming.appmaster import akka.actor._ import com.typesafe.config.Config -import io.gearpump.TimeStamp +import io.gearpump.{WorkerId, TimeStamp} import io.gearpump.streaming.task.{StartClock, TaskId} import io.gearpump.streaming.{ProcessorDescription, DAG} import io.gearpump.cluster.AppJar @@ -52,7 +52,7 @@ class JarScheduler(appId : Int, appName: String, config: Config, factory: ActorR (actor ? GetRequestDetails).asInstanceOf[Future[Array[ResourceRequestDetail]]] } - def scheduleTask(appJar: AppJar, workerId: Int, executorId: Int, resource: Resource): Future[List[TaskId]] = { + def scheduleTask(appJar: AppJar, workerId: WorkerId, executorId: Int, resource: Resource): Future[List[TaskId]] = { (actor ? ScheduleTask(appJar, workerId, executorId, resource)).asInstanceOf[Future[List[TaskId]]] } @@ -71,7 +71,7 @@ object JarScheduler{ case object GetRequestDetails - case class ScheduleTask(appJar: AppJar, workerId: Int, executorId: Int, resource: Resource) + case class ScheduleTask(appJar: AppJar, workerId: WorkerId, executorId: Int, resource: Resource) case class ExecutorFailed(executorId: Int) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala index 10ce15b..c456a06 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala @@ -19,6 +19,7 @@ package io.gearpump.streaming.appmaster import com.typesafe.config.{ConfigValueFactory, ConfigFactory, ConfigRenderOptions, Config} import TaskLocator.{Localities, WorkerLocality, NonLocality, Locality} +import io.gearpump.WorkerId import io.gearpump.streaming.Constants import io.gearpump.streaming.task.TaskId import scala.util.Try @@ -56,12 +57,10 @@ object TaskLocator { trait Locality - case class WorkerLocality(workerId: Int) extends Locality + case class WorkerLocality(workerId: WorkerId) extends Locality object NonLocality extends Locality - type WorkerId = Int - case class Localities(localities: Map[WorkerId, Array[TaskId]]) object Localities { @@ -70,7 +69,7 @@ object TaskLocator { def fromJson(json: String): Localities = { val localities = ConfigFactory.parseString(json).getAnyRef("localities") .asInstanceOf[java.util.Map[String, String]].asScala.map { pair => - val workerId: WorkerId = pair._1.toInt + val workerId: WorkerId = WorkerId.parse(pair._1) val tasks = pair._2.split(",").map { task => val pattern(processorId, taskIndex) = task TaskId(processorId.toInt, taskIndex.toInt) @@ -82,7 +81,7 @@ object TaskLocator { def toJson(localities: Localities): String = { val map = localities.localities.toList.map {pair => - (pair._1.toString, pair._2.map(task => s"task_${task.processorId}_${task.index}").mkString(",")) + (WorkerId.render(pair._1), pair._2.map(task => s"task_${task.processorId}_${task.index}").mkString(",")) }.toMap.asJava ConfigFactory.empty().withValue("localities", ConfigValueFactory.fromAnyRef(map)). root.render(ConfigRenderOptions.concise())
