http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
new file mode 100644
index 0000000..56d31db
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.plan
+
+import scala.collection.TraversableOnce
+
+import akka.actor.ActorSystem
+import org.slf4j.Logger
+
+import org.apache.gearpump._
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.Processor
+import org.apache.gearpump.streaming.Processor.DefaultProcessor
+import org.apache.gearpump.streaming.dsl.op._
+import org.apache.gearpump.streaming.dsl.plan.OpTranslator._
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.util.LogUtil
+
+/**
+ * Translates a OP to a TaskDescription
+ */
/**
 * Translates an [[OpChain]] (a chain of DSL operators) into a single low-level
 * [[Processor]]. The head of the chain decides the task type (source, group-by,
 * merge, user processor or sink); the remaining slave operators are folded into
 * a [[SingleInputFunction]] and shipped to the task through [[UserConfig]].
 */
class OpTranslator extends java.io.Serializable {
  val LOG: Logger = LogUtil.getLogger(getClass)

  /**
   * Builds the Processor for one operator chain.
   *
   * @param ops the chain to translate; its head selects the task type
   * @return a processor description for the low-level Graph API
   */
  def translate(ops: OpChain)(implicit system: ActorSystem): Processor[_ <: Task] = {

    val baseConfig = ops.conf

    ops.ops.head match {
      case op: MasterOp =>
        // Slave operators after the master op run inside the same task.
        val tail = ops.ops.tail
        val func = toFunction(tail)
        val userConfig = baseConfig.withValue(GEARPUMP_STREAMING_OPERATOR, func)

        op match {
          case DataSourceOp(dataSource, parallism, conf, description) =>
            Processor[SourceTask[Object, Object]](parallism,
              description = description + "." + func.description,
              userConfig.withValue(GEARPUMP_STREAMING_SOURCE, dataSource))
          case groupby@GroupByOp(_, parallism, description, _) =>
            Processor[GroupByTask[Object, Object, Object]](parallism,
              description = description + "." + func.description,
              userConfig.withValue(GEARPUMP_STREAMING_GROUPBY_FUNCTION, groupby))
          case merge: MergeOp =>
            // Merge is always executed by a single task.
            Processor[TransformTask[Object, Object]](1,
              description = merge.description + "." + func.description,
              userConfig)
          case ProcessorOp(processor, parallism, conf, description) =>
            DefaultProcessor(parallism,
              description = description + "." + func.description,
              userConfig, processor)
          case DataSinkOp(dataSink, parallelism, conf, description) =>
            Processor[SinkTask[Object]](parallelism,
              // Fix: add the missing "." separator so the description format is
              // consistent with every other case above.
              description = description + "." + func.description,
              userConfig.withValue(GEARPUMP_STREAMING_SINK, dataSink))
        }
      case op: SlaveOp[_] =>
        // A chain that starts with a slave op becomes a plain transform task.
        val func = toFunction(ops.ops)
        val userConfig = baseConfig.withValue(GEARPUMP_STREAMING_OPERATOR, func)

        Processor[TransformTask[Object, Object]](1,
          description = func.description,
          taskConf = userConfig)
      case chain: OpChain =>
        // Chains are flattened before translation; a nested chain is a planner bug.
        throw new RuntimeException("Not supposed to be called!")
    }
  }

  /** Folds a list of slave operators into one composed SingleInputFunction. */
  private def toFunction(ops: List[Op]): SingleInputFunction[Object, Object] = {
    val seed: SingleInputFunction[Object, Object] = new DummyInputFunction[Object]()
    ops.foldLeft(seed) { (fun, op) =>
      val opFunction = op match {
        case flatmap: FlatMapOp[Object @unchecked, Object @unchecked] =>
          new FlatMapFunction(flatmap.fun, flatmap.description)
        case reduce: ReduceOp[Object @unchecked] =>
          new ReduceFunction(reduce.fun, reduce.description)
        case _ =>
          throw new RuntimeException("Not supposed to be called!")
      }
      fun.andThen(opFunction.asInstanceOf[SingleInputFunction[Object, Object]])
    }
  }
}
+
+object OpTranslator {
+
  /**
   * A serializable, possibly stateful transformation from one input value to
   * zero or more output values. Functions compose left-to-right via [[andThen]].
   */
  trait SingleInputFunction[IN, OUT] extends Serializable {
    // Processes a single input value and emits the resulting outputs.
    def process(value: IN): TraversableOnce[OUT]

    // Composes this function with `other`: every output of this function is
    // fed into `other`.
    def andThen[OUTER](other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] = {
      new AndThen(this, other)
    }

    // Human-readable name used when building processor descriptions.
    def description: String
  }
+
+  class DummyInputFunction[T] extends SingleInputFunction[T, T] {
+    override def andThen[OUTER](other: SingleInputFunction[T, OUTER])
+      : SingleInputFunction[T, OUTER] = {
+      other
+    }
+
+    // Should never be called
+    override def process(value: T): TraversableOnce[T] = None
+
+    override def description: String = ""
+  }
+
+  class AndThen[IN, MIDDLE, OUT](
+      first: SingleInputFunction[IN, MIDDLE], second: 
SingleInputFunction[MIDDLE, OUT])
+    extends SingleInputFunction[IN, OUT] {
+
+    override def process(value: IN): TraversableOnce[OUT] = {
+      first.process(value).flatMap(second.process(_))
+    }
+
+    override def description: String = {
+      Option(first.description).flatMap { description =>
+        Option(second.description).map(description + "." + _)
+      }.getOrElse(null)
+    }
+  }
+
+  class FlatMapFunction[IN, OUT](fun: IN => TraversableOnce[OUT], 
descriptionMessage: String)
+    extends SingleInputFunction[IN, OUT] {
+
+    override def process(value: IN): TraversableOnce[OUT] = {
+      fun(value)
+    }
+
+    override def description: String = {
+      this.descriptionMessage
+    }
+  }
+
+  class ReduceFunction[T](fun: (T, T) => T, descriptionMessage: String)
+    extends SingleInputFunction[T, T] {
+
+    private var state: Any = null
+
+    override def process(value: T): TraversableOnce[T] = {
+      if (state == null) {
+        state = value
+      } else {
+        state = fun(state.asInstanceOf[T], value)
+      }
+      Some(state.asInstanceOf[T])
+    }
+
+    override def description: String = descriptionMessage
+  }
+
  /**
   * Task that partitions incoming messages by a group-by key and applies an
   * independent copy of the user operator chain to each group.
   */
  class GroupByTask[IN, GROUP, OUT](
      groupBy: IN => GROUP, taskContext: TaskContext, userConf: UserConfig)
    extends Task(taskContext, userConf) {

    // Auxiliary constructor used by the task launcher: the group-by function
    // is deserialized from the user config.
    def this(taskContext: TaskContext, userConf: UserConfig) = {
      this(userConf.getValue[GroupByOp[IN, GROUP]](
        GEARPUMP_STREAMING_GROUPBY_FUNCTION)(taskContext.system).get.fun,
        taskContext, userConf)
    }

    // One operator instance per group key, so per-group state (e.g. a running
    // reduce) stays isolated between groups.
    private var groups = Map.empty[GROUP, SingleInputFunction[IN, OUT]]

    override def onStart(startTime: StartTime): Unit = {
    }

    override def onNext(msg: Message): Unit = {
      val time = msg.timestamp

      val group = groupBy(msg.msg.asInstanceOf[IN])
      if (!groups.contains(group)) {
        // NOTE(review): assumes getValue deserializes a fresh operator instance
        // per call; if it returned a shared instance, state would leak across
        // groups — verify against UserConfig.getValue.
        val operator =
          userConf.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get
        groups += group -> operator
      }

      val operator = groups(group)

      // Outputs keep the timestamp of the triggering input message.
      operator.process(msg.msg.asInstanceOf[IN]).foreach { msg =>
        taskContext.output(new Message(msg.asInstanceOf[AnyRef], time))
      }
    }
  }
+
  /**
   * Task that pulls records from a [[DataSource]] and feeds them through the
   * optional operator chain. It drives itself with self-messages ("start" /
   * "next") so that reading never blocks the actor mailbox.
   */
  class SourceTask[T, OUT](
      source: DataSource, operator: Option[SingleInputFunction[T, OUT]], taskContext: TaskContext,
      userConf: UserConfig)
    extends Task(taskContext, userConf) {

    // Auxiliary constructor used by the task launcher; resolves the source and
    // the operator chain from the user config.
    def this(taskContext: TaskContext, userConf: UserConfig) = {
      this(
        userConf.getValue[DataSource](GEARPUMP_STREAMING_SOURCE)(taskContext.system).get,
        userConf.getValue[SingleInputFunction[T, OUT]](GEARPUMP_STREAMING_OPERATOR)(
          taskContext.system),
        taskContext, userConf)
    }

    override def onStart(startTime: StartTime): Unit = {
      source.open(taskContext, startTime.startTime)
      // Kicks off the self-driven read loop.
      self ! Message("start", System.currentTimeMillis())
    }

    override def onNext(msg: Message): Unit = {
      val time = System.currentTimeMillis()
      // source.read() may return null when no record is available; skip those.
      Option(source.read()).foreach { msg =>
        operator match {
          case Some(operator) =>
            operator match {
              // A DummyInputFunction is an identity chain: forward as-is.
              // (Type-erased match — only the class, not [T], is checked.)
              case bad: DummyInputFunction[T] =>
                taskContext.output(msg)
              case _ =>
                operator.process(msg.msg.asInstanceOf[T]).foreach(msg => {
                  taskContext.output(new Message(msg.asInstanceOf[AnyRef], time))
                })
            }
          case None =>
            taskContext.output(msg)
        }
      }

      // Schedules the next read regardless of whether a record was produced.
      self ! Message("next", System.currentTimeMillis())
    }

    override def onStop(): Unit = {
      source.close()
    }
  }
+
+  class TransformTask[IN, OUT](
+      operator: Option[SingleInputFunction[IN, OUT]], taskContext: TaskContext,
+      userConf: UserConfig) extends Task(taskContext, userConf) {
+
+    def this(taskContext: TaskContext, userConf: UserConfig) = {
+      this(userConf.getValue[SingleInputFunction[IN, OUT]](
+        GEARPUMP_STREAMING_OPERATOR)(taskContext.system), taskContext, 
userConf)
+    }
+
+    override def onStart(startTime: StartTime): Unit = {
+    }
+
+    override def onNext(msg: Message): Unit = {
+      val time = msg.timestamp
+
+      operator match {
+        case Some(operator) =>
+          operator.process(msg.msg.asInstanceOf[IN]).foreach { msg =>
+            taskContext.output(new Message(msg.asInstanceOf[AnyRef], time))
+          }
+        case None =>
+          taskContext.output(new Message(msg.msg, time))
+      }
+    }
+  }
+
+  class SinkTask[T](dataSink: DataSink, taskContext: TaskContext, userConf: 
UserConfig)
+    extends Task(taskContext, userConf) {
+
+    def this(taskContext: TaskContext, userConf: UserConfig) = {
+      
this(userConf.getValue[DataSink](GEARPUMP_STREAMING_SINK)(taskContext.system).get,
+        taskContext, userConf)
+    }
+
+    override def onStart(startTime: StartTime): Unit = {
+      dataSink.open(taskContext)
+    }
+
+    override def onNext(msg: Message): Unit = {
+      dataSink.write(msg)
+    }
+
+    override def onStop(): Unit = {
+      dataSink.close()
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
new file mode 100644
index 0000000..3af5e97
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.plan
+
+import akka.actor.ActorSystem
+
+import org.apache.gearpump.partitioner.{CoLocationPartitioner, 
HashPartitioner, Partitioner}
+import org.apache.gearpump.streaming.Processor
+import org.apache.gearpump.streaming.dsl.op._
+import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
+import org.apache.gearpump.streaming.task.Task
+import org.apache.gearpump.util.Graph
+
class Planner {

  /*
   * Converts Dag of Op to Dag of TaskDescription. TaskDescription is part of the low
   * level Graph API.
   */
  def plan(dag: Graph[Op, OpEdge])(implicit system: ActorSystem)
    : Graph[Processor[_ <: Task], _ <: Partitioner] = {

    val opTranslator = new OpTranslator()

    // First fuse Direct-connected operators into chains, then map edges to
    // partitioners and vertices to low-level processors.
    val newDag = optimize(dag)
    newDag.mapEdge { (node1, edge, node2) =>
      edge match {
        case Shuffle =>
          // A shuffle into a group-by routes records of the same key together.
          node2.head match {
            case groupBy: GroupByOp[Any @unchecked, Any @unchecked] =>
              new GroupByPartitioner(groupBy.fun)
            case _ => new HashPartitioner
          }
        case Direct =>
          new CoLocationPartitioner
      }
    }.mapVertex { opChain =>
      opTranslator.translate(opChain)
    }
  }

  // Fuses vertices connected by Direct edges into single OpChains, walking the
  // graph in reverse topological order so downstream merges happen first.
  private def optimize(dag: Graph[Op, OpEdge]): Graph[OpChain, OpEdge] = {
    val newGraph = dag.mapVertex(op => OpChain(List(op)))

    val nodes = newGraph.topologicalOrderWithCirclesIterator.toList.reverse
    for (node <- nodes) {
      val outGoingEdges = newGraph.outgoingEdgesOf(node)
      for (edge <- outGoingEdges) {
        merge(newGraph, edge._1, edge._3)
      }
    }
    newGraph
  }

  // Merges node2 into node1 IN PLACE when node1's single outgoing edge is a
  // Direct edge into node2 and node2 has a single input. Incoming/outgoing
  // edges are re-attached to the fused chain before the old vertices are
  // removed — the mutation order here matters.
  private def merge(dag: Graph[OpChain, OpEdge], node1: OpChain, node2: OpChain)
    : Graph[OpChain, OpEdge] = {
    if (dag.outDegreeOf(node1) == 1 &&
      dag.inDegreeOf(node2) == 1 &&
      // For processor node, we don't allow it to merge with downstream operators
      !node1.head.isInstanceOf[ProcessorOp[_ <: Task]]) {
      val (_, edge, _) = dag.outgoingEdgesOf(node1)(0)
      if (edge == Direct) {
        val opList = OpChain(node1.ops ++ node2.ops)
        dag.addVertex(opList)
        for (incomingEdge <- dag.incomingEdgesOf(node1)) {
          dag.addEdge(incomingEdge._1, incomingEdge._2, opList)
        }

        for (outgoingEdge <- dag.outgoingEdgesOf(node2)) {
          dag.addEdge(opList, outgoingEdge._2, outgoingEdge._3)
        }

        // Remove the old vertex
        dag.removeVertex(node1)
        dag.removeVertex(node2)
      }
    }
    dag
  }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/executor/Executor.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/executor/Executor.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/executor/Executor.scala
new file mode 100644
index 0000000..0eeb0eb
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/executor/Executor.scala
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.executor
+
+import java.lang.management.ManagementFactory
+import scala.concurrent.duration._
+
+import akka.actor.SupervisorStrategy.Resume
+import akka.actor._
+import com.typesafe.config.Config
+import org.apache.commons.lang.exception.ExceptionUtils
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.cluster.{ClusterConfig, ExecutorContext, UserConfig}
+import org.apache.gearpump.metrics.Metrics.ReportMetrics
+import org.apache.gearpump.metrics.{JvmMetricsSet, Metrics, 
MetricsReporterService}
+import org.apache.gearpump.serializer.SerializationFramework
+import org.apache.gearpump.streaming.AppMasterToExecutor.{MsgLostException, 
TasksChanged, TasksLaunched, _}
+import org.apache.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, 
RegisterExecutor, RegisterTask, UnRegisterTask}
+import org.apache.gearpump.streaming.ProcessorId
+import org.apache.gearpump.streaming.executor.Executor._
+import org.apache.gearpump.streaming.executor.TaskLauncher.TaskArgument
+import org.apache.gearpump.streaming.task.{Subscriber, TaskId}
+import org.apache.gearpump.transport.{Express, HostPort}
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.{ActorUtil, Constants, LogUtil, 
TimeOutScheduler}
+
+/**
+ * Executor is child of AppMaster.
+ * It usually represents a JVM process. It is a container for all tasks.
+ */
+
+// TODO: What if Executor stuck in state DynamicDag and cannot get out???
+// For example, due to some message loss when there is network glitch.
+// Executor will hang there for ever???
+//
class Executor(executorContext: ExecutorContext, userConf: UserConfig, launcher: ITaskLauncher)
  extends Actor with TimeOutScheduler {

  // Constructor used by the AppMaster (reflection-friendly): builds the
  // default task launcher.
  def this(executorContext: ExecutorContext, userConf: UserConfig) = {
    this(executorContext, userConf, TaskLauncher(executorContext, userConf))
  }

  import context.dispatcher
  import executorContext.{appId, appMaster, executorId, resource, worker}

  private val LOG: Logger = LogUtil.getLogger(getClass, executor = executorId, app = appId)

  private implicit val timeOut = FUTURE_TIMEOUT
  private val address = ActorUtil.getFullPath(context.system, self.path)
  private val systemConfig = context.system.settings.config
  private val serializerPool = getSerializerPool()
  private val taskDispatcher = systemConfig.getString(Constants.GEARPUMP_TASK_DISPATCHER)

  // Current lifecycle state (see companion object State) plus transition
  // timing, consulted by the periodic HealthCheck to flag stuck transitions.
  private var state = State.ACTIVE
  private var transitionStart = 0L
  // States transition start, in unix time
  private var transitionEnd = 0L
  // States transition end, in unix time
  private val transitWarningThreshold = 5000 // ms,

  // Starts health check Ticks
  self ! HealthCheck

  LOG.info(s"Executor $executorId has been started, start to register itself...")
  LOG.info(s"Executor actor path: ${ActorUtil.getFullPath(context.system, self.path)}")

  appMaster ! RegisterExecutor(self, executorId, resource, worker)
  context.watch(appMaster)

  // Tasks hosted by this executor, plus the arguments needed to relaunch them
  // during recovery or dynamic-dag transitions.
  private var tasks = Map.empty[TaskId, ActorRef]
  private val taskArgumentStore = new TaskArgumentStore()

  val express = Express(context.system)

  val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)

  if (metricsEnabled) {
    // Registers jvm metrics
    Metrics(context.system).register(new JvmMetricsSet(s"app$appId.executor$executorId"))

    val metricsReportService = context.actorOf(Props(new MetricsReporterService(
      Metrics(context.system))))
    appMaster.tell(ReportMetrics, metricsReportService)
  }

  // Sentinel dag version meaning "no dag deployed yet".
  private val NOT_INITIALIZED = -1
  def receive: Receive = applicationReady(dagVersion = NOT_INITIALIZED)
+
+  private def getTaskId(actorRef: ActorRef): Option[TaskId] = {
+    tasks.find(_._2 == actorRef).map(_._1)
+  }
+
  // One-for-one supervision: any task failure is reported to the AppMaster as
  // message loss (triggering application replay) and the task actor is Resumed
  // rather than restarted, since replay is the recovery mechanism.
  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
      case _: MsgLostException =>
        val taskId = getTaskId(sender)
        val cause = s"We got MessageLossException from task ${getTaskId(sender)}, " +
          s"replaying application..."
        LOG.error(cause)
        taskId.foreach(appMaster ! MessageLoss(executorId, _, cause))
        Resume
      case ex: Throwable =>
        // Every other failure is also treated as message loss so the system
        // replays lost messages instead of crashing the executor.
        val taskId = getTaskId(sender)
        val errorMsg = s"We got ${ex.getClass.getName} from $taskId, we will treat it as" +
          s" MessageLoss, so that the system will replay all lost message"
        LOG.error(errorMsg, ex)
        val detailErrorMsg = errorMsg + "\n" + ExceptionUtils.getStackTrace(ex)
        taskId.foreach(appMaster ! MessageLoss(executorId, _, detailErrorMsg))
        Resume
    }
+
  // Launches a single task actor with the given argument and returns its ref.
  private def launchTask(taskId: TaskId, argument: TaskArgument): ActorRef = {
    launcher.launch(List(taskId), argument, context, serializerPool, taskDispatcher).values.head
  }
+
+  private def assertVersion(expectVersion: Int, version: Int, clue: Any): Unit 
= {
+    if (expectVersion != version) {
+      val errorMessage = s"Version mismatch: we expect dag version 
$expectVersion, " +
+        s"but get $version; clue: $clue"
+      LOG.error(errorMessage)
+      throw new DagVersionMismatchException(errorMessage)
+    }
+  }
+
+  def dynamicDagPhase1(
+      dagVersion: Int, launched: List[TaskId], changed: List[ChangeTask], 
registered: List[TaskId])
+    : Receive = {
+    state = State.DYNAMIC_DAG_PHASE1
+    box({
+      case launch@LaunchTasks(taskIds, version, processorDescription,
+      subscribers: List[Subscriber]) => {
+        assertVersion(dagVersion, version, clue = launch)
+
+        LOG.info(s"Launching Task $taskIds for app: $appId")
+        val taskArgument = TaskArgument(version, processorDescription, 
subscribers)
+        taskIds.foreach(taskArgumentStore.add(_, taskArgument))
+        val newAdded = launcher.launch(taskIds, taskArgument, context, 
serializerPool,
+          taskDispatcher)
+        newAdded.foreach { newAddedTask =>
+          context.watch(newAddedTask._2)
+        }
+        tasks ++= newAdded
+        sender ! TasksLaunched
+        context.become(dynamicDagPhase1(version, launched ++ taskIds, changed, 
registered))
+      }
+      case change@ChangeTasks(taskIds, version, life, subscribers) =>
+        assertVersion(dagVersion, version, clue = change)
+
+        LOG.info(s"Change Tasks $taskIds for app: $appId, verion: $life, 
$dagVersion, $subscribers")
+
+        val newChangedTasks = taskIds.map { taskId =>
+          for (taskArgument <- taskArgumentStore.get(dagVersion, taskId)) {
+            val processorDescription = 
taskArgument.processorDescription.copy(life = life)
+            taskArgumentStore.add(taskId, TaskArgument(dagVersion, 
processorDescription,
+              subscribers))
+          }
+          ChangeTask(taskId, dagVersion, life, subscribers)
+        }
+        sender ! TasksChanged(taskIds)
+        context.become(dynamicDagPhase1(dagVersion, launched, changed ++ 
newChangedTasks,
+          registered))
+
+      case locations@TaskLocationsReady(taskLocations, version) =>
+        LOG.info(s"TaskLocations Ready...")
+        assertVersion(dagVersion, version, clue = locations)
+
+        // Check whether all tasks has been registered.
+        if ((launched.toSet -- registered.toSet).isEmpty) {
+          // Confirm all tasks has been registered.
+          val result = taskLocations.locations.filter {
+            location => !location._1.equals(express.localHost)
+          }.flatMap { kv =>
+            val (host, taskIdList) = kv
+            taskIdList.map(taskId => (TaskId.toLong(taskId), host))
+          }
+
+          val replyTo = sender
+          express.startClients(taskLocations.locations.keySet).foreach { _ =>
+            express.remoteAddressMap.send(result)
+            express.remoteAddressMap.future().foreach { _ =>
+              LOG.info(s"sending TaskLocationsReceived back to appmaster")
+              replyTo ! TaskLocationsReceived(version, executorId)
+            }
+          }
+          context.become(dynamicDagPhase2(dagVersion, launched, changed))
+        } else {
+          LOG.error("Inconsistency between AppMaser and Executor! AppMaster 
thinks DynamicDag " +
+            "transition is ready, while Executor have not get all tasks 
registered, " +
+            "that task will not be functional...")
+
+          // Reject TaskLocations...
+          val missedTasks = (launched.toSet -- registered.toSet).toList
+          val errorMsg = "We have not received TaskRegistered for following 
tasks: " +
+            missedTasks.mkString(", ")
+          LOG.error(errorMsg)
+          sender ! TaskLocationsRejected(dagVersion, executorId, errorMsg, 
null)
+          // Stays with current status...
+        }
+
+      case confirm: TaskRegistered =>
+        tasks.get(confirm.taskId).foreach {
+          case actorRef: ActorRef =>
+            tasks += confirm.taskId -> actorRef
+            actorRef forward confirm
+        }
+        context.become(dynamicDagPhase1(dagVersion, launched, changed,
+          registered :+ confirm.taskId))
+
+      case rejected: TaskRejected =>
+        // Means this task shoud not exists...
+        tasks.get(rejected.taskId).foreach(_ ! PoisonPill)
+        tasks -= rejected.taskId
+        LOG.error(s"Task ${rejected.taskId} is rejected by AppMaster, shutting 
down it...")
+
+      case register: RegisterTask =>
+        appMaster ! register
+    })
+  }
+
  // Phase 2 of a dynamic DAG transition: all executors have the task
  // locations; wait for the AppMaster's StartAllTasks before going active.
  def dynamicDagPhase2(dagVersion: Int, launched: List[TaskId], changed: List[ChangeTask])
    : Receive = {
    LOG.info("Transit to dynamic Dag Phase2")
    state = State.DYNAMIC_DAG_PHASE2
    box {
      case startAll@StartAllTasks(version) =>
        LOG.info(s"Start All Tasks...")
        assertVersion(dagVersion, version, clue = startAll)

        // Starts the newly launched tasks and applies pending task changes.
        launched.foreach(taskId => tasks.get(taskId).foreach(_ ! StartTask(taskId)))
        changed.foreach(changeTask => tasks.get(changeTask.taskId).foreach(_ ! changeTask))

        // The new dag is now authoritative: drop newer and obsolete arguments.
        taskArgumentStore.removeNewerVersion(dagVersion)
        taskArgumentStore.removeObsoleteVersion
        context.become(applicationReady(dagVersion))
    }
  }
+
  // Steady state: the dag at `dagVersion` is fully deployed. Any transition
  // message carrying a newer dag version moves the executor into phase 1.
  def applicationReady(dagVersion: Int): Receive = {
    state = State.ACTIVE
    transitionEnd = System.currentTimeMillis()

    // Skip the timing log on the very first activation (no transition ran).
    if (dagVersion != NOT_INITIALIZED) {
      LOG.info("Transit to state Application Ready. This transition takes " +
        (transitionEnd - transitionStart) + " milliseconds")
    }
    box {
      case start: StartDynamicDag =>
        LOG.info("received StartDynamicDag")
        if (start.dagVersion > dagVersion) {
          transitionStart = System.currentTimeMillis()
          LOG.info(s"received $start, Executor transit to dag version: ${start.dagVersion} from " +
            s"current version $dagVersion")
          context.become(dynamicDagPhase1(start.dagVersion, List.empty[TaskId],
            List.empty[ChangeTask], List.empty[TaskId]))
        }
      case launch: LaunchTasks =>
        // A LaunchTasks for a newer dag implicitly starts the transition; the
        // message is re-forwarded so phase 1 handles it.
        if (launch.dagVersion > dagVersion) {
          transitionStart = System.currentTimeMillis()
          LOG.info(s"received $launch, Executor transit to dag " +
            s"version: ${launch.dagVersion} from current version $dagVersion")
          context.become(dynamicDagPhase1(launch.dagVersion, List.empty[TaskId],
            List.empty[ChangeTask], List.empty[TaskId]))
          self forward launch
        }

      case change: ChangeTasks =>
        // Same implicit-transition handling as LaunchTasks above.
        if (change.dagVersion > dagVersion) {
          transitionStart = System.currentTimeMillis()
          LOG.info(s"received $change, Executor transit to dag version: ${change.dagVersion} from" +
            s" current version $dagVersion")
          context.become(dynamicDagPhase1(change.dagVersion, List.empty[TaskId],
            List.empty[ChangeTask], List.empty[TaskId]))
          self forward change
        }

      case StopTask(taskId) =>
        // Old soldiers never die, they just fade away ;)
        val fadeAwayTask = tasks.get(taskId)
        if (fadeAwayTask.isDefined) {
          context.stop(fadeAwayTask.get)
        }
        tasks -= taskId

      case unRegister@UnRegisterTask(taskId, _) =>
        // Sends UnRegisterTask to AppMaster
        appMaster ! unRegister
    }
  }
+
  // Recovery state: waits for `remain` old task actors to stop. Once all are
  // down, relaunches those that exist in the target dag version and re-enters
  // dynamic dag phase 1 so they re-register with the AppMaster.
  def restartingTasks(dagVersion: Int, remain: Int, needRestart: List[TaskId]): Receive = {
    state = State.RECOVERY
    box {
      case TaskStopped(actor) =>
        for (taskId <- getTaskId(actor)) {
          // Only tasks present in the target dag version are restarted.
          if (taskArgumentStore.get(dagVersion, taskId).nonEmpty) {
            val newNeedRestart = needRestart :+ taskId
            val newRemain = remain - 1
            if (newRemain == 0) {
              // Last task is down: relaunch everything and watch the new actors.
              val newRestarted = newNeedRestart.map { taskId_ =>
                val taskActor = launchTask(taskId_, taskArgumentStore.get(dagVersion, taskId_).get)
                context.watch(taskActor)
                taskId_ -> taskActor
              }.toMap

              tasks = newRestarted
              context.become(dynamicDagPhase1(dagVersion, newNeedRestart, List.empty[ChangeTask],
                List.empty[TaskId]))
            } else {
              context.become(restartingTasks(dagVersion, newRemain, newNeedRestart))
            }
          }
        }
    }
  }
+
+  val terminationWatch: Receive = {
+    case Terminated(actor) =>
+      if (actor.compareTo(appMaster) == 0) {
+        LOG.info(s"AppMaster ${appMaster.path.toString} is terminated, 
shutting down current " +
+          s"executor $appId, $executorId")
+        context.stop(self)
+      } else {
+        self ! TaskStopped(actor)
+      }
+  }
+
  // Handles RestartTasks by poisoning every current task actor; their
  // Terminated notifications then drive the restartingTasks state.
  def onRestartTasks: Receive = {
    case RestartTasks(dagVersion) =>
      LOG.info(s"Executor received restart tasks")
      // Only tasks that exist in the target dag version will be restarted.
      val tasksToRestart = tasks.keys.count(taskArgumentStore.get(dagVersion, _).nonEmpty)
      // Clears the remote routing table so stale task locations are not used.
      express.remoteAddressMap.send(Map.empty[Long, HostPort])
      context.become(restartingTasks(dagVersion, remain = tasksToRestart,
        needRestart = List.empty[TaskId]))

      tasks.values.foreach {
        case task: ActorRef => task ! PoisonPill
      }
  }
+
+  def executorService: Receive = terminationWatch orElse onRestartTasks orElse 
{
+    case taskChanged: TaskChanged =>
+    // Skip
+    case get: GetExecutorSummary =>
+      val logFile = LogUtil.applicationLogDir(systemConfig)
+      val processorTasks = 
tasks.keySet.groupBy(_.processorId).mapValues(_.toList).view.force
+      sender ! ExecutorSummary(
+        executorId,
+        worker.workerId,
+        address,
+        logFile.getAbsolutePath,
+        state,
+        tasks.size,
+        processorTasks,
+        jvmName = ManagementFactory.getRuntimeMXBean().getName())
+
+    case query: QueryExecutorConfig =>
+      sender ! 
ExecutorConfig(ClusterConfig.filterOutDefaultConfig(systemConfig))
+    case HealthCheck =>
+      context.system.scheduler.scheduleOnce(3.second)(HealthCheck)
+      if (state != State.ACTIVE && (transitionEnd - transitionStart) > 
transitWarningThreshold) {
+        LOG.error(s"Executor status: " + state +
+          s", it takes too long(${transitionEnd - transitionStart}) to do 
transition")
+      }
+  }
+
+  private def getSerializerPool(): SerializationFramework = {
+    val system = context.system.asInstanceOf[ExtendedActorSystem]
+    val clazz = 
Class.forName(systemConfig.getString(Constants.GEARPUMP_SERIALIZER_POOL))
+    val pool = clazz.newInstance().asInstanceOf[SerializationFramework]
+    pool.init(system, userConf)
+    pool.asInstanceOf[SerializationFramework]
+  }
+
+  private def unHandled(state: String): Receive = {
+    case other =>
+      LOG.info(s"Received unknown message $other in state: $state")
+  }
+
  // Wraps a state-specific Receive with the always-on service handlers and a
  // logging fallback for unknown messages.
  private def box(receive: Receive): Receive = {
    executorService orElse receive orElse unHandled(state)
  }
+}
+
object Executor {
  // Sent by the AppMaster to recover tasks to the given dag version.
  case class RestartTasks(dagVersion: Int)

  /**
   * Keeps, per task, the launch arguments across dag versions so tasks can be
   * relaunched during recovery and dynamic-dag transitions.
   */
  class TaskArgumentStore {

    // Invariant: each list is ordered newest dagVersion first (see add).
    private var store = Map.empty[TaskId, List[TaskArgument]]

    def add(taskId: TaskId, task: TaskArgument): Unit = {
      val list = store.getOrElse(taskId, List.empty[TaskArgument])
      store += taskId -> (task :: list)
    }

    // Returns the newest argument whose dagVersion is <= the requested one.
    def get(dagVersion: Int, taskId: TaskId): Option[TaskArgument] = {
      store.get(taskId).flatMap { list =>
        list.find { arg =>
          arg.dagVersion <= dagVersion
        }
      }
    }

    /**
     * When the new DAG is successfully deployed, then we should remove obsolete
     * TaskArgument of old DAG.
     */
    def removeObsoleteVersion(): Unit = {
      store = store.map { kv =>
        val (k, list) = kv
        // Keeps only the newest argument per task.
        (k, list.take(1))
      }
    }

    // Drops arguments newer than currentVersion (e.g. after an aborted upgrade).
    def removeNewerVersion(currentVersion: Int): Unit = {
      store = store.map { kv =>
        val (k, list) = kv
        (k, list.filter(_.dagVersion <= currentVersion))
      }
    }
  }
+
  // Internal notification that a watched task actor has stopped.
  case class TaskStopped(task: ActorRef)

  // Snapshot of this executor reported to the AppMaster / UI.
  case class ExecutorSummary(
      id: Int,
      workerId: WorkerId,
      actorPath: String,
      logFile: String,
      status: String,
      taskCount: Int,
      tasks: Map[ProcessorId, List[TaskId]],
      jvmName: String
  )

  object ExecutorSummary {
    // NOTE(review): taskCount is 1 in the "empty" summary — 0 would seem the
    // more obvious default; verify against consumers before changing.
    def empty: ExecutorSummary = {
      ExecutorSummary(0, WorkerId.unspecified, "", "", "", 1, null, jvmName = "")
    }
  }

  // Query messages handled by executorService.
  case class GetExecutorSummary(executorId: Int)

  case class QueryExecutorConfig(executorId: Int)

  case class ExecutorConfig(config: Config)

  // Thrown when a dag-transition message carries an unexpected dag version.
  class DagVersionMismatchException(msg: String) extends Exception(msg)

  // Executor lifecycle states reported in ExecutorSummary.status.
  object State {
    val ACTIVE = "active"
    val DYNAMIC_DAG_PHASE1 = "dynamic_dag_phase1"
    val DYNAMIC_DAG_PHASE2 = "dynamic_dag_phase2"
    val RECOVERY = "dag_recovery"
  }

  // Self-message driving the periodic health-check tick.
  object HealthCheck
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/executor/ExecutorRestartPolicy.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/executor/ExecutorRestartPolicy.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/executor/ExecutorRestartPolicy.scala
new file mode 100644
index 0000000..ef96ab9
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/executor/ExecutorRestartPolicy.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.executor
+
+import scala.collection.immutable
+import scala.concurrent.duration.Duration
+
+import org.apache.gearpump.streaming.task.TaskId
+import org.apache.gearpump.util.RestartPolicy
+
/**
 * Controls how many retries are allowed to recover failed executors.
 *
 * The executor-level decision is derived from the restart policies of the
 * tasks it hosts: an executor may be restarted only while every one of its
 * registered tasks is still within its own restart budget.
 *
 * @param maxNrOfRetries the number of times an executor is allowed to be restarted,
 *                       negative value means no limit; if the limit is exceeded the
 *                       policy will not allow the executor to be restarted
 * @param withinTimeRange duration of the time window for maxNrOfRetries, Duration.Inf
 *                        means no window
 */
class ExecutorRestartPolicy(maxNrOfRetries: Int, withinTimeRange: Duration) {
  private var executorToTaskIds = Map.empty[Int, Set[TaskId]]
  // Renamed from the misspelled "taskRestartPolocies"; the field is private, so safe.
  private var taskRestartPolicies = new immutable.HashMap[TaskId, RestartPolicy]

  /** Records that `taskId` runs on `executorId`, lazily creating the task's RestartPolicy. */
  def addTaskToExecutor(executorId: Int, taskId: TaskId): Unit = {
    val tasksOnExecutor = executorToTaskIds.getOrElse(executorId, Set.empty[TaskId]) + taskId
    executorToTaskIds += executorId -> tasksOnExecutor
    if (!taskRestartPolicies.contains(taskId)) {
      taskRestartPolicies += taskId -> new RestartPolicy(maxNrOfRetries, withinTimeRange)
    }
  }

  /**
   * Returns true if every task registered on the executor still allows a restart.
   * An executor with no registered tasks is trivially restartable.
   */
  def allowRestartExecutor(executorId: Int): Boolean = {
    // forall short-circuits on the first task that denies a restart, matching the
    // early exit of the original without a non-local `return` inside a lambda.
    executorToTaskIds.getOrElse(executorId, Set.empty[TaskId]).forall { taskId =>
      taskRestartPolicies.get(taskId).forall(_.allowRestart)
    }
  }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/executor/TaskLauncher.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/executor/TaskLauncher.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/executor/TaskLauncher.scala
new file mode 100644
index 0000000..18490ee
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/executor/TaskLauncher.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.executor
+
+import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
+
+import org.apache.gearpump.cluster.{ExecutorContext, UserConfig}
+import org.apache.gearpump.serializer.SerializationFramework
+import org.apache.gearpump.streaming.ProcessorDescription
+import org.apache.gearpump.streaming.executor.TaskLauncher.TaskArgument
+import org.apache.gearpump.streaming.task._
+import org.apache.gearpump.streaming.util.ActorPathUtil
+
trait ITaskLauncher {

  /**
   * Launches a list of task actors.
   *
   * @param taskIds tasks to launch
   * @param argument shared launch data (dag version, processor description, subscribers)
   * @param context actor factory used to create the task actors
   * @param serializer serialization framework handed to each task actor
   * @param dispatcher name of the akka dispatcher the task actors will run on
   * @return map from task id to the created task actor
   */
  def launch(taskIds: List[TaskId], argument: TaskArgument,
      context: ActorRefFactory, serializer: SerializationFramework, dispatcher: String)
    : Map[TaskId, ActorRef]
}
+
/**
 * Default [[ITaskLauncher]]: creates one task actor per task id inside the
 * provided ActorRefFactory.
 */
class TaskLauncher(
    appId: Int,
    appName: String,
    executorId: Int,
    appMaster: ActorRef,
    userConf: UserConfig,
    taskActorClass: Class[_ <: Actor])
  extends ITaskLauncher {

  override def launch(
      taskIds: List[TaskId], argument: TaskArgument,
      context: ActorRefFactory, serializer: SerializationFramework, dispatcher: String)
    : Map[TaskId, ActorRef] = {
    import argument.{processorDescription, subscribers}

    // Merge the processor's taskConf into the user config
    // (presumably taskConf takes precedence — confirm UserConfig.withConfig).
    val taskConf = userConf.withConfig(processorDescription.taskConf)

    // Context shared by every task launched in this call.
    val taskContext = TaskContextData(executorId,
      appId, appName, appMaster,
      processorDescription.parallelism,
      processorDescription.life, subscribers)

    val taskClass = TaskUtil.loadClass(processorDescription.taskClass)

    // Build the result map immutably instead of accumulating into a var.
    taskIds.map { taskId =>
      val task = new TaskWrapper(taskId, taskClass, taskContext, taskConf)
      val taskActor = context.actorOf(
        Props(taskActorClass, taskId, taskContext, userConf, task, serializer)
          .withDispatcher(dispatcher),
        ActorPathUtil.taskActorName(taskId))
      taskId -> taskActor
    }.toMap
  }
}
+
object TaskLauncher {

  /**
   * Everything a task needs at launch time.
   *
   * @param dagVersion version of the DAG this task belongs to
   * @param processorDescription description of the processor the task instantiates
   * @param subscribers downstream subscribers of this processor
   */
  case class TaskArgument(
      dagVersion: Int, processorDescription: ProcessorDescription,
      subscribers: List[Subscriber])

  /** Creates a TaskLauncher that launches [[TaskActor]] instances. */
  def apply(executorContext: ExecutorContext, userConf: UserConfig): TaskLauncher = {
    import executorContext.{appId, appMaster, appName, executorId}
    new TaskLauncher(appId, appName, executorId, appMaster, userConf, classOf[TaskActor])
  }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala
new file mode 100644
index 0000000..6721cfc
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.metrics
+
+import java.util
+
+import com.typesafe.config.Config
+
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.cluster.ClientToMaster.ReadOption
+import org.apache.gearpump.cluster.MasterToClient.HistoryMetricsItem
+import io.gearpump.google.common.collect.Iterators
+import org.apache.gearpump.metrics.Metrics.{Histogram, Meter}
+import org.apache.gearpump.metrics.MetricsAggregator
+import org.apache.gearpump.streaming.metrics.ProcessorAggregator._
+import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
+
+/**
+ *
+ * Does aggregation on metrics after grouping by these three attributes:
+ *  1. processorId
+ *  2. time section(represented as a index integer)
+ *  3. metricName(like sendThroughput)
+ *
+ * It assumes that for each 
[[org.apache.gearpump.cluster.MasterToClient.HistoryMetricsItem]], the
+ * name follow the format 
app(appId).processor(processorId).task(taskId).(metricName)
+ *
+ * It parses the name to get processorId and metricName. If the parsing fails, 
then current
+ * [[org.apache.gearpump.cluster.MasterToClient.HistoryMetricsItem]] will be 
skipped.
+ *
+ * NOTE: this class is optimized for performance.
+ */
class ProcessorAggregator(historyMetricConfig: HistoryMetricsConfig) extends MetricsAggregator {

  def this(config: Config) = {
    this(HistoryMetricsConfig(config))
  }

  // Stateless factory, shared by all aggregate() calls.
  private val aggregatorFactory: AggregatorFactory = new AggregatorFactory()

  /**
   * Accepts options:
   * key: "readOption", value: one of "readLatest", "readRecent", "readHistory"
   * (defaults to readLatest when absent).
   */
  override def aggregate(options: Map[String, String],
      inputs: Iterator[HistoryMetricsItem]): List[HistoryMetricsItem] = {
    val readOption = options.get(ReadOption.Key).getOrElse(ReadOption.ReadLatest)
    aggregate(readOption, inputs, System.currentTimeMillis())
  }

  /**
   * Groups the input metrics by (time slot, "processor:metricName") and merges each
   * group through one Aggregator. Only Meter and Histogram values are considered;
   * items whose name cannot be parsed, or whose time falls outside [start, end),
   * are skipped.
   */
  def aggregate(
      readOption: ReadOption.ReadOption, inputs: Iterator[HistoryMetricsItem], now: TimeStamp)
    : List[HistoryMetricsItem] = {
    val (start, end, interval) = getTimeRange(readOption, now)
    val timeSlotsCount = ((end - start - 1) / interval + 1).toInt
    val map = new MultiLayerMap[Aggregator](timeSlotsCount)

    // Single mutable holder reused across iterations to avoid per-item allocation.
    val taskIdentity = new TaskIdentity(0, null)
    while (inputs.hasNext) {
      val item = inputs.next()

      if (item.value.isInstanceOf[Meter] || item.value.isInstanceOf[Histogram]) {
        if (item.time >= start && item.time < end) {
          val timeIndex = ((item.time - start) / interval).toInt

          if (parseName(item.value.name, taskIdentity)) {
            // Get-or-create the aggregator for this (time slot, group) pair.
            var op = map.get(timeIndex, taskIdentity.group)
            if (op == null) {
              op = aggregatorFactory.create(item, taskIdentity.group)
              map.put(timeIndex, taskIdentity.group, op)
            }
            op.aggregate(item)
          }
        }
      }
    }

    // Collect one result item per aggregator, in the map's iteration order.
    val result = new Array[HistoryMetricsItem](map.size)
    val iterator = map.valueIterator
    var index = 0
    while (iterator.hasNext()) {
      val op = iterator.next()
      result(index) = op.result
      index += 1
    }

    result.toList
  }

  // Returns (start, end, interval); for readLatest (and anything else) all data
  // points are folded into a single window.
  private def getTimeRange(readOption: ReadOption.ReadOption, now: TimeStamp)
    : (TimeStamp, TimeStamp, TimeStamp) = {
    readOption match {
      case ReadOption.ReadRecent =>
        val end = now
        val start = end - (historyMetricConfig.retainRecentDataSeconds) * 1000
        val interval = historyMetricConfig.retainRecentDataIntervalMs
        (floor(start, interval), floor(end, interval), interval)
      case ReadOption.ReadHistory =>
        val end = now
        val start = end - (historyMetricConfig.retainHistoryDataHours) * 3600 * 1000
        val interval = historyMetricConfig.retainHistoryDataIntervalMs
        (floor(start, interval), floor(end, interval), interval)
      case _ =>
        // All data points are aggregated together.
        (0L, Long.MaxValue, Long.MaxValue)
    }
  }

  // The original metrics data is divided by interval points:
  // time series (0, interval, 2*interval, 3*interval....)
  // floor(..) makes sure every Aggregator uses the same set of interval points.
  private def floor(value: Long, interval: Long): Long = {
    (value / interval) * interval
  }

  // Parses "app0.processor0.task0:sendThroughput" into
  // group = "app0.processor0:sendThroughput" and task = 0 (written into `result`).
  // Returns true on success; false means the item should be skipped.
  private def parseName(name: String, result: TaskIdentity): Boolean = {
    val taskIndex = name.indexOf(TASK_TAG)
    if (taskIndex > 0) {
      val processor = name.substring(0, taskIndex)
      val typeIndex = name.indexOf(":", taskIndex + 1)
      if (typeIndex > 0) {
        result.task = (name.substring(taskIndex + TASK_TAG.length, typeIndex)).toShort
        val metricName = name.substring(typeIndex)
        result.group = processor + metricName
        true
      } else {
        false
      }
    } else {
      false
    }
  }
}
+
object ProcessorAggregator {
  val readOption = ReadOption.Key

  private val TASK_TAG = ".task"

  // Mutable holder reused while parsing metric names (avoids per-item allocation).
  private class TaskIdentity(var task: Short, var group: String)

  /**
   * MultiLayerMap has multiple layers; each layer is a HashMap.
   *
   * To access a value with get, the user specifies the layer id, then the key.
   * Null values are not supported (null is the "absent" sentinel).
   *
   * This class is optimized for performance.
   */
  class MultiLayerMap[Value](layers: Int) {

    private var _size: Int = 0
    private val map: Array[java.util.HashMap[String, Value]] = createMap(layers)

    /**
     * @param key key in current layer
     * @return null if key is not found or layer is out of range
     */
    def get(layer: Int, key: String): Value = {
      if (layer < layers) {
        map(layer).get(key)
      } else {
        null.asInstanceOf[Value]
      }
    }

    /** Inserts or replaces; puts to an out-of-range layer are silently ignored. */
    def put(layer: Int, key: String, value: Value): Unit = {
      if (layer < layers) {
        // Count only newly-inserted keys. The original incremented _size on
        // every put, over-counting whenever an existing key was overwritten.
        val previous = map(layer).put(key, value)
        if (previous == null) {
          _size += 1
        }
      }
    }

    /** Number of distinct (layer, key) entries. */
    def size: Int = _size

    /** Iterates all values, layer by layer. */
    def valueIterator: util.Iterator[Value] = {
      val iterators = new Array[util.Iterator[Value]](layers)
      var layer = 0
      while (layer < layers) {
        iterators(layer) = map(layer).values().iterator()
        layer += 1
      }

      Iterators.concat(iterators: _*)
    }

    private def createMap(layers: Int) = {
      val map = new Array[java.util.HashMap[String, Value]](layers)
      var index = 0
      val length = map.length
      while (index < length) {
        map(index) = new java.util.HashMap[String, Value]()
        index += 1
      }
      map
    }
  }

  /** Incrementally merges the HistoryMetricsItems of one group into a single result. */
  trait Aggregator {
    def aggregate(item: HistoryMetricsItem): Unit
    def result: HistoryMetricsItem
  }

  /** Averages histogram statistics; the result timestamp is the earliest item time seen. */
  class HistogramAggregator(name: String) extends Aggregator {

    var count: Long = 0
    var mean: Double = 0
    var stddev: Double = 0
    var median: Double = 0
    var p95: Double = 0
    var p99: Double = 0
    var p999: Double = 0

    var startTime: TimeStamp = Long.MaxValue

    override def aggregate(item: HistoryMetricsItem): Unit = {
      val input = item.value.asInstanceOf[Histogram]
      count += 1
      mean += input.mean
      stddev += input.stddev
      median += input.median
      p95 += input.p95
      p99 += input.p99
      p999 += input.p999

      if (item.time < startTime) {
        startTime = item.time
      }
    }

    override def result: HistoryMetricsItem = {
      if (count > 0) {
        HistoryMetricsItem(startTime, Histogram(name, mean / count, stddev / count,
          median / count, p95 / count, p99 / count, p999 / count))
      } else {
        // Nothing aggregated: return a zeroed histogram at time 0.
        HistoryMetricsItem(0, Histogram(name, 0, 0, 0, 0, 0, 0))
      }
    }
  }

  /** Sums meter counts and rates; the result timestamp is the earliest item time seen. */
  class MeterAggregator(name: String) extends Aggregator {

    var count: Long = 0
    var meanRate: Double = 0
    var m1: Double = 0
    var rateUnit: String = null

    var startTime: TimeStamp = Long.MaxValue

    override def aggregate(item: HistoryMetricsItem): Unit = {

      val input = item.value.asInstanceOf[Meter]
      count += input.count

      meanRate += input.meanRate
      m1 += input.m1

      // Take the rate unit from the first item seen.
      if (null == rateUnit) {
        rateUnit = input.rateUnit
      }

      if (item.time < startTime) {
        startTime = item.time
      }
    }

    override def result: HistoryMetricsItem = {
      HistoryMetricsItem(startTime, Meter(name, count, meanRate,
        m1, rateUnit))
    }
  }

  /** Picks the Aggregator matching a metric's value type; null for unsupported types. */
  class AggregatorFactory {
    def create(item: HistoryMetricsItem, name: String): Aggregator = {
      item.value match {
        case meter: Meter => new MeterAggregator(name)
        case histogram: Histogram => new HistogramAggregator(name)
        case _ => null // not supported
      }
    }
  }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/TaskFilterAggregator.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/TaskFilterAggregator.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/TaskFilterAggregator.scala
new file mode 100644
index 0000000..6bc8964
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/TaskFilterAggregator.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.metrics
+
+import scala.collection.mutable.ListBuffer
+import scala.util.{Failure, Success, Try}
+
+import com.typesafe.config.Config
+
+import org.apache.gearpump.cluster.ClientToMaster.ReadOption
+import org.apache.gearpump.cluster.MasterToClient.HistoryMetricsItem
+import org.apache.gearpump.metrics.MetricsAggregator
+import org.apache.gearpump.util.{Constants, LogUtil}
+
/**
 * Filters the latest metrics data by a processor id range and a task id range.
 *
 * Only effective for ReadOption.ReadLatest; any other read option yields an
 * empty result.
 */
class TaskFilterAggregator(maxLimit: Int) extends MetricsAggregator {

  import org.apache.gearpump.streaming.metrics.TaskFilterAggregator._

  def this(config: Config) = {
    this(config.getInt(Constants.GEARPUMP_METRICS_MAX_LIMIT))
  }

  override def aggregate(options: Map[String, String], inputs: Iterator[HistoryMetricsItem])
    : List[HistoryMetricsItem] = {

    // Option.contains instead of comparing against Some(..) directly.
    if (!options.get(ReadOption.Key).contains(ReadOption.ReadLatest)) {
      // Returns empty set
      List.empty[HistoryMetricsItem]
    } else {
      val parsed = Options.parse(options)
      // Options.parse returns null when the options are malformed.
      if (parsed != null) {
        aggregate(parsed, inputs)
      } else {
        List.empty[HistoryMetricsItem]
      }
    }
  }

  /**
   * Keeps items whose processor and task ids fall inside the half-open ranges
   * of `options`, up to min(options.limit, maxLimit) items. Matching items are
   * prepended, so the returned list is in reverse input order.
   */
  def aggregate(options: Options, inputs: Iterator[HistoryMetricsItem])
    : List[HistoryMetricsItem] = {

    val result = new ListBuffer[HistoryMetricsItem]
    val effectiveLimit = Math.min(options.limit, maxLimit)
    var count = 0

    // Mutable holder reused across items to avoid per-item allocation.
    val taskIdentity = new TaskIdentity(0, 0)

    while (inputs.hasNext && count < effectiveLimit) {
      val item = inputs.next()
      if (parseName(item.value.name, taskIdentity)) {
        if (taskIdentity.processor >= options.startProcessor &&
          taskIdentity.processor < options.endProcessor &&
          taskIdentity.task >= options.startTask &&
          taskIdentity.task < options.endTask) {
          result.prepend(item)
          count += 1
        }
      }
    }
    result.toList
  }

  // Assumes the name format "app0.processor0.task0:sendThroughput"; writes
  // (processorId, taskId) into `result`.
  //
  // Returns true on success; false means the item should be skipped.
  private def parseName(name: String, result: TaskIdentity): Boolean = {
    val processorStart = name.indexOf(PROCESSOR_TAG)
    if (processorStart != -1) {
      val taskStart = name.indexOf(TASK_TAG, processorStart + 1)
      if (taskStart != -1) {
        val processorId = name.substring(processorStart, taskStart).substring(PROCESSOR_TAG.length)
          .toInt
        result.processor = processorId
        val taskEnd = name.indexOf(":", taskStart + 1)
        if (taskEnd != -1) {
          val taskId = name.substring(taskStart, taskEnd).substring(TASK_TAG.length).toInt
          result.task = taskId
          true
        } else {
          false
        }
      } else {
        false
      }
    } else {
      false
    }
  }
}
+
object TaskFilterAggregator {
  // Keys recognized in the options map.
  val StartTask = "startTask"
  val EndTask = "endTask"
  val StartProcessor = "startProcessor"
  val EndProcessor = "endProcessor"
  val Limit = "limit"

  // Tags used when parsing metric names.
  val TASK_TAG = ".task"
  val PROCESSOR_TAG = ".processor"

  // Mutable holder for a parsed (processorId, taskId) pair.
  private class TaskIdentity(var processor: Int, var task: Int)

  /** Half-open processor/task id ranges plus a cap on the number of returned items. */
  case class Options(
      limit: Int, startTask: Int, endTask: Int, startProcessor: Int, endProcessor: Int)

  private val LOG = LogUtil.getLogger(getClass)

  object Options {

    /** Options that match every processor and task, with no practical limit. */
    def acceptAll: Options = {
      new Options(Int.MaxValue, 0, Int.MaxValue, 0, Int.MaxValue)
    }

    /**
     * Parses filter options from a string map.
     * Returns null when any value is malformed (e.g. a non-numeric id).
     */
    def parse(options: Map[String, String]): Options = {
      // Read a key as Int, falling back to the default when absent;
      // a malformed value throws and is caught by the surrounding Try.
      def intValue(key: String, default: Int): Int =
        options.get(key).map(_.toInt).getOrElse(default)

      val optionTry = Try {
        val startTask = intValue(StartTask, 0)
        val endTask = intValue(EndTask, Integer.MAX_VALUE)
        val startProcessor = intValue(StartProcessor, 0)
        val endProcessor = intValue(EndProcessor, Integer.MAX_VALUE)
        val limit = intValue(Limit, DEFAULT_LIMIT)
        new Options(limit, startTask, endTask, startProcessor, endProcessor)
      }

      optionTry match {
        case Success(parsed) => parsed
        case Failure(ex) =>
          LOG.error("Failed to parse the options in TaskFilterAggregator. Error msg: " +
            ex.getMessage)
          null
      }
    }
  }

  val DEFAULT_LIMIT = 1000
  val MAX_LIMIT = 1000
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/package.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/package.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/package.scala
new file mode 100644
index 0000000..27b1136
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/package.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump
+
package object streaming {
  /** Identifies a processor within a streaming application. */
  type ProcessorId = Int
  /** Index of a task within its parent processor. */
  type TaskIndex = Int
  /** Identifies an executor of a streaming application. */
  type ExecutorId = Int
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSink.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSink.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSink.scala
new file mode 100644
index 0000000..2d81a1f
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSink.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.sink
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.streaming.task.TaskContext
+
+/**
+ * Interface to implement custom data sink where result of a DAG is typically 
written
+ * a DataSink could be a data store like HBase or simply a console
+ *
+ * An example would be like:
+ * {{{
+ *  class ConsoleSink extends DataSink[String] {
+ *
+ *    def open(context: TaskContext): Unit = {}
+ *
+ *    def write(s: String): Unit = {
+ *      Console.println(s)
+ *    }
+ *
+ *    def close(): Unit = {}
+ *  }
+ * }}}
+ *
+ * Subclass is required to be serializable
+ */
trait DataSink extends java.io.Serializable {

  /**
   * Opens the connection to the data sink;
   * invoked at onStart() method of [[org.apache.gearpump.streaming.task.Task]]
   *
   * @param context is the task context at runtime
   */
  def open(context: TaskContext): Unit

  /**
   * Writes a message into the data sink;
   * invoked at onNext() method of [[org.apache.gearpump.streaming.task.Task]]
   *
   * @param message wraps the data to be written out
   */
  def write(message: Message): Unit

  /**
   * Closes the connection to the data sink;
   * invoked at onClose() method of [[org.apache.gearpump.streaming.task.Task]]
   */
  def close(): Unit
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkProcessor.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkProcessor.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkProcessor.scala
new file mode 100644
index 0000000..973e371
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkProcessor.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.sink
+
+import akka.actor.ActorSystem
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Processor
+
/**
 * Utility that helps the user to create a DAG ending in a [[DataSink]];
 * the user should pass in a [[DataSink]] instance.
 *
 * Here is an example that builds a DAG doing word count and writing to a KafkaSink:
 * {{{
 *    val split = Processor[Split](1)
 *    val sum = Processor[Sum](1)
 *    val sink = new KafkaSink()
 *    val sinkProcessor = DataSinkProcessor(sink, 1)
 *    // The DAG must end in the processor wrapping the sink, not the raw sink:
 *    val dag = split ~> sum ~> sinkProcessor
 * }}}
 */
object DataSinkProcessor {
  def apply(
      dataSink: DataSink,
      parallelism: Int,
      description: String = "",
      taskConf: UserConfig = UserConfig.empty)(implicit system: ActorSystem)
    : Processor[DataSinkTask] = {
    Processor[DataSinkTask](parallelism, description = description,
      taskConf.withValue[DataSink](DataSinkTask.DATA_SINK, dataSink))
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala
new file mode 100644
index 0000000..eb6118d
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.sink
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+
object DataSinkTask {
  /** UserConfig key under which the [[DataSink]] instance is stored. */
  val DATA_SINK = "data_sink"
}
+
/**
 * General task that runs any [[DataSink]].
 *
 * The sink instance is read from the task's UserConfig under
 * [[DataSinkTask.DATA_SINK]]; it is opened on start, receives each incoming
 * message via write(), and is closed on stop.
 */
class DataSinkTask(context: TaskContext, conf: UserConfig) extends Task(context, conf) {
  import org.apache.gearpump.streaming.sink.DataSinkTask._

  // Fail fast with a descriptive message (instead of a bare None.get) when no
  // sink was configured; the exception type is unchanged.
  private val sink = conf.getValue[DataSink](DATA_SINK).getOrElse(
    throw new NoSuchElementException(s"no DataSink configured under key '$DATA_SINK'"))

  override def onStart(startTime: StartTime): Unit = {
    LOG.info("opening data sink...")
    sink.open(context)
  }

  override def onNext(message: Message): Unit = {
    sink.write(message)
  }

  override def onStop(): Unit = {
    LOG.info("closing data sink...")
    sink.close()
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala
new file mode 100644
index 0000000..0fb6db4
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.source
+
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.Message
+
+import scala.util.Random
+
/**
 * Interface to implement custom source where data is read into the system.
 * a DataSource could be a message queue like kafka or simply data generation source.
 *
 * An example would be like
 * {{{
 *  GenMsgSource extends DataSource {
 *
 *    def open(context: TaskContext, startTime: Long): Unit = {}
 *
 *    def read(): Message = {
 *      Message("message")
 *    }
 *
 *    def close(): Unit = {}
 *  }
 * }}}
 *
 * subclass is required to be serializable
 */
trait DataSource extends java.io.Serializable {

  /**
   * Opens connection to data source
   * invoked in onStart() method of [[org.apache.gearpump.streaming.source.DataSourceTask]]
   *
   * @param context is the task context at runtime
   * @param startTime is the start time of system
   */
  def open(context: TaskContext, startTime: Long): Unit

  /**
   * Reads next message from data source and
   * returns null if no message is available
   *
   * @return a [[org.apache.gearpump.Message]] or null
   */
  def read(): Message

  /**
   * Closes connection to data source.
   * invoked in onStop() method of [[org.apache.gearpump.streaming.source.DataSourceTask]]
   */
  def close(): Unit
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceConfig.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceConfig.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceConfig.scala
new file mode 100644
index 0000000..4a76958
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceConfig.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.source
+
object DataSourceConfig {

  // Number of read() attempts a DataSourceTask performs per onNext;
  // DataSourceTask defaults to 1000 when this key is absent.
  val SOURCE_READ_BATCH_SIZE = "gearpump.source.read.batch.size"
  // Key naming a timestamp-filter class — presumably a TimeStampFilter
  // implementation like DefaultTimeStampFilter; confirm against usages.
  val SOURCE_TIMESTAMP_FILTER = "gearpump.source.timestamp.filter.class"
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala
new file mode 100644
index 0000000..ddd6c27
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.source
+
+import akka.actor.ActorSystem
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Processor
+
/**
 * Utility that helps user to create a DAG starting with [[DataSourceTask]]
 * user should pass in a [[DataSource]]
 *
 * Here is an example to build a DAG that reads from Kafka source followed by word count
 * {{{
 *    val source = new KafkaSource()
 *    val sourceProcessor =  DataSourceProcessor(source, 1)
 *    val split = Processor[Split](1)
 *    val sum = Processor[Sum](1)
 *    val dag = sourceProcessor ~> split ~> sum
 * }}}
 */
object DataSourceProcessor {
  /**
   * Creates a [[Processor]] of [[DataSourceTask]] wrapping the given source.
   *
   * @param dataSource user-defined source the task will read messages from
   * @param parallelism number of task instances for this processor
   * @param description human-readable description shown for the processor
   * @param taskConf extra task configuration; the source is stored into it
   *                 under [[DataSourceTask.DATA_SOURCE]]
   * @param system actor system used by Processor construction
   */
  def apply(
      dataSource: DataSource,
      parallelism: Int,
      description: String = "",
      taskConf: UserConfig = UserConfig.empty)(implicit system: ActorSystem)
    : Processor[DataSourceTask] = {
    Processor[DataSourceTask](parallelism, description = description,
      taskConf.withValue[DataSource](DataSourceTask.DATA_SOURCE, dataSource))
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
new file mode 100644
index 0000000..6777721
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.source
+
+import org.apache.gearpump._
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+
object DataSourceTask {
  // UserConfig key under which the DataSource instance is stored;
  // written by DataSourceProcessor.apply and read back in the task constructor.
  val DATA_SOURCE = "data_source"
}
+
/**
 * Default Task container for [[org.apache.gearpump.streaming.source.DataSource]]
 * that reads from the DataSource in batches.
 * See [[org.apache.gearpump.streaming.source.DataSourceProcessor]] for its usage.
 *
 * The task drives itself: `onStart` opens the source and sends itself a
 * "start" message; every `onNext` reads up to `batchSize` messages, emits
 * the non-null ones, and re-sends a "continue" message to keep the loop
 * going; `onStop` closes the source.
 */
class DataSourceTask(context: TaskContext, conf: UserConfig) extends Task(context, conf) {

  // User-supplied source, stored in the config under DATA_SOURCE (required).
  private val source: DataSource = conf.getValue[DataSource](DataSourceTask.DATA_SOURCE).get
  // Messages to attempt per onNext; defaults to 1000 when unconfigured.
  private val batchSize: Int =
    conf.getInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE).getOrElse(1000)
  // Application start time, captured in onStart.
  private var startTime = 0L

  override def onStart(newStartTime: StartTime): Unit = {
    startTime = newStartTime.startTime
    LOG.info(s"opening data source at $startTime")
    source.open(context, startTime)
    // Kick off the self-driving read loop.
    self ! Message("start", System.currentTimeMillis())
  }

  override def onNext(message: Message): Unit = {
    // read() returns null when no message is available; skip those.
    var attempts = 0
    while (attempts < batchSize) {
      val msg = source.read()
      if (msg != null) {
        context.output(msg)
      }
      attempts += 1
    }
    // Re-schedule ourselves to keep pulling from the source.
    self ! Message("continue", System.currentTimeMillis())
  }

  override def onStop(): Unit = {
    LOG.info("closing data source...")
    source.close()
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/source/DefaultTimeStampFilter.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DefaultTimeStampFilter.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DefaultTimeStampFilter.scala
new file mode 100644
index 0000000..df54cc2
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DefaultTimeStampFilter.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.source
+
+import org.apache.gearpump.streaming.transaction.api.TimeStampFilter
+import org.apache.gearpump.{Message, TimeStamp}
+
/**
 * TimeStampFilter filters out messages which have obsolete (smaller) timestamp.
 */
class DefaultTimeStampFilter extends TimeStampFilter {
  override def filter(msg: Message, predicate: TimeStamp): Option[Message] = {
    // Drop null messages and messages strictly older than the predicate.
    if (msg != null && msg.timestamp >= predicate) Some(msg) else None
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/Monoid.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/Monoid.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/Monoid.scala
new file mode 100644
index 0000000..9886a72
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/Monoid.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.state.api
+
/**
 * A combining operation `plus` with an identity element `zero`
 * (the standard monoid contract: `plus` is expected to be associative,
 * and `zero` its identity). Serializable so instances can be shipped
 * to remote tasks.
 */
trait Monoid[T] extends java.io.Serializable {
  // Combines two values; expected to be associative.
  def plus(l: T, r: T): T
  // Identity element: plus(zero, x) == x == plus(x, zero) is expected to hold.
  def zero: T
}

/**
 * A [[Monoid]] whose operation is invertible: `minus` is expected to
 * undo `plus`.
 */
trait Group[T] extends Monoid[T] {
  def minus(l: T, r: T): T
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala
new file mode 100644
index 0000000..0e2f83a
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.state.api
+
+import org.apache.gearpump.TimeStamp
+
/**
 * MonoidState uses a [[Monoid]] to aggregate state.
 *
 * on start, state value is initialized to monoid.zero
 * on each new message, existing state value is aggregated with
 * the incoming value using monoid.plus to get a new state value
 */
abstract class MonoidState[T](monoid: Monoid[T]) extends PersistentState[T] {
  // Left state updated by messages before checkpoint time
  private[state] var left: T = monoid.zero
  // Right state updated by message after checkpoint time
  private[state] var right: T = monoid.zero

  // Next checkpoint boundary; Long.MaxValue means no checkpoint time has been
  // set yet, so every update lands in `left` until setNextCheckpointTime is called.
  protected var checkpointTime = Long.MaxValue

  // Current aggregate value: both halves combined.
  override def get: Option[T] = Option(monoid.plus(left, right))

  override def setNextCheckpointTime(nextCheckpointTime: TimeStamp): Unit = {
    checkpointTime = nextCheckpointTime
  }

  // Routes an update into `left` or `right` depending on which side of the
  // checkpoint boundary the message's timestamp falls.
  protected def updateState(timestamp: TimeStamp, t: T): Unit = {
    if (timestamp < checkpointTime) {
      left = monoid.plus(left, t)
    } else {
      right = monoid.plus(right, t)
    }
  }
}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala
new file mode 100644
index 0000000..906d331
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.state.api
+
+import org.apache.gearpump._
+
/**
 * PersistentState is part of the transaction API
 *
 * Users could get transaction support from the framework by
 * conforming to PersistentState APIs and extending PersistentTask
 * to manage the state
 */
trait PersistentState[T] {

  /**
   * Recovers state to a previous checkpoint
   * usually invoked by the framework
   *
   * @param timestamp checkpoint time to recover to
   * @param bytes serialized snapshot previously produced by checkpoint()
   */
  def recover(timestamp: TimeStamp, bytes: Array[Byte]): Unit

  /**
   * Updates state on a new message
   * this is invoked by user
   */
  def update(timestamp: TimeStamp, t: T): Unit

  /**
   * Sets next checkpoint time
   * should be invoked by the framework
   */
  def setNextCheckpointTime(timeStamp: TimeStamp): Unit

  /**
   * Gets a binary snapshot of state
   * usually invoked by the framework
   *
   * @return serialized state, consumable by recover()
   */
  def checkpoint(): Array[Byte]

  /**
   * Unwraps the raw value of state
   */
  def get: Option[T]
}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
new file mode 100644
index 0000000..e40d8cd
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.state.api
+
+import java.util.concurrent.TimeUnit
+import scala.concurrent.duration.FiniteDuration
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.state.impl.{CheckpointManager, 
PersistentStateConfig}
+import org.apache.gearpump.streaming.task.{ReportCheckpointClock, StartTime, 
Task, TaskContext}
+import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
+import org.apache.gearpump.util.LogUtil
+import org.apache.gearpump.{Message, TimeStamp}
+
object PersistentTask {
  // Self-sent control message that triggers a checkpoint attempt in onNext.
  val CHECKPOINT = Message("checkpoint")
  val LOG = LogUtil.getLogger(getClass)
}
+
/**
 * PersistentTask is part of the transaction API
 *
 * Users should extend this task if they want to get transaction support
 * from the framework
 */
abstract class PersistentTask[T](taskContext: TaskContext, conf: UserConfig)
  extends Task(taskContext, conf) {
  // Brings self, appMaster and scheduleOnce into scope (used unqualified below).
  import taskContext._

  import org.apache.gearpump.streaming.state.api.PersistentTask._

  // Store used to persist state snapshots; both config entries are required
  // (.get throws when absent).
  val checkpointStoreFactory = conf.getValue[CheckpointStoreFactory](
    PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY).get
  val checkpointStore = checkpointStoreFactory.getCheckpointStore(conf, taskContext)
  // Application-time interval between checkpoints, in milliseconds.
  val checkpointInterval = conf.getLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS).get
  val checkpointManager = new CheckpointManager(checkpointInterval, checkpointStore)
  // System time interval to attempt checkpoint
  private val checkpointAttemptInterval = 1000L

  /**
   * Subclass should override this method to pass in a PersistentState. the framework has already
   * offered two states:
   *  - NonWindowState: state with no time or other boundary
   *  - WindowState:  each state is bounded by a time window
   */
  def persistentState: PersistentState[T]

  /**
   * Subclass should override this method to specify how a new message should update state
   */
  def processMessage(state: PersistentState[T], message: Message): Unit

  /** Persistent state that will be stored (by checkpointing) automatically to storage like HDFS */
  // NOTE(review): persistentState is invoked during construction; subclass fields
  // initialized after this constructor runs are not yet set — confirm subclasses
  // do not rely on them inside persistentState.
  val state = persistentState

  final override def onStart(startTime: StartTime): Unit = {
    val timestamp = startTime.startTime
    // Restore state from the last snapshot at or before the start time, if any.
    checkpointManager
      .recover(timestamp)
      .foreach(state.recover(timestamp, _))

    reportCheckpointClock(timestamp)
    // Start the periodic self-sent CHECKPOINT loop.
    scheduleCheckpoint(checkpointAttemptInterval)
  }

  final override def onNext(message: Message): Unit = {
    message match {
      // Self-sent control message: attempt a checkpoint, then reschedule.
      case CHECKPOINT =>
        val upstreamMinClock = taskContext.upstreamMinClock
        if (checkpointManager.shouldCheckpoint(upstreamMinClock)) {
          checkpointManager.getCheckpointTime.foreach { checkpointTime =>
            // Snapshot the state, persist it, forward it downstream,
            // and report the new checkpoint clock to the AppMaster.
            val serialized = state.checkpoint()
            checkpointManager.checkpoint(checkpointTime, serialized)
              .foreach(state.setNextCheckpointTime)
            taskContext.output(Message(serialized, checkpointTime))
            reportCheckpointClock(checkpointTime)
          }
        }
        scheduleCheckpoint(checkpointAttemptInterval)
      // Regular data message: advance the checkpoint manager's clock,
      // then let the subclass fold the message into state.
      case _ =>
        checkpointManager.update(message.timestamp)
          .foreach(state.setNextCheckpointTime)
        processMessage(state, message)
    }
  }

  final override def onStop(): Unit = {
    checkpointManager.close()
  }

  // Sends CHECKPOINT to self after `interval` milliseconds of system time.
  private def scheduleCheckpoint(interval: Long): Unit = {
    scheduleOnce(new FiniteDuration(interval, TimeUnit.MILLISECONDS))(self ! CHECKPOINT)
  }

  private def reportCheckpointClock(timestamp: TimeStamp): Unit = {
    appMaster ! ReportCheckpointClock(taskContext.taskId, timestamp)
  }
}


Reply via email to