http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala 
b/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala
deleted file mode 100644
index bd18bdc..0000000
--- 
a/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala
+++ /dev/null
@@ -1,447 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.appmaster
-
-import java.util
-import java.util.Date
-import java.util.concurrent.TimeUnit
-import scala.concurrent.Future
-import scala.concurrent.duration.FiniteDuration
-import scala.language.implicitConversions
-
-import akka.actor.{Actor, Cancellable, Stash}
-import org.slf4j.Logger
-
-import io.gearpump.TimeStamp
-import io.gearpump.cluster.ClientToMaster.GetStallingTasks
-import io.gearpump.google.common.primitives.Longs
-import io.gearpump.streaming.AppMasterToMaster.StallingTasks
-import io.gearpump.streaming._
-import io.gearpump.streaming.appmaster.ClockService.HealthChecker.ClockValue
-import io.gearpump.streaming.appmaster.ClockService._
-import io.gearpump.streaming.storage.AppDataStore
-import io.gearpump.streaming.task._
-import io.gearpump.util.LogUtil
-
-/**
- * Maintains a global view of message timestamp in the application
- */
-class ClockService(private var dag: DAG, store: AppDataStore) extends Actor 
with Stash {
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  import context.dispatcher
-
-  private val healthChecker = new HealthChecker(stallingThresholdSeconds = 60)
-  private var healthCheckScheduler: Cancellable = null
-  private var snapshotScheduler: Cancellable = null
-
-  override def receive: Receive = null
-
-  override def preStart(): Unit = {
-    LOG.info("Initializing Clock service, get snapshotted StartClock ....")
-    store.get(START_CLOCK).asInstanceOf[Future[TimeStamp]].map { clock =>
-      val startClock = Option(clock).getOrElse(0L)
-
-      minCheckpointClock = Some(startClock)
-
-      // Recover the application by restarting from last persisted startClock.
-      // Only messge after startClock will be replayed.
-      self ! StoredStartClock(startClock)
-      LOG.info(s"Start Clock Retrieved, starting ClockService, startClock: 
$startClock")
-    }
-
-    context.become(waitForStartClock)
-  }
-
-  override def postStop(): Unit = {
-    Option(healthCheckScheduler).map(_.cancel)
-    Option(snapshotScheduler).map(_.cancel)
-  }
-
-  // Keep track of clock value of all processors.
-  private var clocks = Map.empty[ProcessorId, ProcessorClock]
-
-  // Each process can have multiple upstream processors. This keep track of 
the upstream clocks.
-  private var upstreamClocks = Map.empty[ProcessorId, Array[ProcessorClock]]
-
-  // We use Array instead of List for Performance consideration
-  private var processorClocks = Array.empty[ProcessorClock]
-
-  private var checkpointClocks: Map[TaskId, Vector[TimeStamp]] = null
-
-  private var minCheckpointClock: Option[TimeStamp] = None
-
-  private def checkpointEnabled(processor: ProcessorDescription): Boolean = {
-    val taskConf = processor.taskConf
-    taskConf != null && taskConf.getBoolean("state.checkpoint.enable") == 
Some(true)
-  }
-
-  private def resetCheckpointClocks(dag: DAG, startClock: TimeStamp): Unit = {
-    this.checkpointClocks = dag.processors.filter(startClock < _._2.life.death)
-      .filter { case (_, processor) =>
-        checkpointEnabled(processor)
-      }.flatMap { case (id, processor) =>
-      (0 until processor.parallelism).map(TaskId(id, _) -> 
Vector.empty[TimeStamp])
-    }
-    if (this.checkpointClocks.isEmpty) {
-      minCheckpointClock = None
-    }
-  }
-
-  private def initDag(startClock: TimeStamp): Unit = {
-    recoverDag(this.dag, startClock)
-  }
-
-  private def recoverDag(dag: DAG, startClock: TimeStamp): Unit = {
-    this.clocks = dag.processors.filter(startClock < _._2.life.death).
-      map { pair =>
-        val (processorId, processor) = pair
-        val parallelism = processor.parallelism
-        val clock = new ProcessorClock(processorId, processor.life, 
parallelism)
-        clock.init(startClock)
-        (processorId, clock)
-      }
-
-    this.upstreamClocks = clocks.map { pair =>
-      val (processorId, processor) = pair
-
-      val upstreams = dag.graph.incomingEdgesOf(processorId).map(_._1)
-      val upstreamClocks = upstreams.flatMap(clocks.get(_))
-      (processorId, upstreamClocks.toArray)
-    }
-
-    this.processorClocks = clocks.toArray.map(_._2)
-
-    resetCheckpointClocks(dag, startClock)
-  }
-
-  private def dynamicDAG(dag: DAG, startClock: TimeStamp): Unit = {
-    val newClocks = dag.processors.filter(startClock < _._2.life.death).
-      map { pair =>
-        val (processorId, processor) = pair
-        val parallelism = processor.parallelism
-
-        val clock = if (clocks.contains(processor.id)) {
-          clocks(processorId).copy(life = processor.life)
-        } else {
-          new ProcessorClock(processorId, processor.life, parallelism)
-        }
-        (processorId, clock)
-      }
-
-    this.clocks = newClocks
-
-    this.upstreamClocks = newClocks.map { pair =>
-      val (processorId, processor) = pair
-
-      val upstreams = dag.graph.incomingEdgesOf(processorId).map(_._1)
-      val upstreamClocks = upstreams.flatMap(newClocks.get(_))
-      (processorId, upstreamClocks.toArray)
-    }
-
-    // Inits the clock of all processors.
-    newClocks.map { pair =>
-      val (processorId, processorClock) = pair
-      val upstreamClock = getUpStreamMinClock(processorId)
-      val birth = processorClock.life.birth
-
-      if (dag.graph.inDegreeOf(processorId) == 0) {
-        processorClock.init(Longs.max(birth, startClock))
-      } else {
-        processorClock.init(upstreamClock)
-      }
-    }
-
-    this.processorClocks = clocks.toArray.map(_._2)
-
-    resetCheckpointClocks(dag, startClock)
-  }
-
-  def waitForStartClock: Receive = {
-    case StoredStartClock(startClock) =>
-      initDag(startClock)
-
-      import context.dispatcher
-
-      // Period report current clock
-      healthCheckScheduler = context.system.scheduler.schedule(
-        new FiniteDuration(5, TimeUnit.SECONDS),
-        new FiniteDuration(60, TimeUnit.SECONDS), self, HealthCheck)
-
-      // Period snpashot latest min startclock to external storage
-      snapshotScheduler = context.system.scheduler.schedule(new 
FiniteDuration(5, TimeUnit.SECONDS),
-        new FiniteDuration(5, TimeUnit.SECONDS), self, SnapshotStartClock)
-
-      unstashAll()
-      context.become(clockService)
-
-    case _ =>
-      stash()
-  }
-
-  private def getUpStreamMinClock(processorId: ProcessorId): TimeStamp = {
-    val clocks = upstreamClocks.get(processorId)
-    if (clocks.isDefined) {
-      if (clocks.get == null || clocks.get.length == 0) {
-        Long.MaxValue
-      } else {
-        ProcessorClocks.minClock(clocks.get)
-      }
-    } else {
-      Long.MaxValue
-    }
-  }
-
-  def clockService: Receive = {
-    case GetUpstreamMinClock(task) =>
-      sender ! UpstreamMinClock(getUpStreamMinClock(task.processorId))
-
-    case update@UpdateClock(task, clock) =>
-      val upstreamMinClock = getUpStreamMinClock(task.processorId)
-
-      val processorClock = clocks.get(task.processorId)
-      if (processorClock.isDefined) {
-        processorClock.get.updateMinClock(task.index, clock)
-      } else {
-        LOG.error(s"Cannot updateClock for task $task")
-      }
-      sender ! UpstreamMinClock(upstreamMinClock)
-
-    case GetLatestMinClock =>
-      sender ! LatestMinClock(minClock)
-
-    case GetStartClock =>
-      sender ! StartClock(getStartClock)
-
-    case deathCheck: CheckProcessorDeath =>
-      val processorId = deathCheck.processorId
-      val processorClock = clocks.get(processorId)
-      if (processorClock.isDefined) {
-        val life = processorClock.get.life
-        if (processorClock.get.min >= life.death) {
-
-          LOG.info(s"Removing $processorId from clock service...")
-          removeProcessor(processorId)
-        } else {
-          LOG.info(s"Unsuccessfully in removing $processorId from clock 
service...," +
-            s" min: ${processorClock.get.min}, life: $life")
-        }
-      }
-    case HealthCheck =>
-      selfCheck()
-
-    case SnapshotStartClock =>
-      snapshotStartClock()
-
-    case ReportCheckpointClock(task, time) =>
-      updateCheckpointClocks(task, time)
-
-    case GetCheckpointClock =>
-      sender ! CheckpointClock(minCheckpointClock)
-
-    case getStalling: GetStallingTasks =>
-      sender ! StallingTasks(healthChecker.getReport.stallingTasks)
-
-    case ChangeToNewDAG(dag) =>
-      if (dag.version > this.dag.version) {
-        // Transits to a new dag version
-        this.dag = dag
-        dynamicDAG(dag, getStartClock)
-      } else {
-        // Restarts current dag.
-        recoverDag(dag, getStartClock)
-      }
-      LOG.info(s"Change to new DAG(dag = ${dag.version}), send back 
ChangeToNewDAGSuccess")
-      sender ! ChangeToNewDAGSuccess(clocks.map { pair =>
-        val (id, clock) = pair
-        (id, clock.min)
-      })
-  }
-
-  private def removeProcessor(processorId: ProcessorId): Unit = {
-    clocks = clocks - processorId
-    processorClocks = processorClocks.filter(_.processorId != processorId)
-
-    upstreamClocks = upstreamClocks.map { pair =>
-      val (id, upstreams) = pair
-      val updatedUpstream = upstreams.filter(_.processorId != processorId)
-      (id, updatedUpstream)
-    }
-
-    upstreamClocks = upstreamClocks - processorId
-
-    // Removes dead processor from checkpoints.
-    checkpointClocks = checkpointClocks.filter { kv =>
-      val (taskId, processor) = kv
-      taskId.processorId != processorId
-    }
-  }
-
-  private def minClock: TimeStamp = {
-    ProcessorClocks.minClock(processorClocks)
-  }
-
-  def selfCheck(): Unit = {
-    val minTimestamp = minClock
-
-    if (Long.MaxValue == minTimestamp) {
-      processorClocks.foreach { clock =>
-        LOG.info(s"Processor ${clock.processorId} Clock: min: ${clock.min}, " +
-          s"taskClocks: " + clock.taskClocks.mkString(","))
-      }
-    }
-
-    healthChecker.check(minTimestamp, clocks, dag, System.currentTimeMillis())
-  }
-
-  private def getStartClock: TimeStamp = {
-    minCheckpointClock.getOrElse(minClock)
-  }
-
-  private def snapshotStartClock(): Unit = {
-    store.put(START_CLOCK, getStartClock)
-  }
-
-  private def updateCheckpointClocks(task: TaskId, time: TimeStamp): Unit = {
-    val clocks = checkpointClocks(task) :+ time
-    checkpointClocks += task -> clocks
-
-    if (checkpointClocks.forall(_._2.contains(time))) {
-      minCheckpointClock = Some(time)
-      LOG.info(s"minCheckpointTime $minCheckpointClock")
-
-      checkpointClocks = checkpointClocks.mapValues(_.dropWhile(_ <= time))
-    }
-  }
-}
-
-object ClockService {
-  val START_CLOCK = "startClock"
-
-  case object HealthCheck
-
-  class ProcessorClock(val processorId: ProcessorId, val life: LifeTime, val 
parallism: Int,
-      private var _min: TimeStamp = 0L, private var _taskClocks: 
Array[TimeStamp] = null) {
-
-    def copy(life: LifeTime): ProcessorClock = {
-      new ProcessorClock(processorId, life, parallism, _min, _taskClocks)
-    }
-
-    def min: TimeStamp = _min
-    def taskClocks: Array[TimeStamp] = _taskClocks
-
-    def init(startClock: TimeStamp): Unit = {
-      if (taskClocks == null) {
-        this._min = startClock
-        this._taskClocks = new Array(parallism)
-        util.Arrays.fill(taskClocks, startClock)
-      }
-    }
-
-    def updateMinClock(taskIndex: Int, clock: TimeStamp): Unit = {
-      taskClocks(taskIndex) = clock
-      _min = Longs.min(taskClocks: _*)
-    }
-  }
-
-  case object SnapshotStartClock
-
-  case class Report(stallingTasks: List[TaskId])
-
-  /**
-   * Check whether the clock is advancing normally
-   */
-  class HealthChecker(stallingThresholdSeconds: Int) {
-    private val LOG: Logger = LogUtil.getLogger(getClass)
-
-    private var minClock: ClockValue = null
-    private val stallingThresholdMilliseconds = stallingThresholdSeconds * 1000
-    // 60 seconds
-    private var stallingTasks = Array.empty[TaskId]
-
-    /** Check for stalling tasks */
-    def check(
-        currentMinClock: TimeStamp, processorClocks: Map[ProcessorId, 
ProcessorClock],
-        dag: DAG, now: TimeStamp): Unit = {
-      var isClockStalling = false
-      if (null == minClock || currentMinClock > minClock.appClock) {
-        minClock = ClockValue(systemClock = now, appClock = currentMinClock)
-      } else {
-        // Clock not advancing
-        if (now > minClock.systemClock + stallingThresholdMilliseconds) {
-          LOG.warn(s"Clock has not advanced for ${(now - minClock.systemClock) 
/ 1000} seconds " +
-            s"since ${minClock.prettyPrint}...")
-          isClockStalling = true
-        }
-      }
-
-      if (isClockStalling) {
-        val processorId = 
dag.graph.topologicalOrderWithCirclesIterator.toList.find { processorId =>
-          val clock = processorClocks.get(processorId)
-          if (clock.isDefined) {
-            clock.get.min == minClock.appClock
-          } else {
-            false
-          }
-        }
-
-        processorId.foreach { processorId =>
-          val processorClock = processorClocks(processorId)
-          val taskClocks = processorClock.taskClocks
-          stallingTasks = taskClocks.zipWithIndex.filter(_._1 == 
minClock.appClock).
-            map(pair => TaskId(processorId, pair._2))
-        }
-        LOG.info(s"Stalling Tasks: ${stallingTasks.mkString(",")}")
-      } else {
-        stallingTasks = Array.empty[TaskId]
-      }
-    }
-
-    def getReport: Report = {
-      Report(stallingTasks.toList)
-    }
-  }
-
-  object HealthChecker {
-    case class ClockValue(systemClock: TimeStamp, appClock: TimeStamp) {
-      def prettyPrint: String = {
-        "(system clock: " + new Date(systemClock).toString + ", app clock: " + 
appClock + ")"
-      }
-    }
-  }
-
-  object ProcessorClocks {
-
-    // Get the Min clock of all processors
-    def minClock(clock: Array[ProcessorClock]): TimeStamp = {
-      var i = 0
-      var min = if (clock.length == 0) 0L else clock(0).min
-      while (i < clock.length) {
-        min = Math.min(min, clock(i).min)
-        i += 1
-      }
-      min
-    }
-  }
-
-  case class ChangeToNewDAG(dag: DAG)
-  case class ChangeToNewDAGSuccess(clocks: Map[ProcessorId, TimeStamp])
-
-  case class StoredStartClock(clock: TimeStamp)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/appmaster/DagManager.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/appmaster/DagManager.scala 
b/streaming/src/main/scala/io/gearpump/streaming/appmaster/DagManager.scala
deleted file mode 100644
index f18b387..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/DagManager.scala
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.appmaster
-
-import scala.concurrent.Future
-
-import akka.actor.{Actor, ActorRef, Stash}
-import org.slf4j.Logger
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.partitioner.PartitionerDescription
-import io.gearpump.streaming._
-import io.gearpump.streaming.appmaster.DagManager._
-import io.gearpump.streaming.storage.AppDataStore
-import io.gearpump.streaming.task.Subscriber
-import io.gearpump.util.{Graph, LogUtil}
-
-/**
- * Handles dag modification and other stuff related with DAG
- *
- * DagManager maintains multiple version of DAGs. For each version, the DAG is 
immutable.
- * For operations like modifying a processor, it will create a new version of 
DAG.
- */
-class DagManager(appId: Int, userConfig: UserConfig, store: AppDataStore, dag: 
Option[DAG])
-  extends Actor with Stash {
-
-  import context.dispatcher
-  private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
-  private val NOT_INITIALIZED = -1
-
-  private var dags = List.empty[DAG]
-  private var maxProcessorId = -1
-  private implicit val system = context.system
-
-  private var watchers = List.empty[ActorRef]
-
-  override def receive: Receive = null
-
-  override def preStart(): Unit = {
-    LOG.info("Initializing Dag Service, get stored Dag ....")
-    store.get(StreamApplication.DAG).asInstanceOf[Future[DAG]].map { storedDag 
=>
-      if (storedDag != null) {
-        dags :+= storedDag
-      } else {
-        dags :+= 
dag.getOrElse(DAG(userConfig.getValue[Graph[ProcessorDescription,
-          PartitionerDescription]](StreamApplication.DAG).get))
-      }
-      maxProcessorId = {
-        val keys = dags.head.processors.keys
-        if (keys.size == 0) {
-          0
-        } else {
-          keys.max
-        }
-      }
-      self ! DagInitiated
-    }
-    context.become(waitForDagInitiate)
-  }
-
-  def waitForDagInitiate: Receive = {
-    case DagInitiated =>
-      unstashAll()
-      context.become(dagService)
-    case _ =>
-      stash()
-  }
-
-  private def nextProcessorId: ProcessorId = {
-    maxProcessorId += 1
-    maxProcessorId
-  }
-
-  private def taskLaunchData(dag: DAG, processorId: Int, context: AnyRef): 
TaskLaunchData = {
-    val processorDescription = dag.processors(processorId)
-    val subscribers = Subscriber.of(processorId, dag)
-    TaskLaunchData(processorDescription, subscribers, context)
-  }
-
-  def dagService: Receive = {
-    case GetLatestDAG =>
-      // Get the latest version of DAG.
-      sender ! LatestDAG(dags.last)
-    case GetTaskLaunchData(version, processorId, context) =>
-      // Task information like Processor class, downstream subscriber 
processors and etc.
-      dags.find(_.version == version).foreach { dag =>
-        LOG.info(s"Get task launcher data for processor: $processorId, 
dagVersion: $version")
-        sender ! taskLaunchData(dag, processorId, context)
-      }
-    case ReplaceProcessor(oldProcessorId, inputNewProcessor) =>
-      // Replace a processor with new implementation. The upstream processors 
and downstream
-      // processors are NOT changed.
-      var newProcessor = inputNewProcessor.copy(id = nextProcessorId)
-      if (inputNewProcessor.jar == null) {
-        val oldJar = dags.last.processors.get(oldProcessorId).get
-        newProcessor = newProcessor.copy(jar = oldJar.jar)
-      }
-      if (dags.length > 1) {
-        sender ! DAGOperationFailed(
-          "We are in the process of handling previous dynamic dag change")
-      } else {
-        val oldDAG = dags.last
-        val newVersion = oldDAG.version + 1
-        val newDAG = replaceDAG(oldDAG, oldProcessorId, newProcessor, 
newVersion)
-        dags :+= newDAG
-
-        LOG.info(s"ReplaceProcessor old: $oldProcessorId, new: $newProcessor")
-        LOG.info(s"new DAG: $newDAG")
-        watchers.foreach(_ ! LatestDAG(newDAG))
-        sender ! DAGOperationSuccess
-      }
-
-    case WatchChange(watcher) =>
-      // Checks whether there are modifications for this DAG.
-      if (!this.watchers.contains(watcher)) {
-        this.watchers :+= watcher
-      }
-
-    case NewDAGDeployed(dagVersion) =>
-      // Means dynamic Dag transition completed, and the new DAG version has 
been successfully
-      // deployed. The obsolete dag versions will be removed.
-      if (dagVersion != NOT_INITIALIZED) {
-        dags = dags.filter(_.version == dagVersion)
-        store.put(StreamApplication.DAG, dags.last)
-      }
-  }
-
-  private def replaceDAG(
-      dag: DAG, oldProcessorId: ProcessorId, newProcessor: 
ProcessorDescription, newVersion: Int)
-    : DAG = {
-    val oldProcessorLife = LifeTime(dag.processors(oldProcessorId).life.birth,
-      newProcessor.life.birth)
-
-    val newProcessorMap = dag.processors ++
-      Map(oldProcessorId -> dag.processors(oldProcessorId).copy(life = 
oldProcessorLife),
-        newProcessor.id -> newProcessor)
-
-    val newGraph = dag.graph.subGraph(oldProcessorId).
-      replaceVertex(oldProcessorId, newProcessor.id).addGraph(dag.graph)
-    new DAG(newVersion, newProcessorMap, newGraph)
-  }
-}
-
-object DagManager {
-  case object DagInitiated
-
-  case class WatchChange(watcher: ActorRef)
-
-  case object GetLatestDAG
-  case class LatestDAG(dag: DAG)
-
-  case class GetTaskLaunchData(dagVersion: Int, processorId: Int, context: 
AnyRef = null)
-  case class TaskLaunchData(processorDescription : ProcessorDescription,
-      subscribers: List[Subscriber], context: AnyRef = null)
-
-  sealed trait DAGOperation
-
-  case class ReplaceProcessor(oldProcessorId: ProcessorId,
-      newProcessorDescription: ProcessorDescription) extends DAGOperation
-
-  sealed trait DAGOperationResult
-  case object DAGOperationSuccess extends DAGOperationResult
-  case class DAGOperationFailed(reason: String) extends DAGOperationResult
-
-  case class NewDAGDeployed(dagVersion: Int)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala
 
b/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala
deleted file mode 100644
index ab99af5..0000000
--- 
a/streaming/src/main/scala/io/gearpump/streaming/appmaster/ExecutorManager.scala
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.appmaster
-
-import scala.concurrent.duration._
-import scala.util.{Failure, Try}
-
-import akka.actor.SupervisorStrategy.Stop
-import akka.actor._
-import akka.remote.RemoteScope
-import com.typesafe.config.Config
-import org.apache.commons.lang.exception.ExceptionUtils
-
-import io.gearpump.cluster.AppMasterToWorker.ChangeExecutorResource
-import 
io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, 
ExecutorSystemStarted, StartExecutorSystemTimeout, StartExecutorSystems}
-import io.gearpump.cluster.appmaster.WorkerInfo
-import io.gearpump.cluster.scheduler.{Resource, ResourceRequest}
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.cluster.{AppJar, AppMasterContext, ExecutorContext, 
UserConfig}
-import io.gearpump.streaming.ExecutorId
-import io.gearpump.streaming.ExecutorToAppMaster.RegisterExecutor
-import io.gearpump.streaming.appmaster.ExecutorManager._
-import io.gearpump.streaming.executor.Executor
-import io.gearpump.util.{LogUtil, Util}
-
-/**
- * ExecutorManager manage the start and stop of all executors.
- *
- * ExecutorManager will launch Executor when asked. It hide the details like 
starting
- * a new ExecutorSystem from user. Please use ExecutorManager.props() to 
construct this actor
- */
-private[appmaster] class ExecutorManager(
-    userConfig: UserConfig,
-    appContext: AppMasterContext,
-    executorFactory: (ExecutorContext, UserConfig, Address, ExecutorId) => 
Props,
-    clusterConfig: Config,
-    appName: String)
-  extends Actor {
-
-  private val LOG = LogUtil.getLogger(getClass)
-
-  import appContext.{appId, masterProxy, username}
-
-  private var taskManager: ActorRef = null
-
-  private implicit val actorSystem = context.system
-  private val systemConfig = context.system.settings.config
-
-  private var executors = Map.empty[Int, ExecutorInfo]
-
-  def receive: Receive = waitForTaskManager
-
-  def waitForTaskManager: Receive = {
-    case SetTaskManager(taskManager) =>
-      this.taskManager = taskManager
-      context.become(service orElse terminationWatch)
-  }
-
-  // If something wrong on executor, ExecutorManager will stop the current 
executor,
-  // and wait for AppMaster to start a new executor.
-  override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10,
-    withinTimeRange = 1.minute) {
-    case ex: Throwable =>
-      val executorId = Try(sender.path.name.toInt)
-      executorId match {
-        case scala.util.Success(id) => {
-          executors -= id
-          LOG.error(s"Executor $id throws exception, stop it...\n" +
-            ExceptionUtils.getStackTrace(ex))
-        }
-        case Failure(ex) => {
-          LOG.error(s"Sender ${sender.path} is dead, but seems it is not an 
executor...")
-        }
-      }
-      Stop
-  }
-
-  // Responds to outside queries
-  def service: Receive = {
-    case StartExecutors(resources, jar) =>
-      masterProxy ! StartExecutorSystems(resources, 
getExecutorJvmConfig(Some(jar)))
-    case ExecutorSystemStarted(executorSystem, boundedJar) =>
-      import executorSystem.{address, executorSystemId, resource => 
executorResource, worker}
-
-      val executorId = executorSystemId
-      val executorContext = ExecutorContext(executorId, worker, appId, appName,
-        appMaster = context.parent, executorResource)
-      executors += executorId -> ExecutorInfo(executorId, null, worker, 
boundedJar)
-
-      // Starts executor
-      val executor = context.actorOf(executorFactory(executorContext, 
userConfig,
-        address, executorId), executorId.toString)
-      executorSystem.bindLifeCycleWith(executor)
-    case StartExecutorSystemTimeout =>
-      taskManager ! StartExecutorsTimeOut
-
-    case RegisterExecutor(executor, executorId, resource, worker) =>
-      LOG.info(s"executor $executorId has been launched")
-      // Watches for executor termination
-      context.watch(executor)
-      val executorInfo = executors.get(executorId).get
-      executors += executorId -> executorInfo.copy(executor = executor)
-      taskManager ! ExecutorStarted(executorId, resource, worker.workerId, 
executorInfo.boundedJar)
-
-    // Broadcasts message to all executors
-    case BroadCast(msg) =>
-      LOG.info(s"Broadcast ${msg.getClass.getSimpleName} to all executors")
-      context.children.foreach(_ forward msg)
-
-    // Unicasts message to single executor
-    case UniCast(executorId, msg) =>
-      LOG.info(s"Unicast ${msg.getClass.getSimpleName} to executor 
$executorId")
-      val executor = executors.get(executorId)
-      executor.foreach(_.executor forward msg)
-
-    case GetExecutorInfo =>
-      sender ! executors
-
-    // Tells Executor manager resources that are occupied. The Executor 
Manager can use this
-    // information to tell worker to reclaim un-used resources
-    case ExecutorResourceUsageSummary(resources) =>
-      executors.foreach { pair =>
-        val (executorId, executor) = pair
-        val resource = resources.get(executorId)
-        val worker = executor.worker.ref
-        // Notifies the worker the actual resource used by this application.
-        resource match {
-          case Some(resource) =>
-            worker ! ChangeExecutorResource(appId, executorId, resource)
-          case None =>
-            worker ! ChangeExecutorResource(appId, executorId, Resource(0))
-        }
-      }
-  }
-
-  def terminationWatch: Receive = {
-    case Terminated(actor) =>
-      val executorId = Try(actor.path.name.toInt)
-      executorId match {
-        case scala.util.Success(id) => {
-          executors -= id
-          LOG.error(s"Executor $id is down")
-          taskManager ! ExecutorStopped(id)
-        }
-        case scala.util.Failure(ex) =>
-          LOG.error(s"failed to get the executor Id from path string 
${actor.path}", ex)
-      }
-  }
-
-  private def getExecutorJvmConfig(jar: Option[AppJar]): 
ExecutorSystemJvmConfig = {
-    val executorAkkaConfig = clusterConfig
-    val jvmSetting = 
Util.resolveJvmSetting(executorAkkaConfig.withFallback(systemConfig)).executor
-
-    ExecutorSystemJvmConfig(jvmSetting.classPath, jvmSetting.vmargs, jar,
-      username, executorAkkaConfig)
-  }
-}
-
-private[appmaster] object ExecutorManager {
-  case class StartExecutors(resources: Array[ResourceRequest], jar: AppJar)
-  case class BroadCast(msg: Any)
-
-  case class UniCast(executorId: Int, msg: Any)
-
-  case object GetExecutorInfo
-
-  case class ExecutorStarted(
-      executorId: Int, resource: Resource, workerId: WorkerId, boundedJar: 
Option[AppJar])
-  case class ExecutorStopped(executorId: Int)
-
-  case class SetTaskManager(taskManager: ActorRef)
-
-  case object StartExecutorsTimeOut
-
-  def props(
-      userConfig: UserConfig, appContext: AppMasterContext, clusterConfig: 
Config, appName: String)
-    : Props = {
-    val executorFactory =
-      (executorContext: ExecutorContext,
-        userConfig: UserConfig,
-        address: Address,
-        executorId: ExecutorId) =>
-        Props(classOf[Executor], executorContext, userConfig)
-          .withDeploy(Deploy(scope = RemoteScope(address)))
-
-    Props(new ExecutorManager(userConfig, appContext, executorFactory, 
clusterConfig, appName))
-  }
-
-  case class ExecutorResourceUsageSummary(resources: Map[ExecutorId, Resource])
-
-  case class ExecutorInfo(
-      executorId: ExecutorId, executor: ActorRef, worker: WorkerInfo, 
boundedJar: Option[AppJar])
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala 
b/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala
deleted file mode 100644
index a2a9c74..0000000
--- 
a/streaming/src/main/scala/io/gearpump/streaming/appmaster/JarScheduler.scala
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.appmaster
-
-import scala.concurrent.Future
-
-import akka.actor._
-import akka.pattern.ask
-import com.typesafe.config.Config
-
-import io.gearpump.TimeStamp
-import io.gearpump.cluster.AppJar
-import io.gearpump.cluster.scheduler.{Resource, ResourceRequest}
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.partitioner.PartitionerDescription
-import io.gearpump.streaming.appmaster.JarScheduler._
-import io.gearpump.streaming.task.TaskId
-import io.gearpump.streaming.{DAG, ProcessorDescription}
-import io.gearpump.util.{Constants, Graph, LogUtil}
-
-/**
- * Different processors of the stream application can use different jars. 
JarScheduler is the
- * scheduler for different jars.
- *
- * For a DAG of multiple processors, each processor can have its own jar. 
Tasks of same jar
- * is scheduled by TaskScheduler, and TaskSchedulers are scheduled by 
JarScheduler.
- *
- * In runtime, the implementation is delegated to actor JarSchedulerImpl
- */
-class JarScheduler(appId: Int, appName: String, config: Config, factory: 
ActorRefFactory) {
-  private val actor: ActorRef = factory.actorOf(Props(new 
JarSchedulerImpl(appId, appName, config)))
-  private implicit val dispatcher = factory.dispatcher
-  private implicit val timeout = Constants.FUTURE_TIMEOUT
-
-  /** Set the current DAG version active */
-  def setDag(dag: DAG, startClock: Future[TimeStamp]): Unit = {
-    actor ! TransitToNewDag
-    startClock.map { start =>
-      actor ! NewDag(dag, start)
-    }
-  }
-
-  /** AppMaster ask JarScheduler about how many resource it wants */
-  def getResourceRequestDetails(): Future[Array[ResourceRequestDetail]] = {
-    (actor ? 
GetResourceRequestDetails).asInstanceOf[Future[Array[ResourceRequestDetail]]]
-  }
-
-  /**
-   * AppMaster has resource allocated, and ask the jar scheduler to schedule 
tasks
-   * for this executor.
-   */
-  def scheduleTask(appJar: AppJar, workerId: WorkerId, executorId: Int, 
resource: Resource)
-  : Future[List[TaskId]] = {
-    (actor ? ScheduleTask(appJar, workerId, executorId, resource))
-      .asInstanceOf[Future[List[TaskId]]]
-  }
-
-  /**
-   * Some executor JVM process is dead. AppMaster asks jar scheduler to 
re-schedule the impacted
-   * tasks.
-   */
-  def executorFailed(executorId: Int): Future[Option[ResourceRequestDetail]] = 
{
-    (actor ? 
ExecutorFailed(executorId)).asInstanceOf[Future[Option[ResourceRequestDetail]]]
-  }
-}
-
-object JarScheduler {
-
-  case class ResourceRequestDetail(jar: AppJar, requests: 
Array[ResourceRequest])
-
-  case class NewDag(dag: DAG, startTime: TimeStamp)
-
-  case object TransitToNewDag
-
-  case object GetResourceRequestDetails
-
-  /**
-   * Schedule tasks for one appJar.
-   *
-   * @param appJar Application jar.
-   * @param workerId Worker machine Id.
-   * @param executorId Executor Id.
-   * @param resource Slots that are available.
-   */
-  case class ScheduleTask(appJar: AppJar, workerId: WorkerId, executorId: Int, 
resource: Resource)
-
-  /** Some executor JVM is dead, try to recover tasks that are located on 
failed executor */
-  case class ExecutorFailed(executorId: Int)
-
-  class JarSchedulerImpl(appId: Int, appName: String, config: Config) extends 
Actor with Stash {
-
-    // Each TaskScheduler maps to a jar.
-    private var taskSchedulers = Map.empty[AppJar, TaskScheduler]
-
-    private val LOG = LogUtil.getLogger(getClass)
-
-    def receive: Receive = waitForNewDag
-
-    def waitForNewDag: Receive = {
-      case TransitToNewDag => // Continue current state
-      case NewDag(dag, startTime) =>
-
-        LOG.info(s"Init JarScheduler, dag version: ${dag.version}, startTime: 
$startTime")
-
-        val processors = dag.processors.values.groupBy(_.jar)
-
-        taskSchedulers = processors.map { jarAndProcessors =>
-          val (jar, processors) = jarAndProcessors
-          // Construct the sub DAG, each sub DAG maps to a separate jar.
-          val subGraph = Graph.empty[ProcessorDescription, 
PartitionerDescription]
-          processors.foreach { processor =>
-            if (startTime < processor.life.death) {
-              subGraph.addVertex(processor)
-            }
-          }
-          val subDagForSingleJar = DAG(subGraph)
-
-          val taskScheduler = taskSchedulers
-            .getOrElse(jar, new TaskSchedulerImpl(appId, appName, config))
-
-          LOG.info(s"Set DAG for TaskScheduler, count: " + 
subDagForSingleJar.processors.size)
-          taskScheduler.setDAG(subDagForSingleJar)
-          jar -> taskScheduler
-        }
-        unstashAll()
-        context.become(ready)
-      case other =>
-        stash()
-    }
-
-    def ready: Receive = {
-      // Notifies there is a new DAG coming.
-      case TransitToNewDag =>
-        context.become(waitForNewDag)
-
-      case GetResourceRequestDetails =>
-
-        // Asks each TaskScheduler (Each for one jar) the resource requests.
-        val result: Array[ResourceRequestDetail] = taskSchedulers.map { 
jarAndScheduler =>
-          val (jar, scheduler) = jarAndScheduler
-          ResourceRequestDetail(jar, scheduler.getResourceRequests())
-        }.toArray
-        LOG.info(s"GetRequestDetails " + result.mkString(";"))
-        sender ! result
-
-      case ScheduleTask(appJar, workerId, executorId, resource) =>
-        val result: List[TaskId] = taskSchedulers.get(appJar).map { scheduler 
=>
-          scheduler.schedule(workerId, executorId, resource)
-        }.getOrElse(List.empty)
-        LOG.info(s"ScheduleTask " + result.mkString(";"))
-        sender ! result
-      case ExecutorFailed(executorId) =>
-        val result: Option[ResourceRequestDetail] = taskSchedulers.
-          find(_._2.scheduledTasks(executorId).nonEmpty).map { jarAndScheduler 
=>
-          ResourceRequestDetail(jarAndScheduler._1, 
jarAndScheduler._2.executorFailed(executorId))
-        }
-        LOG.info(s"ExecutorFailed " + result.mkString(";"))
-        sender ! result
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
 
b/streaming/src/main/scala/io/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
deleted file mode 100644
index 3b2c8bf..0000000
--- 
a/streaming/src/main/scala/io/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.appmaster
-
-import io.gearpump._
-import io.gearpump.cluster.AppMasterToMaster.AppMasterSummary
-import io.gearpump.cluster.MasterToAppMaster.AppMasterStatus
-import io.gearpump.cluster.{MasterToAppMaster, UserConfig}
-import io.gearpump.streaming.appmaster.AppMaster.ExecutorBrief
-import io.gearpump.streaming.{ExecutorId, LifeTime, ProcessorId}
-import io.gearpump.util.Graph
-import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
-
-/** Stream application summary, used for REST API */
-case class StreamAppMasterSummary(
-    appType: String = "streaming",
-    appId: Int,
-    appName: String = null,
-    actorPath: String = null,
-    clock: TimeStamp = 0L,
-    status: AppMasterStatus = MasterToAppMaster.AppMasterActive,
-    startTime: TimeStamp = 0L,
-    uptime: TimeStamp = 0L,
-    user: String = null,
-    homeDirectory: String = "",
-    logFile: String = "",
-    dag: Graph[ProcessorId, String] = null,
-    executors: List[ExecutorBrief] = null,
-    processors: Map[ProcessorId, ProcessorSummary] = Map.empty[ProcessorId, 
ProcessorSummary],
-    // Hiearachy level for each processor
-    processorLevels: Map[ProcessorId, Int] = Map.empty[ProcessorId, Int],
-    historyMetricsConfig: HistoryMetricsConfig = null)
-  extends AppMasterSummary
-
-case class TaskCount(count: Int)
-
-case class ProcessorSummary(
-    id: ProcessorId,
-    taskClass: String,
-    parallelism: Int,
-    description: String,
-    taskConf: UserConfig,
-    life: LifeTime,
-    executors: List[ExecutorId],
-    taskCount: Map[ExecutorId, TaskCount])
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala 
b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala
deleted file mode 100644
index 6e7ebc6..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskLocator.scala
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.appmaster
-
-import scala.collection.JavaConverters._
-import scala.util.Try
-
-import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions, 
ConfigValueFactory}
-
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.streaming.Constants
-import io.gearpump.streaming.appmaster.TaskLocator.{Localities, Locality, 
NonLocality, WorkerLocality}
-import io.gearpump.streaming.task.TaskId
-
-/**
- * TaskLocator is used to decide which machine one task should run on.
- *
- * User can specify config 
[[io.gearpump.streaming.Constants#GEARPUMP_STREAMING_LOCALITIES]] to
- * decide to control which machine the task is running on.
- */
-class TaskLocator(appName: String, config: Config) {
-  private val taskLocalities: Map[TaskId, Locality] = 
loadTaskLocalities(config)
-
-  /** Finds where a task should belongs to */
-  def locateTask(taskId: TaskId): Locality = {
-    taskLocalities.getOrElse(taskId, NonLocality)
-  }
-
-  private def loadTaskLocalities(config: Config): Map[TaskId, Locality] = {
-    import io.gearpump.streaming.Constants.GEARPUMP_STREAMING_LOCALITIES
-    Try(config.getConfig(s"$GEARPUMP_STREAMING_LOCALITIES.$appName")).map { 
appConfig =>
-      val json = appConfig.root().render(ConfigRenderOptions.concise)
-      Localities.fromJson(json)
-    }.map { localityConfig =>
-      import localityConfig.localities
-      localities.keySet.flatMap { workerId =>
-        val tasks = localities(workerId)
-        tasks.map((_, WorkerLocality(workerId)))
-      }.toArray.toMap
-    }.getOrElse(Map.empty[TaskId, Locality])
-  }
-}
-
-object TaskLocator {
-
-  trait Locality
-
-  /** Means we require the resource from the specific worker */
-  case class WorkerLocality(workerId: WorkerId) extends Locality
-
-  /** Means no preference on worker */
-  object NonLocality extends Locality
-
-  /** Localities settings. Mapping from workerId to list of taskId */
-  case class Localities(localities: Map[WorkerId, Array[TaskId]])
-
-  object Localities {
-    val pattern = "task_([0-9]+)_([0-9]+)".r
-
-    // To avoid polluting the classpath, we do the JSON translation ourself 
instead of
-    // introducing JSON library dependencies directly.
-    def fromJson(json: String): Localities = {
-      val localities = ConfigFactory.parseString(json).getAnyRef("localities")
-        .asInstanceOf[java.util.Map[String, String]].asScala.map { pair =>
-        val workerId: WorkerId = WorkerId.parse(pair._1)
-        val tasks = pair._2.split(",").map { task =>
-          val pattern(processorId, taskIndex) = task
-          TaskId(processorId.toInt, taskIndex.toInt)
-        }
-        (workerId, tasks)
-      }.toMap
-      new Localities(localities)
-    }
-
-    def toJson(localities: Localities): String = {
-      val map = localities.localities.toList.map { pair =>
-        (WorkerId.render(pair._1), pair._2.map(task =>
-          s"task_${task.processorId}_${task.index}").mkString(","))
-      }.toMap.asJava
-      ConfigFactory.empty().withValue("localities", 
ConfigValueFactory.fromAnyRef(map)).
-        root.render(ConfigRenderOptions.concise())
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskManager.scala 
b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskManager.scala
deleted file mode 100644
index 51f2f9c..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskManager.scala
+++ /dev/null
@@ -1,497 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.appmaster
-
-import scala.concurrent.Future
-import scala.concurrent.duration._
-
-import akka.actor._
-import akka.pattern.ask
-import org.slf4j.Logger
-
-import io.gearpump.TimeStamp
-import 
io.gearpump.cluster.MasterToAppMaster.ReplayFromTimestampWindowTrailingEdge
-import io.gearpump.streaming.AppMasterToExecutor._
-import io.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, RegisterTask, 
UnRegisterTask}
-import io.gearpump.streaming._
-import io.gearpump.streaming.appmaster.AppMaster.{AllocateResourceTimeOut, 
LookupTaskActorRef, TaskActorRef}
-import io.gearpump.streaming.appmaster.ClockService.{ChangeToNewDAG, 
ChangeToNewDAGSuccess}
-import io.gearpump.streaming.appmaster.DagManager.{GetLatestDAG, 
GetTaskLaunchData, LatestDAG, NewDAGDeployed, TaskLaunchData, WatchChange}
-import io.gearpump.streaming.appmaster.ExecutorManager.{ExecutorStarted, 
StartExecutorsTimeOut, _}
-import io.gearpump.streaming.appmaster.TaskManager._
-import io.gearpump.streaming.appmaster.TaskRegistry.{Accept, TaskLocation}
-import io.gearpump.streaming.executor.Executor.RestartTasks
-import io.gearpump.streaming.executor.ExecutorRestartPolicy
-import io.gearpump.streaming.task._
-import io.gearpump.streaming.util.ActorPathUtil
-import io.gearpump.util.{Constants, LogUtil}
-
-/**
- *
- * TaskManager track all tasks's status.
- *
- * It is state machine with three states:
- *  1. applicationReady
- *  2. recovery
- *  3. dynamicDag
- *
- * When in state applicationReady:
- *  1. When there is message-loss or JVM crash, transit to state recovery.
- *  2. When user modify the DAG, transit to dynamicDag.
- *
- * When in state recovery:
- *  1. When all tasks has been recovered, transit to applicationReady.
- *
- * When in state dynamicDag:
- *  1. When dynamic dag transition is complete, transit to applicationReady.
- *  2. When there is message loss or JVM crash, transit to state recovery.
- */
-private[appmaster] class TaskManager(
-    appId: Int,
-    dagManager: ActorRef,
-    jarScheduler: JarScheduler,
-    executorManager: ActorRef,
-    clockService: ActorRef,
-    appMaster: ActorRef,
-    appName: String)
-  extends Actor {
-
-  private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
-  private val systemConfig = context.system.settings.config
-
-  private val ids = new SessionIdFactory()
-
-  private val executorRestartPolicy = new ExecutorRestartPolicy(maxNrOfRetries 
= 5,
-    withinTimeRange = 20.seconds)
-
-  private implicit val timeout = Constants.FUTURE_TIMEOUT
-  private implicit val actorSystem = context.system
-
-  import context.dispatcher
-
-  dagManager ! WatchChange(watcher = self)
-  executorManager ! SetTaskManager(self)
-
-  private def getStartClock: Future[TimeStamp] = {
-    (clockService ? 
GetStartClock).asInstanceOf[Future[StartClock]].map(_.clock)
-  }
-
-  private var startClock: Future[TimeStamp] = getStartClock
-
-  def receive: Receive = applicationReady(DagReadyState.empty)
-
-  private def onClientQuery(taskRegistry: TaskRegistry): Receive = {
-    case clock: ClockEvent =>
-      clockService forward clock
-    case GetTaskList =>
-      sender ! TaskList(taskRegistry.getTaskExecutorMap)
-    case LookupTaskActorRef(taskId) =>
-      val executorId = taskRegistry.getExecutorId(taskId)
-      val requestor = sender
-      executorId.map { executorId =>
-        val taskPath = ActorPathUtil.taskActorPath(appMaster, executorId, 
taskId)
-        context.actorSelection(taskPath).resolveOne(3.seconds).map { 
taskActorRef =>
-          requestor ! TaskActorRef(taskActorRef)
-        }
-      }
-  }
-
-  /**
-   * State applicationReady
-   */
-  def applicationReady(state: DagReadyState): Receive = {
-    executorManager ! state.taskRegistry.usedResource
-    dagManager ! NewDAGDeployed(state.dag.version)
-    dagManager ! GetLatestDAG
-    LOG.info(s"goto state ApplicationReady(dag = ${state.dag.version})...")
-
-    val recoverRegistry = new TaskRegistry(expectedTasks = state.dag.tasks,
-      deadTasks = state.taskRegistry.deadTasks)
-
-    val recoverState = new StartDagState(state.dag, recoverRegistry)
-
-    val onError: Receive = {
-      case executorStopped@ExecutorStopped(executorId) =>
-        if (state.taskRegistry.isTaskRegisteredForExecutor(executorId)) {
-          self ! executorStopped
-          context.become(recovery(recoverState))
-        }
-      case MessageLoss(executorId, taskId, cause) =>
-        if (state.taskRegistry.isTaskRegisteredForExecutor(executorId) &&
-          executorRestartPolicy.allowRestartExecutor(executorId)) {
-          context.become(recovery(recoverState))
-        } else {
-          val errorMsg = s"Task $taskId fails too many times to recover"
-          appMaster ! FailedToRecover(errorMsg)
-        }
-      case replay: ReplayFromTimestampWindowTrailingEdge =>
-        LOG.error(s"Received $replay")
-        context.become(recovery(recoverState))
-    }
-
-    val onNewDag: Receive = {
-      case LatestDAG(newDag) =>
-
-        if (newDag.version > state.dag.version) {
-
-          executorManager ! BroadCast(StartDynamicDag(newDag.version))
-          LOG.info("Broadcasting StartDynamicDag")
-
-          val dagDiff = migrate(state.dag, newDag)
-          jarScheduler.setDag(newDag, startClock)
-          val resourceRequestsDetails = 
jarScheduler.getResourceRequestDetails()
-          resourceRequestsDetails.map { details =>
-            details.foreach { detail =>
-              if (detail.requests.length > 0 && 
detail.requests.exists(!_.resource.isEmpty)) {
-                executorManager ! StartExecutors(detail.requests, detail.jar)
-              }
-            }
-          }
-
-          var modifiedTasks = List.empty[TaskId]
-          for (processorId <- dagDiff.modifiedProcessors ++ 
dagDiff.impactedUpstream) {
-            val executors = state.taskRegistry.processorExecutors(processorId)
-            executors.foreach { pair =>
-              val (executorId, tasks) = pair
-              modifiedTasks ++= tasks
-              dagManager ! GetTaskLaunchData(newDag.version, processorId,
-                ChangeTasksOnExecutor(executorId, tasks))
-            }
-          }
-
-          val taskChangeRegistry = new TaskChangeRegistry(modifiedTasks)
-
-          val deadTasks = state.taskRegistry.deadTasks
-          val registeredTasks = state.taskRegistry.registeredTasks
-          val dynamicTaskRegistry = new TaskRegistry(newDag.tasks, 
registeredTasks, deadTasks)
-
-          val nextState = new StartDagState(newDag, dynamicTaskRegistry, 
taskChangeRegistry)
-          context.become(dynamicDag(nextState, recoverState))
-        }
-    }
-
-    val onUnRegisterTask: Receive = {
-      case unRegister: UnRegisterTask =>
-
-        LOG.info(s"Received $unRegister, stop task ${unRegister.taskId}")
-        sender ! StopTask(unRegister.taskId)
-
-        val taskId = unRegister.taskId
-        val registry = state.taskRegistry
-        val deadTasks = registry.deadTasks
-
-        val newRegistry = registry.copy(registeredTasks = 
registry.registeredTasks - taskId,
-          deadTasks = deadTasks + taskId)
-
-        val newState = new DagReadyState(state.dag, newRegistry)
-        context.become(applicationReady(newState))
-    }
-
-    // Recovers to same version
-    onClientQuery(state.taskRegistry) orElse onError orElse onNewDag orElse
-      onUnRegisterTask orElse unHandled("applicationReady")
-  }
-
-  /**
-   * State dynamicDag
-   */
-  def dynamicDag(state: StartDagState, recoverState: StartDagState): Receive = 
{
-    LOG.info(s"DynamicDag transit to dag version: ${state.dag.version}...")
-
-    val onMessageLoss: Receive = {
-      case executorStopped@ExecutorStopped(executorId) =>
-        context.become(recovery(recoverState))
-      case MessageLoss(executorId, taskId, cause) =>
-        if (state.taskRegistry.isTaskRegisteredForExecutor(executorId) &&
-          executorRestartPolicy.allowRestartExecutor(executorId)) {
-          context.become(recovery(recoverState))
-        } else {
-          val errorMsg = s"Task $taskId fails too many times to recover"
-          appMaster ! FailedToRecover(errorMsg)
-        }
-    }
-
-    onMessageLoss orElse onClientQuery(state.taskRegistry) orElse
-      startDag(state, recoverState) orElse unHandled("dynamicDag")
-  }
-
-  private def startDag(state: StartDagState, recoverState: StartDagState): 
Receive = {
-    case executor: ExecutorStarted =>
-      import executor.{boundedJar, executorId, resource, workerId}
-      val taskIdsFuture = jarScheduler.scheduleTask(boundedJar.get, workerId, 
executorId, resource)
-      taskIdsFuture.foreach { taskIds =>
-        LOG.info(s"Executor $executor has been started, " +
-          s"start to schedule tasks: ${taskIds.mkString(",")}")
-        taskIds.groupBy(_.processorId).foreach { pair =>
-          val (processorId, tasks) = pair
-          dagManager ! GetTaskLaunchData(state.dag.version, processorId,
-            StartTasksOnExecutor(executor.executorId, tasks))
-        }
-      }
-
-    case StartExecutorsTimeOut =>
-      appMaster ! AllocateResourceTimeOut
-    case TaskLaunchData(processorDescription, subscribers, command) =>
-      command match {
-        case StartTasksOnExecutor(executorId, tasks) =>
-          LOG.info(s"Start tasks on Executor($executorId), tasks: " + tasks)
-          val launchTasks = LaunchTasks(tasks, state.dag.version, 
processorDescription, subscribers)
-          executorManager ! UniCast(executorId, launchTasks)
-          tasks.foreach(executorRestartPolicy.addTaskToExecutor(executorId, _))
-        case ChangeTasksOnExecutor(executorId, tasks) =>
-          LOG.info("change Task on executor: " + executorId + ", tasks: " + 
tasks)
-          val changeTasks = ChangeTasks(tasks, state.dag.version, 
processorDescription.life,
-            subscribers)
-          executorManager ! UniCast(executorId, changeTasks)
-        case other =>
-          LOG.error(s"severe error! we expect ExecutorStarted but get 
${other.getClass.toString}")
-      }
-    case TasksLaunched =>
-    // We will track all launched task by message RegisterTask
-    case TasksChanged(tasks) =>
-      tasks.foreach(task => state.taskChangeRegistry.taskChanged(task))
-
-      if (allTasksReady(state)) {
-        broadcastLocations(state)
-      }
-
-    case RegisterTask(taskId, executorId, host) =>
-      val client = sender
-      val register = state.taskRegistry
-      val status = register.registerTask(taskId, TaskLocation(executorId, 
host))
-      if (status == Accept) {
-        LOG.info(s"RegisterTask($taskId) TaskLocation: $host, Executor: 
$executorId")
-        val sessionId = ids.newSessionId
-
-        startClock.foreach(clock => client ! TaskRegistered(taskId, sessionId, 
clock))
-        if (allTasksReady(state)) {
-          broadcastLocations(state)
-        }
-      } else {
-        sender ! TaskRejected(taskId)
-      }
-
-    case TaskChanged(taskId, dagVersion) =>
-      state.taskChangeRegistry.taskChanged(taskId)
-      if (allTasksReady(state)) {
-        broadcastLocations(state)
-      }
-    case locationReceived: TaskLocationsReceived =>
-      state.executorReadyRegistry.registerExecutor(locationReceived.executorId)
-      if (allTasksReady(state) &&
-        
state.executorReadyRegistry.allRegistered(state.taskRegistry.executors)) {
-        LOG.info("All executors are ready to start...")
-        clockService ! ChangeToNewDAG(state.dag)
-      }
-    case locationRejected: TaskLocationsRejected =>
-      LOG.error(s"received $locationRejected, start to recover")
-      context.become(recovery(recoverState))
-
-    case ChangeToNewDAGSuccess(_) =>
-      if (allTasksReady(state) &&
-        
state.executorReadyRegistry.allRegistered(state.taskRegistry.executors)) {
-        executorManager ! BroadCast(StartAllTasks(state.dag.version))
-        context.become(applicationReady(new DagReadyState(state.dag, 
state.taskRegistry)))
-      }
-  }
-
-  def onExecutorError: Receive = {
-    case ExecutorStopped(executorId) =>
-      if (executorRestartPolicy.allowRestartExecutor(executorId)) {
-        jarScheduler.executorFailed(executorId).foreach { 
resourceRequestDetail =>
-          if (resourceRequestDetail.isDefined) {
-            executorManager ! 
StartExecutors(resourceRequestDetail.get.requests,
-              resourceRequestDetail.get.jar)
-          }
-        }
-      } else {
-        val errorMsg = s"Executor restarted too many times to recover"
-        appMaster ! FailedToRecover(errorMsg)
-      }
-  }
-
-  private def allTasksReady(state: StartDagState): Boolean = {
-    import state.{taskChangeRegistry, taskRegistry}
-    taskRegistry.isAllTasksRegistered && taskChangeRegistry.allTaskChanged
-  }
-
-  private def broadcastLocations(state: StartDagState): Unit = {
-    LOG.info(s"All tasks have been launched; send Task locations to all 
executors")
-    val taskLocations = state.taskRegistry.getTaskLocations
-    executorManager ! BroadCast(TaskLocationsReady(taskLocations, 
state.dag.version))
-  }
-
-  /**
-   * State recovery
-   */
-  def recovery(state: StartDagState): Receive = {
-    val recoverDagVersion = state.dag.version
-    executorManager ! BroadCast(RestartTasks(recoverDagVersion))
-
-    // Use new Start Clock so that we recover at timepoint we fails.
-    startClock = getStartClock
-
-    jarScheduler.setDag(state.dag, startClock)
-
-    LOG.info(s"goto state Recovery(recoverDag = $recoverDagVersion)...")
-    val ignoreClock: Receive = {
-      case clock: ClockEvent =>
-      // Ignores clock events.
-    }
-
-    if (state.dag.isEmpty) {
-      applicationReady(new DagReadyState(state.dag, state.taskRegistry))
-    } else {
-      val registry = new TaskRegistry(expectedTasks = state.dag.tasks,
-        deadTasks = state.taskRegistry.deadTasks)
-
-      val recoverState = new StartDagState(state.dag, registry)
-      ignoreClock orElse startDag(state, recoverState) orElse onExecutorError 
orElse
-        unHandled("recovery")
-    }
-  }
-
-  private def unHandled(state: String): Receive = {
-    case other =>
-      LOG.info(s"Received unknown message $other in state $state")
-  }
-}
-
-private[appmaster] object TaskManager {
-
-  /**
-   * When application is ready, then transit to DagReadyState
-   */
-  class DagReadyState(val dag: DAG, val taskRegistry: TaskRegistry)
-
-  object DagReadyState {
-    def empty: DagReadyState = {
-      new DagReadyState(
-        DAG.empty.copy(version = -1),
-        new TaskRegistry(List.empty[TaskId]))
-    }
-  }
-
-  /**
-   * When application is booting up or doing recovery, it use StartDagState
-   */
-  class StartDagState(
-      val dag: DAG,
-      val taskRegistry: TaskRegistry,
-      val taskChangeRegistry: TaskChangeRegistry = new 
TaskChangeRegistry(List.empty[TaskId]),
-      val executorReadyRegistry: ExecutorRegistry = new ExecutorRegistry)
-
-  case object GetTaskList
-
-  case class TaskList(tasks: Map[TaskId, ExecutorId])
-
-  case class FailedToRecover(errorMsg: String)
-
-  /**
-   * Starts new Tasks on Executor executorId
-   */
-  case class StartTasksOnExecutor(executorId: Int, tasks: List[TaskId])
-
-  /**
-   * Changes existing tasks on executor executorId
-   */
-  case class ChangeTasksOnExecutor(executorId: Int, tasks: List[TaskId])
-
-  /**
-   * Tracks the registration of all new started executors.
-   */
-  class ExecutorRegistry {
-    private var registeredExecutors = Set.empty[ExecutorId]
-
-    def registerExecutor(executorId: ExecutorId): Unit = {
-      registeredExecutors += executorId
-    }
-
-    def allRegistered(all: List[ExecutorId]): Boolean = {
-      all.forall(executor => registeredExecutors.contains(executor))
-    }
-  }
-
-  /**
-   * Tracks the registration of all changed tasks.
-   */
-  class TaskChangeRegistry(targetTasks: List[TaskId]) {
-    private var registeredTasks = Set.empty[TaskId]
-    def taskChanged(taskId: TaskId): Unit = {
-      registeredTasks += taskId
-    }
-    def allTaskChanged: Boolean = {
-      targetTasks.forall(taskId => registeredTasks.contains(taskId))
-    }
-  }
-
-  object TaskChangeRegistry {
-    def empty: TaskChangeRegistry = new TaskChangeRegistry(List.empty[TaskId])
-  }
-
-  /**
-   * DAGDiff is used to track impacted processors when doing dynamic dag.
-   */
-  case class DAGDiff(
-      addedProcessors: List[ProcessorId],
-      modifiedProcessors: List[ProcessorId],
-      impactedUpstream: List[ProcessorId])
-
-  /**
-   * Migrates from old DAG to new DAG, return DAGDiff
-   */
-  def migrate(leftDAG: DAG, rightDAG: DAG): DAGDiff = {
-    val left = leftDAG.processors.keySet
-    val right = rightDAG.processors.keySet
-
-    val added = right -- left
-    val join = right -- added
-
-    val modified = join.filter { processorId =>
-      leftDAG.processors(processorId) != rightDAG.processors(processorId)
-    }
-
-    val upstream = (list: Set[ProcessorId]) => {
-      list.flatMap { processorId =>
-        rightDAG.graph.incomingEdgesOf(processorId).map(_._1).toSet
-      } -- list
-    }
-
-    val impactedUpstream = upstream(added ++ modified)
-
-    // All upstream tasks are affected, and should be handled properly.
-    DAGDiff(added.toList, modified.toList, impactedUpstream.toList)
-  }
-
-  /**
-   * Each new task will be assigned with a unique session Id.
-   */
-  class SessionIdFactory {
-    private var nextSessionId = 1
-
-    /**
-     * Returns a new session Id for new task
-     */
-    final def newSessionId: Int = {
-      val sessionId = nextSessionId
-      nextSessionId += 1
-      sessionId
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskRegistry.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskRegistry.scala 
b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskRegistry.scala
deleted file mode 100644
index adfdeba..0000000
--- 
a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskRegistry.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.appmaster
-
-import org.slf4j.Logger
-
-import io.gearpump.cluster.scheduler.Resource
-import 
io.gearpump.streaming.appmaster.ExecutorManager.ExecutorResourceUsageSummary
-import io.gearpump.streaming.appmaster.TaskRegistry._
-import io.gearpump.streaming.task.TaskId
-import io.gearpump.streaming.{ExecutorId, ProcessorId}
-import io.gearpump.transport.HostPort
-import io.gearpump.util.LogUtil
-
-/**
- * Tracks the registration of all tasks, when the application is booting up.
- */
-class TaskRegistry(val expectedTasks: List[TaskId],
-    var registeredTasks: Map[TaskId, TaskLocation] = Map.empty[TaskId, 
TaskLocation],
-    var deadTasks: Set[TaskId] = Set.empty[TaskId]) {
-
-  private val LOG: Logger = LogUtil.getLogger(getClass)
-
-  private val processors = expectedTasks.map(_.processorId).toSet
-
-  /**
-   * When a task is booted, it need to call registerTask to register itself.
-   * If this task is valid, then accept it, otherwise reject it.
-   *
-   * @param taskId Task that register itself to TaskRegistry.
-   * @param location The host and port where this task is running on. NOTE: 
The host and port
-   *                 is NOT the same host and port of Akka remoting. Instead, 
it is host and port
-   *                 of custom netty layer, see 
[[io.gearpump.transport.netty.Context]].
-   */
-  def registerTask(taskId: TaskId, location: TaskLocation): RegisterTaskStatus 
= {
-    val processorId = taskId.processorId
-
-    if (processors.contains(processorId)) {
-      registeredTasks += taskId -> location
-      Accept
-    } else {
-      LOG.error(s" the task is not accepted for registration, taskId: 
${taskId}")
-      Reject
-    }
-  }
-
-  def copy(expectedTasks: List[TaskId] = this.expectedTasks,
-      registeredTasks: Map[TaskId, TaskLocation] = this.registeredTasks,
-      deadTasks: Set[TaskId] = this.deadTasks): TaskRegistry = {
-    new TaskRegistry(expectedTasks, registeredTasks, deadTasks)
-  }
-
-  def getTaskLocations: TaskLocations = {
-    val taskLocations = registeredTasks.toList.groupBy(_._2.host).map { pair =>
-      val (k, v) = pair
-      val taskIds = v.map(_._1)
-      (k, taskIds.toSet)
-    }
-    TaskLocations(taskLocations)
-  }
-
-  def getTaskExecutorMap: Map[TaskId, ExecutorId] = {
-    getTaskLocations.locations.flatMap { pair =>
-      val (hostPort, taskSet) = pair
-      taskSet.map { taskId =>
-        (taskId, getExecutorId(taskId).getOrElse(-1))
-      }
-    }
-  }
-
-  /** Query the executor Id where the task is running on */
-  def getExecutorId(taskId: TaskId): Option[Int] = {
-    registeredTasks.get(taskId).map(_.executorId)
-  }
-
-  /** Gets list of allocated executor Ids */
-  def executors: List[ExecutorId] = {
-    registeredTasks.toList.map(_._2.executorId)
-  }
-
-  def isAllTasksRegistered: Boolean = {
-    val aliveTasks = (expectedTasks.toSet -- deadTasks)
-    aliveTasks.forall(task => registeredTasks.contains(task))
-  }
-
-  def isTaskRegisteredForExecutor(executorId: ExecutorId): Boolean = {
-    registeredTasks.exists(_._2.executorId == executorId)
-  }
-
-  private def filterTasks(processorId: ProcessorId): List[TaskId] = {
-    registeredTasks.keys.toList.filter(_.processorId == processorId)
-  }
-
-  /** List of executors that current processor taks are running on */
-  def processorExecutors(processorId: ProcessorId): Map[ExecutorId, 
List[TaskId]] = {
-    val taskToExecutor = filterTasks(processorId).flatMap { taskId =>
-      getExecutorId(taskId).map { executorId =>
-        (taskId, executorId)
-      }
-    }
-
-    val executorToTasks = taskToExecutor.groupBy(_._2).map { kv =>
-      val (k, v) = kv
-      (k, v.map(_._1))
-    }
-    executorToTasks
-  }
-
-  /** Summary about how many resources are used for all running tasks */
-  def usedResource: ExecutorResourceUsageSummary = {
-    val resourceMap = registeredTasks.foldLeft(Map.empty[ExecutorId, 
Resource]) { (map, task) =>
-      val resource = map.getOrElse(task._2.executorId, Resource(0)) + 
Resource(1)
-      map + (task._2.executorId -> resource)
-    }
-    ExecutorResourceUsageSummary(resourceMap)
-  }
-}
-
-object TaskRegistry {
-  sealed trait RegisterTaskStatus
-  case object Accept extends RegisterTaskStatus
-  case object Reject extends RegisterTaskStatus
-
-  case class TaskLocation(executorId: Int, host: HostPort)
-
-  case class TaskLocations(locations: Map[HostPort, Set[TaskId]])
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala
 
b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala
deleted file mode 100644
index 62dff6c..0000000
--- 
a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.appmaster
-
-import com.typesafe.config.Config
-
-import io.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest}
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.streaming.DAG
-import io.gearpump.streaming.appmaster.TaskLocator.{Locality, WorkerLocality}
-import io.gearpump.streaming.appmaster.TaskScheduler.{Location, TaskStatus}
-import io.gearpump.streaming.task.TaskId
-import io.gearpump.util.Constants
-
-/**
- * Schedules tasks to run for new allocated resources. TaskScheduler only 
schedule tasks that
- * share the same jar. For scheduling for multiple jars, see
- * [[io.gearpump.streaming.appmaster.JarScheduler]].
- */
-trait TaskScheduler {
-
-  /**
-   * This notify the scheduler that the task DAG is created.
-   * @param dag task dag
-   */
-  def setDAG(dag: DAG): Unit
-
-  /**
-   * Get the resource requirements for all unscheduled tasks.
-   */
-  def getResourceRequests(): Array[ResourceRequest]
-
-  /**
-   * This notifies the scheduler that a resource slot on {workerId} and 
{executorId} is allocated
-   * , and expect a task to be scheduled in return.
-   * Task locality should be considered when deciding whether to offer a task 
on target {worker}
-   * and {executor}
-   *
-   * @param workerId which worker this resource is located.
-   * @param executorId which executorId this resource belongs to.
-   * @return a list of tasks
-   */
-  def schedule(workerId: WorkerId, executorId: Int, resource: Resource): 
List[TaskId]
-
-  /**
-   * This notifies the scheduler that {executorId} is failed, and expect a set 
of
-   * ResourceRequest for all failed tasks on that executor.
-   *
-   * @param executorId executor that failed
-   * @return resource requests of the failed executor
-   */
-  def executorFailed(executorId: Int): Array[ResourceRequest]
-
-  /**
-   * Queries the task list that already scheduled on the executor
-   *
-   * @param executorId executor to query
-   * @return a list of tasks
-   */
-  def scheduledTasks(executorId: Int): List[TaskId]
-}
-
-object TaskScheduler {
-  case class Location(workerId: WorkerId, executorId: Int)
-
-  class TaskStatus(val taskId: TaskId, val preferLocality: Locality, var 
allocation: Location)
-}
-
-class TaskSchedulerImpl(appId: Int, appName: String, config: Config) extends 
TaskScheduler {
-  private val executorNum = 
config.getInt(Constants.APPLICATION_EXECUTOR_NUMBER)
-
-  private var tasks = List.empty[TaskStatus]
-
-  // Finds the locality of the tasks
-  private val taskLocator = new TaskLocator(appName, config)
-
-  override def setDAG(dag: DAG): Unit = {
-    val taskMap = tasks.map(_.taskId).zip(tasks).toMap
-
-    tasks = dag.tasks.sortBy(_.index).map { taskId =>
-      val locality = taskLocator.locateTask(taskId)
-      taskMap.getOrElse(taskId, new TaskStatus(taskId, locality, allocation = 
null))
-    }
-  }
-
-  def getResourceRequests(): Array[ResourceRequest] = {
-    fetchResourceRequests(fromOneWorker = false)
-  }
-
-  import io.gearpump.cluster.scheduler.Relaxation._
-  private def fetchResourceRequests(fromOneWorker: Boolean = false): 
Array[ResourceRequest] = {
-    var workersResourceRequest = Map.empty[WorkerId, Resource]
-
-    tasks.filter(_.allocation == null).foreach { task =>
-      task.preferLocality match {
-        case WorkerLocality(workerId) =>
-          val current = workersResourceRequest.getOrElse(workerId, 
Resource.empty)
-          workersResourceRequest += workerId -> (current + Resource(1))
-        case _ =>
-          val workerId = WorkerId.unspecified
-          val current = workersResourceRequest.getOrElse(workerId, 
Resource.empty)
-          workersResourceRequest += workerId -> (current + Resource(1))
-      }
-    }
-
-    workersResourceRequest.map { workerIdAndResource =>
-      val (workerId, resource) = workerIdAndResource
-      if (workerId == WorkerId.unspecified) {
-        ResourceRequest(resource, workerId = WorkerId.unspecified, executorNum 
= executorNum)
-      } else {
-        ResourceRequest(resource, workerId, relaxation = SPECIFICWORKER)
-      }
-    }.toArray
-  }
-
-  override def schedule(workerId: WorkerId, executorId: Int, resource: 
Resource): List[TaskId] = {
-    var scheduledTasks = List.empty[TaskId]
-    val location = Location(workerId, executorId)
-    // Schedules tasks for specific worker
-    scheduledTasks ++= scheduleTasksForLocality(resource, location,
-      (locality) => locality == WorkerLocality(workerId))
-
-    // Schedules tasks without specific location preference
-    scheduledTasks ++= scheduleTasksForLocality(resource - 
Resource(scheduledTasks.length),
-      location, (locality) => true)
-    scheduledTasks
-  }
-
-  private def scheduleTasksForLocality(
-      resource: Resource, resourceLocation: Location, matcher: (Locality) => 
Boolean)
-    : List[TaskId] = {
-    var scheduledTasks = List.empty[TaskId]
-    var index = 0
-    var remain = resource.slots
-    while (index < tasks.length && remain > 0) {
-      val taskStatus = tasks(index)
-      if (taskStatus.allocation == null && matcher(taskStatus.preferLocality)) 
{
-        taskStatus.allocation = resourceLocation
-        scheduledTasks +:= taskStatus.taskId
-        remain -= 1
-      }
-      index += 1
-    }
-    scheduledTasks
-  }
-
-  override def executorFailed(executorId: Int): Array[ResourceRequest] = {
-    val failedTasks = tasks.filter { status =>
-      status.allocation != null && status.allocation.executorId == executorId
-    }
-    // Cleans the location of failed tasks
-    failedTasks.foreach(_.allocation = null)
-
-    Array(ResourceRequest(Resource(failedTasks.length),
-      workerId = WorkerId.unspecified, relaxation = ONEWORKER))
-  }
-
-  override def scheduledTasks(executorId: Int): List[TaskId] = {
-    tasks.filter { status =>
-      status.allocation != null && status.allocation.executorId == executorId
-    }.map(_.taskId)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/dsl/Stream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/Stream.scala 
b/streaming/src/main/scala/io/gearpump/streaming/dsl/Stream.scala
deleted file mode 100644
index ddd2037..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/dsl/Stream.scala
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.dsl
-
-import scala.language.implicitConversions
-
-import org.slf4j.{Logger, LoggerFactory}
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.dsl.op._
-import io.gearpump.streaming.sink.DataSink
-import io.gearpump.streaming.task.{Task, TaskContext}
-import io.gearpump.util.Graph
-
-class Stream[T](
-    private val graph: Graph[Op, OpEdge], private val thisNode: Op,
-    private val edge: Option[OpEdge] = None) {
-
-  /**
-   * converts a value[T] to a list of value[R]
-   *
-   * @param fun FlatMap function
-   * @param description The description message for this operation
-   * @return A new stream with type [R]
-   */
-  def flatMap[R](fun: T => TraversableOnce[R], description: String = null): 
Stream[R] = {
-    val flatMapOp = FlatMapOp(fun, Option(description).getOrElse("flatmap"))
-    graph.addVertex(flatMapOp)
-    graph.addEdge(thisNode, edge.getOrElse(Direct), flatMapOp)
-    new Stream[R](graph, flatMapOp)
-  }
-
-  /**
-   * Maps message of type T message of type R
-   *
-   * @param fun Function
-   * @return A new stream with type [R]
-   */
-  def map[R](fun: T => R, description: String = null): Stream[R] = {
-    this.flatMap({ data =>
-      Option(fun(data))
-    }, Option(description).getOrElse("map"))
-  }
-
-  /**
-   * Keeps records when fun(T) == true
-   *
-   * @param fun  the filter
-   * @return  a new stream after filter
-   */
-  def filter(fun: T => Boolean, description: String = null): Stream[T] = {
-    this.flatMap({ data =>
-      if (fun(data)) Option(data) else None
-    }, Option(description).getOrElse("filter"))
-  }
-
-  /**
-   * Reduces operations.
-   *
-   * @param fun  reduction function
-   * @param description description message for this operator
-   * @return a new stream after reduction
-   */
-  def reduce(fun: (T, T) => T, description: String = null): Stream[T] = {
-    val reduceOp = ReduceOp(fun, Option(description).getOrElse("reduce"))
-    graph.addVertex(reduceOp)
-    graph.addEdge(thisNode, edge.getOrElse(Direct), reduceOp)
-    new Stream(graph, reduceOp)
-  }
-
-  /**
-   * Log to task log file
-   */
-  def log(): Unit = {
-    this.map(msg => LoggerFactory.getLogger("dsl").info(msg.toString), "log")
-  }
-
-  /**
-   * Merges data from two stream into one
-   *
-   * @param other the other stream
-   * @return  the merged stream
-   */
-  def merge(other: Stream[T], description: String = null): Stream[T] = {
-    val mergeOp = MergeOp(Option(description).getOrElse("merge"))
-    graph.addVertex(mergeOp)
-    graph.addEdge(thisNode, edge.getOrElse(Direct), mergeOp)
-    graph.addEdge(other.thisNode, other.edge.getOrElse(Shuffle), mergeOp)
-    new Stream[T](graph, mergeOp)
-  }
-
-  /**
-   * Group by function (T => Group)
-   *
-   * For example, we have T type, People(name: String, gender: String, age: 
Int)
-   * groupBy[People](_.gender) will group the people by gender.
-   *
-   * You can append other combinators after groupBy
-   *
-   * For example,
-   * {{{
-   * Stream[People].groupBy(_.gender).flatmap(..).filter.(..).reduce(..)
-   * }}}
-   *
-   * @param fun  Group by function
-   * @param parallelism  Parallelism level
-   * @param description  The description
-   * @return  the grouped stream
-   */
-  def groupBy[Group](fun: T => Group, parallelism: Int = 1, description: 
String = null)
-    : Stream[T] = {
-    val groupOp = GroupByOp(fun, parallelism, 
Option(description).getOrElse("groupBy"))
-    graph.addVertex(groupOp)
-    graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp)
-    new Stream[T](graph, groupOp)
-  }
-
-  /**
-   * Connects with a low level Processor(TaskDescription)
-   *
-   * @param processor  a user defined processor
-   * @param parallelism  parallelism level
-   * @return  new stream after processing with type [R]
-   */
-  def process[R](
-      processor: Class[_ <: Task], parallelism: Int, conf: UserConfig = 
UserConfig.empty,
-      description: String = null): Stream[R] = {
-    val processorOp = ProcessorOp(processor, parallelism, conf,
-      Option(description).getOrElse("process"))
-    graph.addVertex(processorOp)
-    graph.addEdge(thisNode, edge.getOrElse(Shuffle), processorOp)
-    new Stream[R](graph, processorOp, Some(Shuffle))
-  }
-}
-
-class KVStream[K, V](stream: Stream[Tuple2[K, V]]) {
-  /**
-   * GroupBy key
-   *
-   * Applies to Stream[Tuple2[K,V]]
-   *
-   * @param parallelism  the parallelism for this operation
-   * @return  the new KV stream
-   */
-  def groupByKey(parallelism: Int = 1): Stream[Tuple2[K, V]] = {
-    stream.groupBy(Stream.getTupleKey[K, V], parallelism, "groupByKey")
-  }
-
-  /**
-   * Sum the value of the tuples
-   *
-   * Apply to Stream[Tuple2[K,V]], V must be of type Number
-   *
-   * For input (key, value1), (key, value2), will generate (key, value1 + 
value2)
-   * @param numeric  the numeric operations
-   * @return  the sum stream
-   */
-  def sum(implicit numeric: Numeric[V]): Stream[(K, V)] = {
-    stream.reduce(Stream.sumByValue[K, V](numeric), "sum")
-  }
-}
-
-object Stream {
-
-  def apply[T](graph: Graph[Op, OpEdge], node: Op, edge: Option[OpEdge]): 
Stream[T] = {
-    new Stream[T](graph, node, edge)
-  }
-
-  def getTupleKey[K, V](tuple: Tuple2[K, V]): K = tuple._1
-
-  def sumByValue[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, V]) => 
Tuple2[K, V]
-  = (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2))
-
-  implicit def streamToKVStream[K, V](stream: Stream[Tuple2[K, V]]): 
KVStream[K, V] = {
-    new KVStream(stream)
-  }
-
-  implicit class Sink[T](stream: Stream[T]) extends java.io.Serializable {
-    def sink[T](dataSink: DataSink, parallism: Int, conf: UserConfig, 
description: String)
-      : Stream[T] = {
-      implicit val sink = DataSinkOp[T](dataSink, parallism, conf,
-        Some(description).getOrElse("traversable"))
-      stream.graph.addVertex(sink)
-      stream.graph.addEdge(stream.thisNode, Shuffle, sink)
-      new Stream[T](stream.graph, sink)
-    }
-
-    def sink[T](
-        sink: Class[_ <: Task], parallism: Int, conf: UserConfig = 
UserConfig.empty,
-        description: String = null): Stream[T] = {
-      val sinkOp = ProcessorOp(sink, parallism, conf, 
Option(description).getOrElse("source"))
-      stream.graph.addVertex(sinkOp)
-      stream.graph.addEdge(stream.thisNode, Shuffle, sinkOp)
-      new Stream[T](stream.graph, sinkOp)
-    }
-  }
-}
-
-class LoggerSink[T] extends DataSink {
-  var logger: Logger = null
-
-  private var context: TaskContext = null
-
-  override def open(context: TaskContext): Unit = {
-    this.logger = context.logger
-  }
-
-  override def write(message: Message): Unit = {
-    logger.info("logging message " + message.msg)
-  }
-
-  override def close(): Unit = Unit
-}
\ No newline at end of file

Reply via email to