http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala index 9510531..39f427a 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala @@ -18,7 +18,7 @@ package io.gearpump.streaming.appmaster import com.typesafe.config.Config -import io.gearpump.TimeStamp +import io.gearpump.{WorkerId, TimeStamp} import io.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest} import io.gearpump.streaming.DAG import io.gearpump.streaming.appmaster.TaskLocator.{Locality, WorkerLocality} @@ -53,7 +53,7 @@ trait TaskScheduler { * @param executorId which executorId this resource belongs to. * @return a list of tasks */ - def schedule(workerId : Int, executorId: Int, resource: Resource) : List[TaskId] + def schedule(workerId : WorkerId, executorId: Int, resource: Resource) : List[TaskId] /** * This notify the scheduler that {executorId} is failed, and expect a set of @@ -74,7 +74,7 @@ trait TaskScheduler { } object TaskScheduler { - case class Location(workerId: Int, executorId: Int) + case class Location(workerId: WorkerId, executorId: Int) class TaskStatus(val taskId: TaskId, val preferLocality: Locality, var allocation: Location) } @@ -100,11 +100,9 @@ class TaskSchedulerImpl(appId : Int, appName: String, config: Config) extends T fetchResourceRequests(fromOneWorker = false) } - val WORKER_NO_PREFERENCE = 0 - import Relaxation._ private def fetchResourceRequests(fromOneWorker: Boolean = false): Array[ResourceRequest] ={ - var workersResourceRequest = Map.empty[Int, Resource] + var workersResourceRequest = Map.empty[WorkerId, Resource] tasks.filter(_.allocation == null).foreach{task => task.preferLocality match { @@ -112,7 +110,7 @@ class TaskSchedulerImpl(appId : Int, appName: String, config: Config) extends T val current = workersResourceRequest.getOrElse(workerId, Resource.empty) workersResourceRequest += workerId -> (current + Resource(1)) case _ => - val workerId = WORKER_NO_PREFERENCE + val workerId = WorkerId.unspecified val current = workersResourceRequest.getOrElse(workerId, Resource.empty) workersResourceRequest += workerId -> (current + Resource(1)) } @@ -120,15 +118,15 @@ class TaskSchedulerImpl(appId : Int, appName: String, config: Config) extends T workersResourceRequest.map {workerIdAndResource => val (workerId, resource) = workerIdAndResource - if (workerId == WORKER_NO_PREFERENCE) { - ResourceRequest(resource, executorNum = executorNum) + if (workerId == WorkerId.unspecified) { + ResourceRequest(resource, workerId = WorkerId.unspecified, executorNum = executorNum) } else { ResourceRequest(resource, workerId, relaxation = SPECIFICWORKER) } }.toArray } - override def schedule(workerId : Int, executorId: Int, resource: Resource) : List[TaskId] = { + override def schedule(workerId : WorkerId, executorId: Int, resource: Resource) : List[TaskId] = { var scheduledTasks = List.empty[TaskId] val location = Location(workerId, executorId) // schedule tasks for specific worker @@ -163,7 +161,7 @@ class TaskSchedulerImpl(appId : Int, appName: String, config: Config) extends T // clean the location of failed tasks failedTasks.foreach(_.allocation = null) - Array(ResourceRequest(Resource(failedTasks.length), relaxation = ONEWORKER)) + Array(ResourceRequest(Resource(failedTasks.length), workerId = WorkerId.unspecified, relaxation = ONEWORKER)) } override def scheduledTasks(executorId: Int): List[TaskId] = {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/streaming/src/main/scala/io/gearpump/streaming/executor/Executor.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/executor/Executor.scala b/streaming/src/main/scala/io/gearpump/streaming/executor/Executor.scala index 8a75d1e..cbce65b 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/executor/Executor.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/executor/Executor.scala @@ -23,6 +23,7 @@ import java.lang.management.ManagementFactory import akka.actor.SupervisorStrategy.Resume import akka.actor._ import com.typesafe.config.Config +import io.gearpump.WorkerId import io.gearpump.cluster.{ClusterConfig, ExecutorContext, UserConfig} import io.gearpump.metrics.Metrics.ReportMetrics import io.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService} @@ -412,7 +413,7 @@ object Executor { case class ExecutorSummary( id: Int, - workerId: Int, + workerId: WorkerId, actorPath: String, logFile: String, status: String, @@ -422,7 +423,7 @@ object Executor { ) object ExecutorSummary { - def empty: ExecutorSummary = ExecutorSummary(0, 0, "", "", "", 1, null, jvmName = "") + def empty: ExecutorSummary = ExecutorSummary(0, WorkerId.unspecified, "", "", "", 1, null, jvmName = "") } case class GetExecutorSummary(executorId: Int) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala index 2c22f15..d1d7006 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala @@ -19,7 +19,7 @@ package io.gearpump.streaming.appmaster import akka.actor.{ActorRef, Props} import akka.testkit.{TestActorRef, TestProbe} -import io.gearpump.Message +import io.gearpump.{WorkerId, Message} import io.gearpump.cluster.AppMasterToMaster._ import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor import io.gearpump.cluster.ClientToMaster.ShutdownApplication @@ -46,7 +46,7 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with var appMaster: ActorRef = null val appId = 0 - val workerId = 1 + val workerId = WorkerId(1, 0L) val resource = Resource(1) val taskDescription1 = Processor[TaskA](2) val taskDescription2 = Processor[TaskB](2) @@ -97,7 +97,7 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with mockMaster.reply(GetAppDataResult("startClock", 0L)) - mockMaster.expectMsg(15 seconds, RequestResource(appId, ResourceRequest(Resource(4)))) + mockMaster.expectMsg(15 seconds, RequestResource(appId, ResourceRequest(Resource(4), workerId = WorkerId.unspecified))) } override def afterEach() = { @@ -115,7 +115,7 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource, mockWorker.ref, workerId)))) mockWorker.expectMsgClass(classOf[LaunchExecutor]) mockWorker.reply(ExecutorLaunchRejected("")) - mockMaster.expectMsg(RequestResource(appId, ResourceRequest(resource))) + mockMaster.expectMsg(RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified))) } "find a new master when lost connection with master" in { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala index aa81ea8..9121a22 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala @@ -22,7 +22,7 @@ import akka.actor._ import akka.testkit.TestProbe import com.typesafe.config.ConfigFactory import io.gearpump.streaming.appmaster.ExecutorManager.ExecutorStarted -import io.gearpump.TestProbeUtil +import io.gearpump.{WorkerId, TestProbeUtil} import io.gearpump.cluster.AppMasterToWorker.ChangeExecutorResource import io.gearpump.cluster._ import io.gearpump.cluster.appmaster.{ExecutorSystem, WorkerInfo} @@ -72,7 +72,7 @@ class ExecutorManagerSpec extends FlatSpec with Matchers with BeforeAndAfterAll val executorManager = system.actorOf(Props(new ExecutorManager(userConfig, appMasterContext, executorFactory, ConfigFactory.empty, appName))) taskManager.send(executorManager, SetTaskManager(taskManager.ref)) - val resourceRequest = Array(ResourceRequest(resource)) + val resourceRequest = Array(ResourceRequest(resource, WorkerId.unspecified)) //start executors taskManager.send(executorManager, StartExecutors(resourceRequest, appJar.get)) @@ -104,7 +104,7 @@ class ExecutorManagerSpec extends FlatSpec with Matchers with BeforeAndAfterAll val (master, executor, taskManager, executorManager) = startExecutorSystems val executorSystemDaemon = TestProbe() val worker = TestProbe() - val workerId = 0 + val workerId = WorkerId(0, 0L) val workerInfo = WorkerInfo(workerId, worker.ref) val executorSystem = ExecutorSystem(0, null, executorSystemDaemon.ref, resource, workerInfo) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala index d9ebb37..12128c4 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala @@ -19,6 +19,7 @@ package io.gearpump.streaming.appmaster import akka.actor.ActorSystem import com.typesafe.config.ConfigFactory +import io.gearpump.WorkerId import io.gearpump.streaming.{ProcessorDescription, DAG} import io.gearpump.cluster.{TestUtil, AppJar} import io.gearpump.cluster.scheduler.{Resource, ResourceRequest} @@ -49,13 +50,13 @@ class JarSchedulerSpec extends WordSpec with Matchers { implicit val dispatcher = system.dispatcher val manager = new JarScheduler(0, "APP", TestUtil.DEFAULT_CONFIG, system) manager.setDag(dag, Future{0L}) - val requests = Array(ResourceRequest(Resource(2))) + val requests = Array(ResourceRequest(Resource(2), WorkerId.unspecified)) val result = Await.result(manager.getRequestDetails(), 15 seconds) assert(result.length == 1) assert(result.head.jar == mockJar1) assert(result.head.requests.deep == requests.deep) - val tasks = Await.result(manager.scheduleTask(mockJar1, 0, 0, Resource(2)), 15 seconds) + val tasks = Await.result(manager.scheduleTask(mockJar1, WorkerId(0, 0L), 0, Resource(2)), 15 seconds) assert(tasks.contains(TaskId(0, 0))) assert(tasks.contains(TaskId(1, 0))) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala index 4c9cd5d..c55be84 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala @@ -18,13 +18,14 @@ package io.gearpump.streaming.appmaster +import io.gearpump.WorkerId import io.gearpump.streaming.appmaster.TaskLocator.Localities import io.gearpump.streaming.task.TaskId import org.scalatest.{BeforeAndAfterAll, Matchers, FlatSpec} class TaskLocatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll { it should "serialize/deserialize correctly" in { - val localities = new Localities(Map(0 -> Array(TaskId(0, 1), TaskId(1,2)))) + val localities = new Localities(Map(WorkerId(0, 0L) -> Array(TaskId(0, 1), TaskId(1,2)))) Localities.toJson(localities) localities.localities.mapValues(_.toList) shouldBe http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala index 621455d..8105df3 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala @@ -39,7 +39,7 @@ import io.gearpump.streaming.{DAG, LifeTime, ProcessorDescription, ProcessorId} import io.gearpump.transport.HostPort import io.gearpump.util.Graph import io.gearpump.util.Graph._ -import io.gearpump.{Message, TimeStamp} +import io.gearpump.{WorkerId, Message, TimeStamp} import org.mockito.Mockito._ import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} @@ -65,7 +65,7 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach { val appId = 0 val resource = Resource(2) - val workerId = 0 + val workerId = WorkerId(0, 0L) val executorId = 0 override def beforeEach(): Unit = { @@ -166,7 +166,7 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach { // step2: Get Additional Resource Request when(scheduler.getRequestDetails()) - .thenReturn(Future{Array(ResourceRequestDetail(mockJar, Array(ResourceRequest(resource))))}) + .thenReturn(Future{Array(ResourceRequestDetail(mockJar, Array(ResourceRequest(resource, WorkerId.unspecified))))}) // step3: DAG changed. Start transit from ApplicationReady -> DynamicDAG dagManager.expectMsg(GetLatestDAG) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala index aeef61d..d2373ea 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala @@ -21,7 +21,7 @@ import com.typesafe.config.ConfigFactory import io.gearpump.streaming.Constants import io.gearpump.streaming.appmaster.TaskLocator.Localities import io.gearpump.streaming.task.{StartTime, TaskContext, TaskId} -import io.gearpump.Message +import io.gearpump.{WorkerId, Message} import io.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest} import io.gearpump.cluster.{TestUtil, ClusterConfig, UserConfig} import io.gearpump.partitioner.{HashPartitioner, Partitioner} @@ -47,8 +47,8 @@ class TaskSchedulerSpec extends WordSpec with Matchers { "schedule tasks on different workers properly according user's configuration" in { val localities = Localities( - Map(1 -> Array(TaskId(0,0), TaskId(0,1), TaskId(1,0), TaskId(1,1)), - 2 -> Array(TaskId(0,2), TaskId(0,3)) + Map(WorkerId(1, 0L) -> Array(TaskId(0,0), TaskId(0,1), TaskId(1,0), TaskId(1,1)), + WorkerId(2, 0L) -> Array(TaskId(0,2), TaskId(0,3)) )) val localityConfig = ConfigFactory.parseString(Localities.toJson(localities)) @@ -59,8 +59,8 @@ class TaskSchedulerSpec extends WordSpec with Matchers { config.withValue(s"$GEARPUMP_STREAMING_LOCALITIES.$appName", localityConfig.root)) val expectedRequests = - Array( ResourceRequest(Resource(4), 1, relaxation = Relaxation.SPECIFICWORKER), - ResourceRequest(Resource(2), 2, relaxation = Relaxation.SPECIFICWORKER)) + Array( ResourceRequest(Resource(4), WorkerId(1, 0L), relaxation = Relaxation.SPECIFICWORKER), + ResourceRequest(Resource(2), WorkerId(2, 0L), relaxation = Relaxation.SPECIFICWORKER)) taskScheduler.setDAG(dag) val resourceRequests = taskScheduler.getResourceRequests() @@ -71,14 +71,14 @@ class TaskSchedulerSpec extends WordSpec with Matchers { val tasksOnWorker1 = ArrayBuffer[Int]() val tasksOnWorker2 = ArrayBuffer[Int]() for (i <- 0 until 4) { - tasksOnWorker1.append(taskScheduler.schedule(1, executorId = 0, Resource(1)).head.processorId) + tasksOnWorker1.append(taskScheduler.schedule(WorkerId(1, 0L), executorId = 0, Resource(1)).head.processorId) } for (i <- 0 until 2) { - tasksOnWorker2.append(taskScheduler.schedule(2, executorId = 1, Resource(1)).head.processorId) + tasksOnWorker2.append(taskScheduler.schedule(WorkerId(2, 0L), executorId = 1, Resource(1)).head.processorId) } //allocate more resource, and no tasks to launch - assert(taskScheduler.schedule(3, executorId = 3, Resource(1)) == List.empty[TaskId]) + assert(taskScheduler.schedule(WorkerId(3, 0L), executorId = 3, Resource(1)) == List.empty[TaskId]) //on worker1, executor 0 assert(tasksOnWorker1.sorted.sameElements(Array(0, 0, 1, 1))) @@ -88,9 +88,9 @@ class TaskSchedulerSpec extends WordSpec with Matchers { val rescheduledResources = taskScheduler.executorFailed(executorId = 1) - assert(rescheduledResources.sameElements(Array(ResourceRequest(Resource(2), relaxation = Relaxation.ONEWORKER)))) + assert(rescheduledResources.sameElements(Array(ResourceRequest(Resource(2), WorkerId.unspecified, relaxation = Relaxation.ONEWORKER)))) - val launchedTask = taskScheduler.schedule(workerId = 3, executorId = 3, Resource(2)) + val launchedTask = taskScheduler.schedule(WorkerId(3, 0L), executorId = 3, Resource(2)) //start the failed 2 tasks Task(0, 0) and Task(0, 1) assert(launchedTask.length == 2) @@ -101,11 +101,11 @@ class TaskSchedulerSpec extends WordSpec with Matchers { val taskScheduler = new TaskSchedulerImpl(appId = 0, appName, config) val expectedRequests = - Array( ResourceRequest(Resource(4), 1, relaxation = Relaxation.SPECIFICWORKER), - ResourceRequest(Resource(2), 2, relaxation = Relaxation.SPECIFICWORKER)) + Array( ResourceRequest(Resource(4), WorkerId(1, 0L), relaxation = Relaxation.SPECIFICWORKER), + ResourceRequest(Resource(2), WorkerId(2, 0L), relaxation = Relaxation.SPECIFICWORKER)) taskScheduler.setDAG(dag) - val tasks = taskScheduler.schedule(1, executorId = 0, Resource(4)) + val tasks = taskScheduler.schedule(WorkerId(1, 0L), executorId = 0, Resource(4)) assert(tasks.filter(_.processorId == 0).length == 2) assert(tasks.filter(_.processorId == 1).length == 2) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/streaming/src/test/scala/io/gearpump/streaming/executor/ExecutorSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/io/gearpump/streaming/executor/ExecutorSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/executor/ExecutorSpec.scala index 1126ce0..151a188 100644 --- a/streaming/src/test/scala/io/gearpump/streaming/executor/ExecutorSpec.scala +++ b/streaming/src/test/scala/io/gearpump/streaming/executor/ExecutorSpec.scala @@ -19,6 +19,7 @@ package io.gearpump.streaming.executor import akka.actor.{ActorSystem, Props} import akka.testkit.TestProbe +import io.gearpump.WorkerId import io.gearpump.cluster.appmaster.WorkerInfo import io.gearpump.cluster.scheduler.Resource import io.gearpump.cluster.{ExecutorContext, TestUtil, UserConfig} @@ -39,7 +40,7 @@ import scala.language.postfixOps class ExecutorSpec extends FlatSpec with Matchers with BeforeAndAfterAll { val appId = 0 val executorId = 0 - val workerId = 0 + val workerId = WorkerId(0, 0L) var appMaster: TestProbe = null implicit var system: ActorSystem = null val userConf = UserConfig.empty
