http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala
index 9510531..39f427a 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala
@@ -18,7 +18,7 @@
 package io.gearpump.streaming.appmaster
 
 import com.typesafe.config.Config
-import io.gearpump.TimeStamp
+import io.gearpump.{WorkerId, TimeStamp}
 import io.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest}
 import io.gearpump.streaming.DAG
 import io.gearpump.streaming.appmaster.TaskLocator.{Locality, WorkerLocality}
@@ -53,7 +53,7 @@ trait TaskScheduler {
    * @param executorId which executorId this resource belongs to.
    * @return a list of tasks
    */
-  def schedule(workerId : Int, executorId: Int, resource: Resource) : List[TaskId]
+  def schedule(workerId : WorkerId, executorId: Int, resource: Resource) : List[TaskId]
 
   /**
    * This notify the scheduler that {executorId} is failed, and expect a set of
@@ -74,7 +74,7 @@ trait TaskScheduler {
 }
 
 object TaskScheduler {
-  case class Location(workerId: Int, executorId: Int)
+  case class Location(workerId: WorkerId, executorId: Int)
 
   class TaskStatus(val taskId: TaskId, val preferLocality: Locality, var allocation: Location)
 }
@@ -100,11 +100,9 @@ class TaskSchedulerImpl(appId : Int, appName: String, config: Config)  extends T
     fetchResourceRequests(fromOneWorker = false)
   }
 
-  val WORKER_NO_PREFERENCE = 0
-
   import Relaxation._
   private def fetchResourceRequests(fromOneWorker: Boolean = false): Array[ResourceRequest] ={
-    var workersResourceRequest = Map.empty[Int, Resource]
+    var workersResourceRequest = Map.empty[WorkerId, Resource]
 
     tasks.filter(_.allocation == null).foreach{task =>
       task.preferLocality match {
@@ -112,7 +110,7 @@ class TaskSchedulerImpl(appId : Int, appName: String, config: Config)  extends T
           val current = workersResourceRequest.getOrElse(workerId, Resource.empty)
           workersResourceRequest += workerId -> (current + Resource(1))
         case _ =>
-          val workerId = WORKER_NO_PREFERENCE
+          val workerId = WorkerId.unspecified
           val current = workersResourceRequest.getOrElse(workerId, Resource.empty)
           workersResourceRequest += workerId -> (current + Resource(1))
       }
@@ -120,15 +118,15 @@ class TaskSchedulerImpl(appId : Int, appName: String, config: Config)  extends T
 
     workersResourceRequest.map {workerIdAndResource =>
       val (workerId, resource) = workerIdAndResource
-      if (workerId == WORKER_NO_PREFERENCE) {
-        ResourceRequest(resource, executorNum = executorNum)
+      if (workerId == WorkerId.unspecified) {
+        ResourceRequest(resource, workerId = WorkerId.unspecified, executorNum = executorNum)
       } else {
         ResourceRequest(resource, workerId, relaxation = SPECIFICWORKER)
       }
     }.toArray
   }
 
-  override def schedule(workerId : Int, executorId: Int, resource: Resource) : List[TaskId] = {
+  override def schedule(workerId : WorkerId, executorId: Int, resource: Resource) : List[TaskId] = {
     var scheduledTasks = List.empty[TaskId]
     val location = Location(workerId, executorId)
     // schedule tasks for specific worker
@@ -163,7 +161,7 @@ class TaskSchedulerImpl(appId : Int, appName: String, config: Config)  extends T
     // clean the location of failed tasks
     failedTasks.foreach(_.allocation = null)
 
-    Array(ResourceRequest(Resource(failedTasks.length), relaxation = ONEWORKER))
+    Array(ResourceRequest(Resource(failedTasks.length), workerId = WorkerId.unspecified, relaxation = ONEWORKER))
   }
 
   override def scheduledTasks(executorId: Int): List[TaskId] = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/streaming/src/main/scala/io/gearpump/streaming/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/executor/Executor.scala b/streaming/src/main/scala/io/gearpump/streaming/executor/Executor.scala
index 8a75d1e..cbce65b 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/executor/Executor.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/executor/Executor.scala
@@ -23,6 +23,7 @@ import java.lang.management.ManagementFactory
 import akka.actor.SupervisorStrategy.Resume
 import akka.actor._
 import com.typesafe.config.Config
+import io.gearpump.WorkerId
 import io.gearpump.cluster.{ClusterConfig, ExecutorContext, UserConfig}
 import io.gearpump.metrics.Metrics.ReportMetrics
 import io.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
@@ -412,7 +413,7 @@ object Executor {
 
   case class ExecutorSummary(
     id: Int,
-    workerId: Int,
+    workerId: WorkerId,
     actorPath: String,
     logFile: String,
     status: String,
@@ -422,7 +423,7 @@ object Executor {
   )
 
   object ExecutorSummary {
-    def empty: ExecutorSummary = ExecutorSummary(0, 0, "", "", "", 1, null, jvmName = "")
+    def empty: ExecutorSummary = ExecutorSummary(0, WorkerId.unspecified, "", "", "", 1, null, jvmName = "")
   }
 
   case class GetExecutorSummary(executorId: Int)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala
index 2c22f15..d1d7006 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/AppMasterSpec.scala
@@ -19,7 +19,7 @@ package io.gearpump.streaming.appmaster
 
 import akka.actor.{ActorRef, Props}
 import akka.testkit.{TestActorRef, TestProbe}
-import io.gearpump.Message
+import io.gearpump.{WorkerId, Message}
 import io.gearpump.cluster.AppMasterToMaster._
 import io.gearpump.cluster.AppMasterToWorker.LaunchExecutor
 import io.gearpump.cluster.ClientToMaster.ShutdownApplication
@@ -46,7 +46,7 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with
   var appMaster: ActorRef = null
 
   val appId = 0
-  val workerId = 1
+  val workerId = WorkerId(1, 0L)
   val resource = Resource(1)
   val taskDescription1 = Processor[TaskA](2)
   val taskDescription2 = Processor[TaskB](2)
@@ -97,7 +97,7 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with
 
     mockMaster.reply(GetAppDataResult("startClock", 0L))
 
-    mockMaster.expectMsg(15 seconds, RequestResource(appId, ResourceRequest(Resource(4))))
+    mockMaster.expectMsg(15 seconds, RequestResource(appId, ResourceRequest(Resource(4), workerId = WorkerId.unspecified)))
   }
 
   override def afterEach() = {
@@ -115,7 +115,7 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with
       mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(resource, mockWorker.ref, workerId))))
       mockWorker.expectMsgClass(classOf[LaunchExecutor])
       mockWorker.reply(ExecutorLaunchRejected(""))
-      mockMaster.expectMsg(RequestResource(appId, ResourceRequest(resource)))
+      mockMaster.expectMsg(RequestResource(appId, ResourceRequest(resource, WorkerId.unspecified)))
     }
 
     "find a new master when lost connection with master" in {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala
index aa81ea8..9121a22 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/ExecutorManagerSpec.scala
@@ -22,7 +22,7 @@ import akka.actor._
 import akka.testkit.TestProbe
 import com.typesafe.config.ConfigFactory
 import io.gearpump.streaming.appmaster.ExecutorManager.ExecutorStarted
-import io.gearpump.TestProbeUtil
+import io.gearpump.{WorkerId, TestProbeUtil}
 import io.gearpump.cluster.AppMasterToWorker.ChangeExecutorResource
 import io.gearpump.cluster._
 import io.gearpump.cluster.appmaster.{ExecutorSystem, WorkerInfo}
@@ -72,7 +72,7 @@ class ExecutorManagerSpec  extends FlatSpec with Matchers with BeforeAndAfterAll
     val executorManager = system.actorOf(Props(new ExecutorManager(userConfig, appMasterContext, executorFactory, ConfigFactory.empty, appName)))
 
     taskManager.send(executorManager, SetTaskManager(taskManager.ref))
-    val resourceRequest = Array(ResourceRequest(resource))
+    val resourceRequest = Array(ResourceRequest(resource, WorkerId.unspecified))
 
     //start executors
     taskManager.send(executorManager, StartExecutors(resourceRequest, appJar.get))
@@ -104,7 +104,7 @@ class ExecutorManagerSpec  extends FlatSpec with Matchers with BeforeAndAfterAll
     val (master, executor, taskManager, executorManager) = startExecutorSystems
     val executorSystemDaemon = TestProbe()
     val worker = TestProbe()
-    val workerId = 0
+    val workerId = WorkerId(0, 0L)
     val workerInfo = WorkerInfo(workerId, worker.ref)
     val executorSystem = ExecutorSystem(0, null, executorSystemDaemon.ref,
       resource, workerInfo)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala
index d9ebb37..12128c4 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala
@@ -19,6 +19,7 @@ package io.gearpump.streaming.appmaster
 
 import akka.actor.ActorSystem
 import com.typesafe.config.ConfigFactory
+import io.gearpump.WorkerId
 import io.gearpump.streaming.{ProcessorDescription, DAG}
 import io.gearpump.cluster.{TestUtil, AppJar}
 import io.gearpump.cluster.scheduler.{Resource, ResourceRequest}
@@ -49,13 +50,13 @@ class JarSchedulerSpec extends WordSpec with Matchers {
       implicit val dispatcher = system.dispatcher
       val manager = new JarScheduler(0, "APP", TestUtil.DEFAULT_CONFIG, system)
       manager.setDag(dag, Future{0L})
-      val requests = Array(ResourceRequest(Resource(2)))
+      val requests = Array(ResourceRequest(Resource(2), WorkerId.unspecified))
       val result = Await.result(manager.getRequestDetails(), 15 seconds)
       assert(result.length == 1)
       assert(result.head.jar == mockJar1)
       assert(result.head.requests.deep == requests.deep)
 
-      val tasks = Await.result(manager.scheduleTask(mockJar1, 0, 0, Resource(2)), 15 seconds)
+      val tasks = Await.result(manager.scheduleTask(mockJar1, WorkerId(0, 0L), 0, Resource(2)), 15 seconds)
       assert(tasks.contains(TaskId(0, 0)))
       assert(tasks.contains(TaskId(1, 0)))
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala
index 4c9cd5d..c55be84 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskLocatorSpec.scala
@@ -18,13 +18,14 @@
 
 package io.gearpump.streaming.appmaster
 
+import io.gearpump.WorkerId
 import io.gearpump.streaming.appmaster.TaskLocator.Localities
 import io.gearpump.streaming.task.TaskId
 import org.scalatest.{BeforeAndAfterAll, Matchers, FlatSpec}
 
 class TaskLocatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
   it should "serialize/deserialize correctly" in {
-    val localities = new Localities(Map(0 -> Array(TaskId(0, 1), TaskId(1,2))))
+    val localities = new Localities(Map(WorkerId(0, 0L) -> Array(TaskId(0, 1), TaskId(1,2))))
     Localities.toJson(localities)
 
     localities.localities.mapValues(_.toList) shouldBe

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala
index 621455d..8105df3 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskManagerSpec.scala
@@ -39,7 +39,7 @@ import io.gearpump.streaming.{DAG, LifeTime, ProcessorDescription, ProcessorId}
 import io.gearpump.transport.HostPort
 import io.gearpump.util.Graph
 import io.gearpump.util.Graph._
-import io.gearpump.{Message, TimeStamp}
+import io.gearpump.{WorkerId, Message, TimeStamp}
 import org.mockito.Mockito._
 import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
 
@@ -65,7 +65,7 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
   val appId = 0
 
   val resource = Resource(2)
-  val workerId = 0
+  val workerId = WorkerId(0, 0L)
   val executorId = 0
 
   override def beforeEach(): Unit = {
@@ -166,7 +166,7 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
 
     // step2: Get Additional Resource Request
     when(scheduler.getRequestDetails())
-        .thenReturn(Future{Array(ResourceRequestDetail(mockJar, Array(ResourceRequest(resource))))})
+        .thenReturn(Future{Array(ResourceRequestDetail(mockJar, Array(ResourceRequest(resource, WorkerId.unspecified))))})
 
     // step3: DAG changed. Start transit from ApplicationReady -> DynamicDAG
     dagManager.expectMsg(GetLatestDAG)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
index aeef61d..d2373ea 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
@@ -21,7 +21,7 @@ import com.typesafe.config.ConfigFactory
 import io.gearpump.streaming.Constants
 import io.gearpump.streaming.appmaster.TaskLocator.Localities
 import io.gearpump.streaming.task.{StartTime, TaskContext, TaskId}
-import io.gearpump.Message
+import io.gearpump.{WorkerId, Message}
 import io.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest}
 import io.gearpump.cluster.{TestUtil, ClusterConfig, UserConfig}
 import io.gearpump.partitioner.{HashPartitioner, Partitioner}
@@ -47,8 +47,8 @@ class TaskSchedulerSpec extends WordSpec with Matchers {
     "schedule tasks on different workers properly according user's configuration" in {
 
       val localities = Localities(
-        Map(1 -> Array(TaskId(0,0), TaskId(0,1), TaskId(1,0), TaskId(1,1)),
-            2 -> Array(TaskId(0,2), TaskId(0,3))
+        Map(WorkerId(1, 0L) -> Array(TaskId(0,0), TaskId(0,1), TaskId(1,0), TaskId(1,1)),
+          WorkerId(2, 0L) -> Array(TaskId(0,2), TaskId(0,3))
       ))
 
       val localityConfig = ConfigFactory.parseString(Localities.toJson(localities))
@@ -59,8 +59,8 @@ class TaskSchedulerSpec extends WordSpec with Matchers {
         config.withValue(s"$GEARPUMP_STREAMING_LOCALITIES.$appName", localityConfig.root))
 
       val expectedRequests =
-        Array( ResourceRequest(Resource(4), 1, relaxation = Relaxation.SPECIFICWORKER),
-          ResourceRequest(Resource(2), 2, relaxation = Relaxation.SPECIFICWORKER))
+        Array( ResourceRequest(Resource(4), WorkerId(1, 0L), relaxation = Relaxation.SPECIFICWORKER),
+          ResourceRequest(Resource(2), WorkerId(2, 0L), relaxation = Relaxation.SPECIFICWORKER))
 
       taskScheduler.setDAG(dag)
       val resourceRequests = taskScheduler.getResourceRequests()
@@ -71,14 +71,14 @@ class TaskSchedulerSpec extends WordSpec with Matchers {
       val tasksOnWorker1 = ArrayBuffer[Int]()
       val tasksOnWorker2 = ArrayBuffer[Int]()
       for (i <- 0 until 4) {
-        tasksOnWorker1.append(taskScheduler.schedule(1, executorId = 0, Resource(1)).head.processorId)
+        tasksOnWorker1.append(taskScheduler.schedule(WorkerId(1, 0L), executorId = 0, Resource(1)).head.processorId)
       }
       for (i <- 0 until 2) {
-        tasksOnWorker2.append(taskScheduler.schedule(2, executorId = 1, Resource(1)).head.processorId)
+        tasksOnWorker2.append(taskScheduler.schedule(WorkerId(2, 0L), executorId = 1, Resource(1)).head.processorId)
       }
 
       //allocate more resource, and no tasks to launch
-      assert(taskScheduler.schedule(3, executorId = 3, Resource(1)) == List.empty[TaskId])
+      assert(taskScheduler.schedule(WorkerId(3, 0L), executorId = 3, Resource(1)) == List.empty[TaskId])
 
       //on worker1, executor 0
       assert(tasksOnWorker1.sorted.sameElements(Array(0, 0, 1, 1)))
@@ -88,9 +88,9 @@ class TaskSchedulerSpec extends WordSpec with Matchers {
 
       val rescheduledResources = taskScheduler.executorFailed(executorId = 1)
 
-      assert(rescheduledResources.sameElements(Array(ResourceRequest(Resource(2), relaxation = Relaxation.ONEWORKER))))
+      assert(rescheduledResources.sameElements(Array(ResourceRequest(Resource(2), WorkerId.unspecified, relaxation = Relaxation.ONEWORKER))))
 
-      val launchedTask = taskScheduler.schedule(workerId  = 3, executorId = 3, Resource(2))
+      val launchedTask = taskScheduler.schedule(WorkerId(3, 0L), executorId = 3, Resource(2))
 
       //start the failed 2 tasks Task(0, 0) and Task(0, 1)
       assert(launchedTask.length == 2)
@@ -101,11 +101,11 @@ class TaskSchedulerSpec extends WordSpec with Matchers {
       val taskScheduler = new TaskSchedulerImpl(appId = 0, appName, config)
 
       val expectedRequests =
-        Array( ResourceRequest(Resource(4), 1, relaxation = Relaxation.SPECIFICWORKER),
-          ResourceRequest(Resource(2), 2, relaxation = Relaxation.SPECIFICWORKER))
+        Array( ResourceRequest(Resource(4), WorkerId(1, 0L), relaxation = Relaxation.SPECIFICWORKER),
+          ResourceRequest(Resource(2), WorkerId(2, 0L), relaxation = Relaxation.SPECIFICWORKER))
 
       taskScheduler.setDAG(dag)
-      val tasks = taskScheduler.schedule(1, executorId = 0, Resource(4))
+      val tasks = taskScheduler.schedule(WorkerId(1, 0L), executorId = 0, Resource(4))
       assert(tasks.filter(_.processorId == 0).length == 2)
       assert(tasks.filter(_.processorId == 1).length == 2)
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e7a7f542/streaming/src/test/scala/io/gearpump/streaming/executor/ExecutorSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/executor/ExecutorSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/executor/ExecutorSpec.scala
index 1126ce0..151a188 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/executor/ExecutorSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/executor/ExecutorSpec.scala
@@ -19,6 +19,7 @@ package io.gearpump.streaming.executor
 
 import akka.actor.{ActorSystem, Props}
 import akka.testkit.TestProbe
+import io.gearpump.WorkerId
 import io.gearpump.cluster.appmaster.WorkerInfo
 import io.gearpump.cluster.scheduler.Resource
 import io.gearpump.cluster.{ExecutorContext, TestUtil, UserConfig}
@@ -39,7 +40,7 @@ import scala.language.postfixOps
 class ExecutorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
   val appId = 0
   val executorId = 0
-  val workerId = 0
+  val workerId = WorkerId(0, 0L)
   var appMaster: TestProbe = null
   implicit var system: ActorSystem = null
   val userConf = UserConfig.empty

Reply via email to