http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/dsl/StreamApp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/StreamApp.scala 
b/streaming/src/main/scala/io/gearpump/streaming/dsl/StreamApp.scala
deleted file mode 100644
index a2ac70f..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/dsl/StreamApp.scala
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.dsl
-
-import scala.language.implicitConversions
-
-import akka.actor.ActorSystem
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.streaming.StreamApplication
-import io.gearpump.streaming.dsl.op.{DataSourceOp, Op, OpEdge, ProcessorOp}
-import io.gearpump.streaming.dsl.plan.Planner
-import io.gearpump.streaming.source.DataSource
-import io.gearpump.streaming.task.{Task, TaskContext}
-import io.gearpump.util.Graph
-import io.gearpump.{Message, TimeStamp}
-
-/**
- * Example:
- * {{{
- * val data = "This is a good start, bingo!! bingo!!"
- * app.fromCollection(data.lines.toList).
- * // word => (word, count)
- * flatMap(line => line.split("[\\s]+")).map((_, 1)).
- * // (word, count1), (word, count2) => (word, count1 + count2)
- * groupBy(kv => kv._1).reduce(sum(_, _))
- *
- * val appId = context.submit(app)
- * context.close()
- * }}}
- *
- * @param name name of app
- */
-class StreamApp(
-    val name: String, system: ActorSystem, userConfig: UserConfig, val graph: 
Graph[Op, OpEdge]) {
-
-  def this(name: String, system: ActorSystem, userConfig: UserConfig) = {
-    this(name, system, userConfig, Graph.empty[Op, OpEdge])
-  }
-
-  def plan(): StreamApplication = {
-    implicit val actorSystem = system
-    val planner = new Planner
-    val dag = planner.plan(graph)
-    StreamApplication(name, dag, userConfig)
-  }
-}
-
-object StreamApp {
-  def apply(name: String, context: ClientContext, userConfig: UserConfig = 
UserConfig.empty)
-    : StreamApp = {
-    new StreamApp(name, context.system, userConfig)
-  }
-
-  implicit def streamAppToApplication(streamApp: StreamApp): StreamApplication 
= {
-    streamApp.plan
-  }
-
-  implicit class Source(app: StreamApp) extends java.io.Serializable {
-
-    def source[T](dataSource: DataSource, parallism: Int): Stream[T] = {
-      source(dataSource, parallism, UserConfig.empty)
-    }
-
-    def source[T](dataSource: DataSource, parallism: Int, description: 
String): Stream[T] = {
-      source(dataSource, parallism, UserConfig.empty, description)
-    }
-
-    def source[T](dataSource: DataSource, parallism: Int, conf: UserConfig): 
Stream[T] = {
-      source(dataSource, parallism, conf, description = null)
-    }
-
-    def source[T](dataSource: DataSource, parallism: Int, conf: UserConfig, 
description: String)
-      : Stream[T] = {
-      implicit val sourceOp = DataSourceOp(dataSource, parallism, conf, 
description)
-      app.graph.addVertex(sourceOp)
-      new Stream[T](app.graph, sourceOp)
-    }
-    def source[T](seq: Seq[T], parallism: Int, description: String): Stream[T] 
= {
-      this.source(new CollectionDataSource[T](seq), parallism, 
UserConfig.empty, description)
-    }
-
-    def source[T](source: Class[_ <: Task], parallism: Int, conf: UserConfig, 
description: String)
-      : Stream[T] = {
-      val sourceOp = ProcessorOp(source, parallism, conf, 
Option(description).getOrElse("source"))
-      app.graph.addVertex(sourceOp)
-      new Stream[T](app.graph, sourceOp)
-    }
-  }
-}
-
-/** A test message source which generated message sequence repeatedly. */
-class CollectionDataSource[T](seq: Seq[T]) extends DataSource {
-  private val iterator: Iterator[T] = seq.iterator
-
-  override def read(): Message = {
-    if (iterator.hasNext) {
-      Message(iterator.next())
-    } else {
-      null
-    }
-  }
-
-  override def close(): Unit = {}
-
-  override def open(context: TaskContext, startTime: TimeStamp): Unit = {}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStream.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStream.scala 
b/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStream.scala
deleted file mode 100644
index 549cc6e..0000000
--- 
a/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStream.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.dsl.javaapi
-
-import scala.collection.JavaConverters._
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.dsl.Stream
-import io.gearpump.streaming.javaapi.dsl.functions._
-import io.gearpump.streaming.task.Task
-
-/**
- * Java DSL
- */
-class JavaStream[T](val stream: Stream[T]) {
-
-  /** FlatMap on stream */
-  def flatMap[R](fn: FlatMapFunction[T, R], description: String): 
JavaStream[R] = {
-    new JavaStream[R](stream.flatMap({ t: T => fn(t).asScala }, description))
-  }
-
-  /** Map on stream */
-  def map[R](fn: MapFunction[T, R], description: String): JavaStream[R] = {
-    new JavaStream[R](stream.map({ t: T => fn(t) }, description))
-  }
-
-  /** Only keep the messages that FilterFunction returns true.  */
-  def filter(fn: FilterFunction[T], description: String): JavaStream[T] = {
-    new JavaStream[T](stream.filter({ t: T => fn(t) }, description))
-  }
-
-  /** Does aggregation on the stream */
-  def reduce(fn: ReduceFunction[T], description: String): JavaStream[T] = {
-    new JavaStream[T](stream.reduce({ (t1: T, t2: T) => fn(t1, t2) }, 
description))
-  }
-
-  def log(): Unit = {
-    stream.log()
-  }
-
-  /** Merges streams of same type together */
-  def merge(other: JavaStream[T], description: String): JavaStream[T] = {
-    new JavaStream[T](stream.merge(other.stream, description))
-  }
-
-  /**
-   * Group by a stream and turns it to a list of sub-streams. Operations 
chained after
-   * groupBy applies to sub-streams.
-   */
-  def groupBy[Group](fn: GroupByFunction[T, Group], parallelism: Int, 
description: String)
-    : JavaStream[T] = {
-    new JavaStream[T](stream.groupBy({t: T => fn(t)}, parallelism, 
description))
-  }
-
-  /** Add a low level Processor to process messages */
-  def process[R](
-      processor: Class[_ <: Task], parallelism: Int, conf: UserConfig, 
description: String)
-    : JavaStream[R] = {
-    new JavaStream[R](stream.process(processor, parallelism, conf, 
description))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
 
b/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
deleted file mode 100644
index e39e054..0000000
--- 
a/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.dsl.javaapi
-
-import java.util.Collection
-import scala.collection.JavaConverters._
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.streaming.dsl.{CollectionDataSource, StreamApp}
-import io.gearpump.streaming.source.DataSource
-
-class JavaStreamApp(name: String, context: ClientContext, userConfig: 
UserConfig) {
-
-  private val streamApp = StreamApp(name, context, userConfig)
-
-  def source[T](collection: Collection[T], parallelism: Int,
-      conf: UserConfig, description: String): JavaStream[T] = {
-    val dataSource = new CollectionDataSource(collection.asScala.toSeq)
-    source(dataSource, parallelism, conf, description)
-  }
-
-  def source[T](dataSource: DataSource, parallelism: Int,
-      conf: UserConfig, description: String): JavaStream[T] = {
-    new JavaStream[T](streamApp.source(dataSource, parallelism, conf, 
description))
-  }
-
-  def run(): Unit = {
-    context.submit(streamApp)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/dsl/op/OP.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/op/OP.scala 
b/streaming/src/main/scala/io/gearpump/streaming/dsl/op/OP.scala
deleted file mode 100644
index f0a86fa..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/dsl/op/OP.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.dsl.op
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.sink.DataSink
-import io.gearpump.streaming.source.DataSource
-import io.gearpump.streaming.task.Task
-
-/**
- * Operators for the DSL
- */
-sealed trait Op {
-  def description: String
-  def conf: UserConfig
-}
-
-/**
- * When translated to running DAG, SlaveOP can be attach to MasterOP or other 
SlaveOP
- * "Attach" means running in same Actor.
- */
-trait SlaveOp[T] extends Op
-
-case class FlatMapOp[T, R](
-    fun: (T) => TraversableOnce[R], description: String, conf: UserConfig = 
UserConfig.empty)
-  extends SlaveOp[T]
-
-case class ReduceOp[T](fun: (T, T) => T, description: String, conf: UserConfig 
= UserConfig.empty)
-  extends SlaveOp[T]
-
-trait MasterOp extends Op
-
-trait ParameterizedOp[T] extends MasterOp
-
-case class MergeOp(description: String, override val conf: UserConfig = 
UserConfig.empty)
-  extends MasterOp
-
-case class GroupByOp[T, R](
-    fun: T => R, parallelism: Int, description: String,
-    override val conf: UserConfig = UserConfig.empty)
-  extends ParameterizedOp[T]
-
-case class ProcessorOp[T <: Task](
-    processor: Class[T], parallelism: Int, conf: UserConfig, description: 
String)
-  extends ParameterizedOp[T]
-
-case class DataSourceOp[T](
-    dataSource: DataSource, parallelism: Int, conf: UserConfig, description: 
String)
-  extends ParameterizedOp[T]
-
-case class DataSinkOp[T](
-    dataSink: DataSink, parallelism: Int, conf: UserConfig, description: 
String)
-  extends ParameterizedOp[T]
-
-/**
- * Contains operators which can be chained to single one.
- *
- * For example, flatmap().map().reduce() can be chained to single operator as
- * no data shuffling is required.
- * @param ops list of operations
- */
-case class OpChain(ops: List[Op]) extends Op {
-  def head: Op = ops.head
-  def last: Op = ops.last
-
-  def description: String = null
-
-  override def conf: UserConfig = {
-    // The head's conf has priority
-    ops.reverse.foldLeft(UserConfig.empty) { (conf, op) =>
-      conf.withConfig(op.conf)
-    }
-  }
-}
-
-trait OpEdge
-
-/**
- * The upstream OP and downstream OP doesn't require network data shuffle.
- *
- * For example, map, flatmap operation doesn't require network shuffle, we can 
use Direct
- * to represent the relation with upstream operators.
- */
-case object Direct extends OpEdge
-
-/**
- * The upstream OP and downstream OP DOES require network data shuffle.
- *
- * For example, map, flatmap operation doesn't require network shuffle, we can 
use Direct
- * to represent the relation with upstream operators.
- */
-case object Shuffle extends OpEdge
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala
 
b/streaming/src/main/scala/io/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala
deleted file mode 100644
index b842c7b..0000000
--- 
a/streaming/src/main/scala/io/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.dsl.partitioner
-
-import io.gearpump.Message
-import io.gearpump.partitioner.UnicastPartitioner
-
-/**
- * Partition messages by applying group by function first.
- *
- * For example:
- * {{{
- * case class People(name: String, gender: String)
- *
- * object Test{
- *
- *   val groupBy: (People => String) = people => people.gender
- *   val partitioner = GroupByPartitioner(groupBy)
- * }
- * }}}
- *
- * @param groupBy First apply message with groupBy function, then pick the 
hashCode of the output
- *   to do the partitioning. You must define hashCode() for output type of 
groupBy function.
- */
-class GroupByPartitioner[T, GROUP](groupBy: T => GROUP = null) extends 
UnicastPartitioner {
-  override def getPartition(msg: Message, partitionNum: Int, 
currentPartitionId: Int): Int = {
-    val hashCode = groupBy(msg.msg.asInstanceOf[T]).hashCode()
-    (hashCode & Integer.MAX_VALUE) % partitionNum
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/OpTranslator.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/OpTranslator.scala 
b/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/OpTranslator.scala
deleted file mode 100644
index 11b4c34..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/OpTranslator.scala
+++ /dev/null
@@ -1,296 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.dsl.plan
-
-import scala.collection.TraversableOnce
-
-import akka.actor.ActorSystem
-import org.slf4j.Logger
-
-import io.gearpump._
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.Constants._
-import io.gearpump.streaming.Processor
-import io.gearpump.streaming.Processor.DefaultProcessor
-import io.gearpump.streaming.dsl.op._
-import io.gearpump.streaming.dsl.plan.OpTranslator._
-import io.gearpump.streaming.sink.DataSink
-import io.gearpump.streaming.source.DataSource
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
-import io.gearpump.util.LogUtil
-
-/**
- * Translates a OP to a TaskDescription
- */
-class OpTranslator extends java.io.Serializable {
-  val LOG: Logger = LogUtil.getLogger(getClass)
-
-  def translate(ops: OpChain)(implicit system: ActorSystem): Processor[_ <: 
Task] = {
-
-    val baseConfig = ops.conf
-
-    ops.ops.head match {
-      case op: MasterOp =>
-        val tail = ops.ops.tail
-        val func = toFunction(tail)
-        val userConfig = baseConfig.withValue(GEARPUMP_STREAMING_OPERATOR, 
func)
-
-        op match {
-          case DataSourceOp(dataSource, parallism, conf, description) =>
-            Processor[SourceTask[Object, Object]](parallism,
-              description = description + "." + func.description,
-              userConfig.withValue(GEARPUMP_STREAMING_SOURCE, dataSource))
-          case groupby@GroupByOp(_, parallism, description, _) =>
-            Processor[GroupByTask[Object, Object, Object]](parallism,
-              description = description + "." + func.description,
-              userConfig.withValue(GEARPUMP_STREAMING_GROUPBY_FUNCTION, 
groupby))
-          case merge: MergeOp =>
-            Processor[TransformTask[Object, Object]](1,
-              description = op.description + "." + func.description,
-              userConfig)
-          case ProcessorOp(processor, parallism, conf, description) =>
-            DefaultProcessor(parallism,
-              description = description + "." + func.description,
-              userConfig, processor)
-          case DataSinkOp(dataSink, parallelism, conf, description) =>
-            Processor[SinkTask[Object]](parallelism,
-              description = description + func.description,
-              userConfig.withValue(GEARPUMP_STREAMING_SINK, dataSink))
-        }
-      case op: SlaveOp[_] =>
-        val func = toFunction(ops.ops)
-        val userConfig = baseConfig.withValue(GEARPUMP_STREAMING_OPERATOR, 
func)
-
-        Processor[TransformTask[Object, Object]](1,
-          description = func.description,
-          taskConf = userConfig)
-      case chain: OpChain =>
-        throw new RuntimeException("Not supposed to be called!")
-    }
-  }
-
-  private def toFunction(ops: List[Op]): SingleInputFunction[Object, Object] = 
{
-    val func: SingleInputFunction[Object, Object] = new 
DummyInputFunction[Object]()
-    val totalFunction = ops.foldLeft(func) { (fun, op) =>
-
-      val opFunction = op match {
-        case flatmap: FlatMapOp[Object @unchecked, Object @unchecked] =>
-          new FlatMapFunction(flatmap.fun, flatmap.description)
-        case reduce: ReduceOp[Object @unchecked] =>
-          new ReduceFunction(reduce.fun, reduce.description)
-        case _ =>
-          throw new RuntimeException("Not supposed to be called!")
-      }
-      fun.andThen(opFunction.asInstanceOf[SingleInputFunction[Object, Object]])
-    }
-    totalFunction.asInstanceOf[SingleInputFunction[Object, Object]]
-  }
-}
-
-object OpTranslator {
-
-  trait SingleInputFunction[IN, OUT] extends Serializable {
-    def process(value: IN): TraversableOnce[OUT]
-    def andThen[OUTER](other: SingleInputFunction[OUT, OUTER]): 
SingleInputFunction[IN, OUTER] = {
-      new AndThen(this, other)
-    }
-
-    def description: String
-  }
-
-  class DummyInputFunction[T] extends SingleInputFunction[T, T] {
-    override def andThen[OUTER](other: SingleInputFunction[T, OUTER])
-      : SingleInputFunction[T, OUTER] = {
-      other
-    }
-
-    // Should never be called
-    override def process(value: T): TraversableOnce[T] = None
-
-    override def description: String = ""
-  }
-
-  class AndThen[IN, MIDDLE, OUT](
-      first: SingleInputFunction[IN, MIDDLE], second: 
SingleInputFunction[MIDDLE, OUT])
-    extends SingleInputFunction[IN, OUT] {
-
-    override def process(value: IN): TraversableOnce[OUT] = {
-      first.process(value).flatMap(second.process(_))
-    }
-
-    override def description: String = {
-      Option(first.description).flatMap { description =>
-        Option(second.description).map(description + "." + _)
-      }.getOrElse(null)
-    }
-  }
-
-  class FlatMapFunction[IN, OUT](fun: IN => TraversableOnce[OUT], 
descriptionMessage: String)
-    extends SingleInputFunction[IN, OUT] {
-
-    override def process(value: IN): TraversableOnce[OUT] = {
-      fun(value)
-    }
-
-    override def description: String = {
-      this.descriptionMessage
-    }
-  }
-
-  class ReduceFunction[T](fun: (T, T) => T, descriptionMessage: String)
-    extends SingleInputFunction[T, T] {
-
-    private var state: Any = null
-
-    override def process(value: T): TraversableOnce[T] = {
-      if (state == null) {
-        state = value
-      } else {
-        state = fun(state.asInstanceOf[T], value)
-      }
-      Some(state.asInstanceOf[T])
-    }
-
-    override def description: String = descriptionMessage
-  }
-
-  class GroupByTask[IN, GROUP, OUT](
-      groupBy: IN => GROUP, taskContext: TaskContext, userConf: UserConfig)
-    extends Task(taskContext, userConf) {
-
-    def this(taskContext: TaskContext, userConf: UserConfig) = {
-      this(userConf.getValue[GroupByOp[IN, GROUP]](
-        GEARPUMP_STREAMING_GROUPBY_FUNCTION )(taskContext.system).get.fun,
-        taskContext, userConf)
-    }
-
-    private var groups = Map.empty[GROUP, SingleInputFunction[IN, OUT]]
-
-    override def onStart(startTime: StartTime): Unit = {
-    }
-
-    override def onNext(msg: Message): Unit = {
-      val time = msg.timestamp
-
-      val group = groupBy(msg.msg.asInstanceOf[IN])
-      if (!groups.contains(group)) {
-        val operator =
-          userConf.getValue[SingleInputFunction[IN, 
OUT]](GEARPUMP_STREAMING_OPERATOR).get
-        groups += group -> operator
-      }
-
-      val operator = groups(group)
-
-      operator.process(msg.msg.asInstanceOf[IN]).foreach { msg =>
-        taskContext.output(new Message(msg.asInstanceOf[AnyRef], time))
-      }
-    }
-  }
-
-  class SourceTask[T, OUT](
-      source: DataSource, operator: Option[SingleInputFunction[T, OUT]], 
taskContext: TaskContext,
-      userConf: UserConfig)
-    extends Task(taskContext, userConf) {
-
-    def this(taskContext: TaskContext, userConf: UserConfig) = {
-      this(
-        
userConf.getValue[DataSource](GEARPUMP_STREAMING_SOURCE)(taskContext.system).get,
-        userConf.getValue[SingleInputFunction[T, 
OUT]](GEARPUMP_STREAMING_OPERATOR)(
-          taskContext.system),
-        taskContext, userConf)
-    }
-
-    override def onStart(startTime: StartTime): Unit = {
-      source.open(taskContext, startTime.startTime)
-      self ! Message("start", System.currentTimeMillis())
-    }
-
-    override def onNext(msg: Message): Unit = {
-      val time = System.currentTimeMillis()
-      Option(source.read()).foreach { msg =>
-        operator match {
-          case Some(operator) =>
-            operator match {
-              case bad: DummyInputFunction[T] =>
-                taskContext.output(msg)
-              case _ =>
-                operator.process(msg.msg.asInstanceOf[T]).foreach(msg => {
-                  taskContext.output(new Message(msg.asInstanceOf[AnyRef], 
time))
-                })
-            }
-          case None =>
-            taskContext.output(msg)
-        }
-      }
-
-      self ! Message("next", System.currentTimeMillis())
-    }
-
-    override def onStop(): Unit = {
-      source.close()
-    }
-  }
-
-  class TransformTask[IN, OUT](
-      operator: Option[SingleInputFunction[IN, OUT]], taskContext: TaskContext,
-      userConf: UserConfig) extends Task(taskContext, userConf) {
-
-    def this(taskContext: TaskContext, userConf: UserConfig) = {
-      this(userConf.getValue[SingleInputFunction[IN, OUT]](
-        GEARPUMP_STREAMING_OPERATOR)(taskContext.system), taskContext, 
userConf)
-    }
-
-    override def onStart(startTime: StartTime): Unit = {
-    }
-
-    override def onNext(msg: Message): Unit = {
-      val time = msg.timestamp
-
-      operator match {
-        case Some(operator) =>
-          operator.process(msg.msg.asInstanceOf[IN]).foreach { msg =>
-            taskContext.output(new Message(msg.asInstanceOf[AnyRef], time))
-          }
-        case None =>
-          taskContext.output(new Message(msg.msg, time))
-      }
-    }
-  }
-
-  class SinkTask[T](dataSink: DataSink, taskContext: TaskContext, userConf: 
UserConfig)
-    extends Task(taskContext, userConf) {
-
-    def this(taskContext: TaskContext, userConf: UserConfig) = {
-      
this(userConf.getValue[DataSink](GEARPUMP_STREAMING_SINK)(taskContext.system).get,
-        taskContext, userConf)
-    }
-
-    override def onStart(startTime: StartTime): Unit = {
-      dataSink.open(taskContext)
-    }
-
-    override def onNext(msg: Message): Unit = {
-      dataSink.write(msg)
-    }
-
-    override def onStop(): Unit = {
-      dataSink.close()
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/Planner.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/Planner.scala 
b/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/Planner.scala
deleted file mode 100644
index aafd8d3..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/Planner.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.dsl.plan
-
-import akka.actor.ActorSystem
-
-import io.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner, 
Partitioner}
-import io.gearpump.streaming.Processor
-import io.gearpump.streaming.dsl.op._
-import io.gearpump.streaming.dsl.partitioner.GroupByPartitioner
-import io.gearpump.streaming.task.Task
-import io.gearpump.util.Graph
-
-class Planner {
-
-  /*
-   * Converts Dag of Op to Dag of TaskDescription. TaskDescription is part of 
the low
-   * level Graph API.
-   */
-  def plan(dag: Graph[Op, OpEdge])(implicit system: ActorSystem)
-    : Graph[Processor[_ <: Task], _ <: Partitioner] = {
-
-    val opTranslator = new OpTranslator()
-
-    val newDag = optimize(dag)
-    newDag.mapEdge { (node1, edge, node2) =>
-      edge match {
-        case Shuffle =>
-          node2.head match {
-            case groupBy: GroupByOp[Any @unchecked, Any @unchecked] =>
-              new GroupByPartitioner(groupBy.fun)
-            case _ => new HashPartitioner
-          }
-        case Direct =>
-          new CoLocationPartitioner
-      }
-    }.mapVertex { opChain =>
-      opTranslator.translate(opChain)
-    }
-  }
-
-  private def optimize(dag: Graph[Op, OpEdge]): Graph[OpChain, OpEdge] = {
-    val newGraph = dag.mapVertex(op => OpChain(List(op)))
-
-    val nodes = newGraph.topologicalOrderWithCirclesIterator.toList.reverse
-    for (node <- nodes) {
-      val outGoingEdges = newGraph.outgoingEdgesOf(node)
-      for (edge <- outGoingEdges) {
-        merge(newGraph, edge._1, edge._3)
-      }
-    }
-    newGraph
-  }
-
-  private def merge(dag: Graph[OpChain, OpEdge], node1: OpChain, node2: 
OpChain)
-    : Graph[OpChain, OpEdge] = {
-    if (dag.outDegreeOf(node1) == 1 &&
-      dag.inDegreeOf(node2) == 1 &&
-      // For processor node, we don't allow it to merge with downstream 
operators
-      !node1.head.isInstanceOf[ProcessorOp[_ <: Task]]) {
-      val (_, edge, _) = dag.outgoingEdgesOf(node1)(0)
-      if (edge == Direct) {
-        val opList = OpChain(node1.ops ++ node2.ops)
-        dag.addVertex(opList)
-        for (incomingEdge <- dag.incomingEdgesOf(node1)) {
-          dag.addEdge(incomingEdge._1, incomingEdge._2, opList)
-        }
-
-        for (outgoingEdge <- dag.outgoingEdgesOf(node2)) {
-          dag.addEdge(opList, outgoingEdge._2, outgoingEdge._3)
-        }
-
-        // Remove the old vertex
-        dag.removeVertex(node1)
-        dag.removeVertex(node2)
-      }
-    }
-    dag
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/executor/Executor.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/executor/Executor.scala 
b/streaming/src/main/scala/io/gearpump/streaming/executor/Executor.scala
deleted file mode 100644
index 48007d5..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/executor/Executor.scala
+++ /dev/null
@@ -1,476 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.executor
-
-import java.lang.management.ManagementFactory
-import scala.concurrent.duration._
-
-import akka.actor.SupervisorStrategy.Resume
-import akka.actor._
-import com.typesafe.config.Config
-import org.apache.commons.lang.exception.ExceptionUtils
-import org.slf4j.Logger
-
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.cluster.{ClusterConfig, ExecutorContext, UserConfig}
-import io.gearpump.metrics.Metrics.ReportMetrics
-import io.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
-import io.gearpump.serializer.SerializationFramework
-import io.gearpump.streaming.AppMasterToExecutor.{MsgLostException, 
TasksChanged, TasksLaunched, _}
-import io.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, 
RegisterExecutor, RegisterTask, UnRegisterTask}
-import io.gearpump.streaming.ProcessorId
-import io.gearpump.streaming.executor.Executor._
-import io.gearpump.streaming.executor.TaskLauncher.TaskArgument
-import io.gearpump.streaming.task.{Subscriber, TaskId}
-import io.gearpump.transport.{Express, HostPort}
-import io.gearpump.util.Constants._
-import io.gearpump.util.{ActorUtil, Constants, LogUtil, TimeOutScheduler}
-
-/**
- * Executor is child of AppMaster.
- * It usually represents a JVM process. It is a container for all tasks.
- */
-
-// TODO: What if Executor stuck in state DynamicDag and cannot get out???
-// For example, due to some message loss when there is network glitch.
-// Executor will hang there for ever???
-//
class Executor(executorContext: ExecutorContext, userConf: UserConfig, launcher: ITaskLauncher)
  extends Actor with TimeOutScheduler {

  /** Convenience constructor used in production: builds the default TaskLauncher. */
  def this(executorContext: ExecutorContext, userConf: UserConfig) = {
    this(executorContext, userConf, TaskLauncher(executorContext, userConf))
  }

  import context.dispatcher
  import executorContext.{appId, appMaster, executorId, resource, worker}

  private val LOG: Logger = LogUtil.getLogger(getClass, executor = executorId, app = appId)

  private implicit val timeOut = FUTURE_TIMEOUT
  private val address = ActorUtil.getFullPath(context.system, self.path)
  private val systemConfig = context.system.settings.config
  private val serializerPool = getSerializerPool()
  private val taskDispatcher = systemConfig.getString(Constants.GEARPUMP_TASK_DISPATCHER)

  // Current lifecycle state; see Executor.State for the possible values.
  private var state = State.ACTIVE
  // States transition start, in unix time
  private var transitionStart = 0L
  // States transition end, in unix time
  private var transitionEnd = 0L
  // ms; transitions longer than this are reported by the health check.
  private val transitWarningThreshold = 5000

  // Starts health check Ticks
  self ! HealthCheck

  LOG.info(s"Executor $executorId has been started, start to register itself...")
  LOG.info(s"Executor actor path: ${ActorUtil.getFullPath(context.system, self.path)}")

  // Register with the AppMaster and watch it so we shut down when it dies.
  appMaster ! RegisterExecutor(self, executorId, resource, worker)
  context.watch(appMaster)

  // All task actors hosted by this executor, keyed by TaskId.
  private var tasks = Map.empty[TaskId, ActorRef]
  // Retains TaskArguments per dag version so tasks can be relaunched on recovery.
  private val taskArgumentStore = new TaskArgumentStore()

  val express = Express(context.system)

  val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)

  if (metricsEnabled) {
    // Registers jvm metrics
    Metrics(context.system).register(new JvmMetricsSet(s"app$appId.executor$executorId"))

    val metricsReportService = context.actorOf(Props(new MetricsReporterService(
      Metrics(context.system))))
    appMaster.tell(ReportMetrics, metricsReportService)
  }

  // Sentinel dag version meaning "no DAG deployed yet".
  private val NOT_INITIALIZED = -1

  // Initial behavior: wait in the ready state for the first DAG to arrive.
  def receive: Receive = applicationReady(dagVersion = NOT_INITIALIZED)
-
-  private def getTaskId(actorRef: ActorRef): Option[TaskId] = {
-    tasks.find(_._2 == actorRef).map(_._1)
-  }
-
  /**
   * Task failure handling: any exception thrown by a child task actor is
   * reported to the AppMaster as MessageLoss (triggering replay of lost
   * messages) and the task actor is resumed rather than restarted.
   */
  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
      case _: MsgLostException =>
        val taskId = getTaskId(sender)
        val cause = s"We got MessageLossException from task ${getTaskId(sender)}, " +
          s"replaying application..."
        LOG.error(cause)
        taskId.foreach(appMaster ! MessageLoss(executorId, _, cause))
        Resume
      case ex: Throwable =>
        // Any other task error is also treated as message loss so the system
        // replays instead of crashing the whole executor.
        val taskId = getTaskId(sender)
        val errorMsg = s"We got ${ex.getClass.getName} from $taskId, we will treat it as" +
          s" MessageLoss, so that the system will replay all lost message"
        LOG.error(errorMsg, ex)
        val detailErrorMsg = errorMsg + "\n" + ExceptionUtils.getStackTrace(ex)
        taskId.foreach(appMaster ! MessageLoss(executorId, _, detailErrorMsg))
        Resume
    }
-
-  private def launchTask(taskId: TaskId, argument: TaskArgument): ActorRef = {
-    launcher.launch(List(taskId), argument, context, serializerPool, 
taskDispatcher).values.head
-  }
-
-  private def assertVersion(expectVersion: Int, version: Int, clue: Any): Unit 
= {
-    if (expectVersion != version) {
-      val errorMessage = s"Version mismatch: we expect dag version 
$expectVersion, " +
-        s"but get $version; clue: $clue"
-      LOG.error(errorMessage)
-      throw new DagVersionMismatchException(errorMessage)
-    }
-  }
-
  /**
   * Phase 1 of a dynamic DAG transition: accepts LaunchTasks / ChangeTasks
   * commands from the AppMaster and collects task registrations, tracking
   * which tasks have been launched, changed and registered so far. Moves to
   * phase 2 once TaskLocationsReady arrives and every launched task is
   * registered; otherwise the locations are rejected.
   *
   * @param dagVersion the DAG version this transition is moving to
   * @param launched   tasks launched so far during this transition
   * @param changed    ChangeTask commands accumulated so far
   * @param registered tasks whose registration the AppMaster has confirmed
   */
  def dynamicDagPhase1(
      dagVersion: Int, launched: List[TaskId], changed: List[ChangeTask], registered: List[TaskId])
    : Receive = {
    state = State.DYNAMIC_DAG_PHASE1
    box({
      case launch@LaunchTasks(taskIds, version, processorDescription,
      subscribers: List[Subscriber]) => {
        assertVersion(dagVersion, version, clue = launch)

        LOG.info(s"Launching Task $taskIds for app: $appId")
        val taskArgument = TaskArgument(version, processorDescription, subscribers)
        taskIds.foreach(taskArgumentStore.add(_, taskArgument))
        val newAdded = launcher.launch(taskIds, taskArgument, context, serializerPool,
          taskDispatcher)
        // Watch each new task actor so terminationWatch sees its death.
        newAdded.foreach { newAddedTask =>
          context.watch(newAddedTask._2)
        }
        tasks ++= newAdded
        sender ! TasksLaunched
        context.become(dynamicDagPhase1(version, launched ++ taskIds, changed, registered))
      }
      case change@ChangeTasks(taskIds, version, life, subscribers) =>
        assertVersion(dagVersion, version, clue = change)

        LOG.info(s"Change Tasks $taskIds for app: $appId, verion: $life, $dagVersion, $subscribers")

        // Refresh each task's stored TaskArgument with the new life span so a
        // later relaunch uses the updated processor description.
        val newChangedTasks = taskIds.map { taskId =>
          for (taskArgument <- taskArgumentStore.get(dagVersion, taskId)) {
            val processorDescription = taskArgument.processorDescription.copy(life = life)
            taskArgumentStore.add(taskId, TaskArgument(dagVersion, processorDescription,
              subscribers))
          }
          ChangeTask(taskId, dagVersion, life, subscribers)
        }
        sender ! TasksChanged(taskIds)
        context.become(dynamicDagPhase1(dagVersion, launched, changed ++ newChangedTasks,
          registered))

      case locations@TaskLocationsReady(taskLocations, version) =>
        LOG.info(s"TaskLocations Ready...")
        assertVersion(dagVersion, version, clue = locations)

        // Check whether all tasks has been registered.
        if ((launched.toSet -- registered.toSet).isEmpty) {
          // Confirm all tasks has been registered.
          // Build the remote routing table: all task locations that are NOT on
          // this host, keyed by the long-encoded task id.
          val result = taskLocations.locations.filter {
            location => !location._1.equals(express.localHost)
          }.flatMap { kv =>
            val (host, taskIdList) = kv
            taskIdList.map(taskId => (TaskId.toLong(taskId), host))
          }

          // Capture sender before the asynchronous callbacks below run.
          val replyTo = sender
          express.startClients(taskLocations.locations.keySet).foreach { _ =>
            express.remoteAddressMap.send(result)
            express.remoteAddressMap.future().foreach { _ =>
              LOG.info(s"sending TaskLocationsReceived back to appmaster")
              replyTo ! TaskLocationsReceived(version, executorId)
            }
          }
          context.become(dynamicDagPhase2(dagVersion, launched, changed))
        } else {
          LOG.error("Inconsistency between AppMaser and Executor! AppMaster thinks DynamicDag " +
            "transition is ready, while Executor have not get all tasks registered, " +
            "that task will not be functional...")

          // Reject TaskLocations...
          val missedTasks = (launched.toSet -- registered.toSet).toList
          val errorMsg = "We have not received TaskRegistered for following tasks: " +
            missedTasks.mkString(", ")
          LOG.error(errorMsg)
          sender ! TaskLocationsRejected(dagVersion, executorId, errorMsg, null)
          // Stays with current status...
        }

      case confirm: TaskRegistered =>
        // Forward the confirmation to the task actor and record registration.
        tasks.get(confirm.taskId).foreach {
          case actorRef: ActorRef =>
            tasks += confirm.taskId -> actorRef
            actorRef forward confirm
        }
        context.become(dynamicDagPhase1(dagVersion, launched, changed,
          registered :+ confirm.taskId))

      case rejected: TaskRejected =>
        // Means this task shoud not exists...
        tasks.get(rejected.taskId).foreach(_ ! PoisonPill)
        tasks -= rejected.taskId
        LOG.error(s"Task ${rejected.taskId} is rejected by AppMaster, shutting down it...")

      case register: RegisterTask =>
        // Relay task self-registrations up to the AppMaster.
        appMaster ! register
    })
  }
-
  /**
   * Phase 2: all tasks are registered and task locations distributed. Waits
   * for the AppMaster's StartAllTasks, then starts launched tasks, applies
   * pending ChangeTask patches, prunes the argument store and returns to
   * the applicationReady state.
   */
  def dynamicDagPhase2(dagVersion: Int, launched: List[TaskId], changed: List[ChangeTask])
    : Receive = {
    LOG.info("Transit to dynamic Dag Phase2")
    state = State.DYNAMIC_DAG_PHASE2
    box {
      case startAll@StartAllTasks(version) =>
        LOG.info(s"Start All Tasks...")
        assertVersion(dagVersion, version, clue = startAll)

        launched.foreach(taskId => tasks.get(taskId).foreach(_ ! StartTask(taskId)))
        changed.foreach(changeTask => tasks.get(changeTask.taskId).foreach(_ ! changeTask))

        // Keep only arguments matching the now-active dag version.
        taskArgumentStore.removeNewerVersion(dagVersion)
        taskArgumentStore.removeObsoleteVersion
        context.become(applicationReady(dagVersion))
    }
  }
-
  /**
   * Steady state: the DAG at `dagVersion` is running. Accepts new dynamic-DAG
   * transitions only for strictly newer versions, and handles task stop and
   * task un-registration requests.
   */
  def applicationReady(dagVersion: Int): Receive = {
    state = State.ACTIVE
    transitionEnd = System.currentTimeMillis()

    // NOT_INITIALIZED means this is the very first activation; nothing to time.
    if (dagVersion != NOT_INITIALIZED) {
      LOG.info("Transit to state Application Ready. This transition takes " +
        (transitionEnd - transitionStart) + " milliseconds")
    }
    box {
      case start: StartDynamicDag =>
        LOG.info("received StartDynamicDag")
        if (start.dagVersion > dagVersion) {
          transitionStart = System.currentTimeMillis()
          LOG.info(s"received $start, Executor transit to dag version: ${start.dagVersion} from " +
            s"current version $dagVersion")
          context.become(dynamicDagPhase1(start.dagVersion, List.empty[TaskId],
            List.empty[ChangeTask], List.empty[TaskId]))
        }
      case launch: LaunchTasks =>
        if (launch.dagVersion > dagVersion) {
          transitionStart = System.currentTimeMillis()
          LOG.info(s"received $launch, Executor transit to dag " +
            s"version: ${launch.dagVersion} from current version $dagVersion")
          // Enter phase 1 empty-handed, then let phase 1 process this message.
          context.become(dynamicDagPhase1(launch.dagVersion, List.empty[TaskId],
            List.empty[ChangeTask], List.empty[TaskId]))
          self forward launch
        }

      case change: ChangeTasks =>
        if (change.dagVersion > dagVersion) {
          transitionStart = System.currentTimeMillis()
          LOG.info(s"received $change, Executor transit to dag version: ${change.dagVersion} from" +
            s" current version $dagVersion")
          // Same re-dispatch trick as for LaunchTasks above.
          context.become(dynamicDagPhase1(change.dagVersion, List.empty[TaskId],
            List.empty[ChangeTask], List.empty[TaskId]))
          self forward change
        }

      case StopTask(taskId) =>
        // Old soldiers never die, they just fade away ;)
        val fadeAwayTask = tasks.get(taskId)
        if (fadeAwayTask.isDefined) {
          context.stop(fadeAwayTask.get)
        }
        tasks -= taskId

      case unRegister@UnRegisterTask(taskId, _) =>
        // Sends UnRegisterTask to AppMaster
        appMaster ! unRegister
    }
  }
-
  /**
   * Recovery state: counts down TaskStopped notifications until all `remain`
   * tasks have stopped, then relaunches every task that has a TaskArgument
   * stored for `dagVersion` and re-enters dynamic DAG phase 1 so the
   * relaunched tasks can register again.
   */
  def restartingTasks(dagVersion: Int, remain: Int, needRestart: List[TaskId]): Receive = {
    state = State.RECOVERY
    box {
      case TaskStopped(actor) =>
        for (taskId <- getTaskId(actor)) {
          // Only tasks belonging to the target dag version are restarted.
          if (taskArgumentStore.get(dagVersion, taskId).nonEmpty) {
            val newNeedRestart = needRestart :+ taskId
            val newRemain = remain - 1
            if (newRemain == 0) {
              // All old tasks have stopped; relaunch and watch each of them.
              val newRestarted = newNeedRestart.map { taskId_ =>
                val taskActor = launchTask(taskId_, taskArgumentStore.get(dagVersion, taskId_).get)
                context.watch(taskActor)
                taskId_ -> taskActor
              }.toMap

              tasks = newRestarted
              context.become(dynamicDagPhase1(dagVersion, newNeedRestart, List.empty[ChangeTask],
                List.empty[TaskId]))
            } else {
              context.become(restartingTasks(dagVersion, newRemain, newNeedRestart))
            }
          }
        }
    }
  }
-
  /**
   * Reacts to watched actor terminations: AppMaster death shuts this executor
   * down; a task actor death is translated into a TaskStopped self-message for
   * the recovery logic.
   */
  val terminationWatch: Receive = {
    case Terminated(actor) =>
      if (actor.compareTo(appMaster) == 0) {
        LOG.info(s"AppMaster ${appMaster.path.toString} is terminated, shutting down current " +
          s"executor $appId, $executorId")
        context.stop(self)
      } else {
        self ! TaskStopped(actor)
      }
  }
-
  /**
   * Handles RestartTasks: clears the remote routing table, poisons every local
   * task actor and switches to restartingTasks to await their terminations.
   */
  def onRestartTasks: Receive = {
    case RestartTasks(dagVersion) =>
      LOG.info(s"Executor received restart tasks")
      // Only tasks with a stored argument for this version will be restarted.
      val tasksToRestart = tasks.keys.count(taskArgumentStore.get(dagVersion, _).nonEmpty)
      express.remoteAddressMap.send(Map.empty[Long, HostPort])
      context.become(restartingTasks(dagVersion, remain = tasksToRestart,
        needRestart = List.empty[TaskId]))

      tasks.values.foreach {
        case task: ActorRef => task ! PoisonPill
      }
  }
-
-  def executorService: Receive = terminationWatch orElse onRestartTasks orElse 
{
-    case taskChanged: TaskChanged =>
-    // Skip
-    case get: GetExecutorSummary =>
-      val logFile = LogUtil.applicationLogDir(systemConfig)
-      val processorTasks = 
tasks.keySet.groupBy(_.processorId).mapValues(_.toList).view.force
-      sender ! ExecutorSummary(
-        executorId,
-        worker.workerId,
-        address,
-        logFile.getAbsolutePath,
-        state,
-        tasks.size,
-        processorTasks,
-        jvmName = ManagementFactory.getRuntimeMXBean().getName())
-
-    case query: QueryExecutorConfig =>
-      sender ! 
ExecutorConfig(ClusterConfig.filterOutDefaultConfig(systemConfig))
-    case HealthCheck =>
-      context.system.scheduler.scheduleOnce(3.second)(HealthCheck)
-      if (state != State.ACTIVE && (transitionEnd - transitionStart) > 
transitWarningThreshold) {
-        LOG.error(s"Executor status: " + state +
-          s", it takes too long(${transitionEnd - transitionStart}) to do 
transition")
-      }
-  }
-
-  private def getSerializerPool(): SerializationFramework = {
-    val system = context.system.asInstanceOf[ExtendedActorSystem]
-    val clazz = 
Class.forName(systemConfig.getString(Constants.GEARPUMP_SERIALIZER_POOL))
-    val pool = clazz.newInstance().asInstanceOf[SerializationFramework]
-    pool.init(system, userConf)
-    pool.asInstanceOf[SerializationFramework]
-  }
-
-  private def unHandled(state: String): Receive = {
-    case other =>
-      LOG.info(s"Received unknown message $other in state: $state")
-  }
-
  /**
   * Wraps a state-specific handler with the always-on executorService
   * handlers and the catch-all logger for unknown messages.
   */
  private def box(receive: Receive): Receive = {
    executorService orElse receive orElse unHandled(state)
  }
-}
-
/** Companion: protocol messages, state names and helpers used by Executor. */
object Executor {
  /** Tells the executor to restart all its tasks belonging to `dagVersion`. */
  case class RestartTasks(dagVersion: Int)

  /**
   * Keeps the TaskArguments used to (re)launch each task, newest dag version
   * first, so a task can be relaunched for any still-retained version.
   */
  class TaskArgumentStore {

    // taskId -> TaskArguments ordered newest-first (see add()).
    private var store = Map.empty[TaskId, List[TaskArgument]]

    /** Prepends `task` so the most recent argument is found first by get(). */
    def add(taskId: TaskId, task: TaskArgument): Unit = {
      val list = store.getOrElse(taskId, List.empty[TaskArgument])
      store += taskId -> (task :: list)
    }

    /** Returns the newest stored argument whose dag version is <= dagVersion. */
    def get(dagVersion: Int, taskId: TaskId): Option[TaskArgument] = {
      store.get(taskId).flatMap { list =>
        list.find { arg =>
          arg.dagVersion <= dagVersion
        }
      }
    }

    /**
     * When the new DAG is successfully deployed, then we should remove obsolete
     * TaskArgument of old DAG.
     */
    def removeObsoleteVersion(): Unit = {
      store = store.map { kv =>
        val (k, list) = kv
        // Keep only the newest argument per task.
        (k, list.take(1))
      }
    }

    /** Drops arguments newer than `currentVersion` (e.g. abandoned transition). */
    def removeNewerVersion(currentVersion: Int): Unit = {
      store = store.map { kv =>
        val (k, list) = kv
        (k, list.filter(_.dagVersion <= currentVersion))
      }
    }
  }

  /** Internal notification that a watched task actor has terminated. */
  case class TaskStopped(task: ActorRef)

  /** Snapshot of one executor, sent in reply to GetExecutorSummary. */
  case class ExecutorSummary(
      id: Int,
      workerId: WorkerId,
      actorPath: String,
      logFile: String,
      status: String,
      taskCount: Int,
      tasks: Map[ProcessorId, List[TaskId]],
      jvmName: String
  )

  object ExecutorSummary {
    /** Placeholder summary used before real data is available. */
    def empty: ExecutorSummary = {
      ExecutorSummary(0, WorkerId.unspecified, "", "", "", 1, null, jvmName = "")
    }
  }

  /** Query asking an executor for its ExecutorSummary. */
  case class GetExecutorSummary(executorId: Int)

  /** Query asking an executor for its effective (non-default) configuration. */
  case class QueryExecutorConfig(executorId: Int)

  /** Reply to QueryExecutorConfig. */
  case class ExecutorConfig(config: Config)

  /** Thrown when a dynamic-DAG message carries an unexpected dag version. */
  class DagVersionMismatchException(msg: String) extends Exception(msg)

  /** String names of the executor lifecycle states (shown in ExecutorSummary). */
  object State {
    val ACTIVE = "active"
    val DYNAMIC_DAG_PHASE1 = "dynamic_dag_phase1"
    val DYNAMIC_DAG_PHASE2 = "dynamic_dag_phase2"
    val RECOVERY = "dag_recovery"
  }

  /** Self-message driving the periodic health-check tick. */
  object HealthCheck
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/executor/ExecutorRestartPolicy.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/executor/ExecutorRestartPolicy.scala
 
b/streaming/src/main/scala/io/gearpump/streaming/executor/ExecutorRestartPolicy.scala
deleted file mode 100644
index c40aa5f..0000000
--- 
a/streaming/src/main/scala/io/gearpump/streaming/executor/ExecutorRestartPolicy.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.executor
-
-import scala.collection.immutable
-import scala.concurrent.duration.Duration
-
-import io.gearpump.streaming.task.TaskId
-import io.gearpump.util.RestartPolicy
-
-/**
- *
- * Controls how many retries to recover failed executors.
- *
- * @param maxNrOfRetries the number of times a executor is allowed to be 
restarted,
- *                       negative value means no limit, if the limit is 
exceeded the policy
- *                       will not allow to restart the executor
- * @param withinTimeRange duration of the time window for maxNrOfRetries, 
Duration.Inf
- *                        means no window
- */
class ExecutorRestartPolicy(maxNrOfRetries: Int, withinTimeRange: Duration) {
  // executorId -> tasks currently assigned to that executor
  private var executorToTaskIds = Map.empty[Int, Set[TaskId]]
  // One restart-budget tracker per task. (Fixed typo: was "taskRestartPolocies".)
  private var taskRestartPolicies = new immutable.HashMap[TaskId, RestartPolicy]

  /**
   * Records that `taskId` runs on executor `executorId`, creating the task's
   * restart policy the first time the task is seen.
   */
  def addTaskToExecutor(executorId: Int, taskId: TaskId): Unit = {
    val taskSetForExecutorId = executorToTaskIds.getOrElse(executorId, Set.empty[TaskId]) + taskId
    executorToTaskIds += executorId -> taskSetForExecutorId
    if (!taskRestartPolicies.contains(taskId)) {
      taskRestartPolicies += taskId -> new RestartPolicy(maxNrOfRetries, withinTimeRange)
    }
  }

  /**
   * Returns true when every task hosted on `executorId` still allows a
   * restart (unknown executors and tasks without a policy are permissive).
   *
   * Fix: the original used `return` inside nested lambdas, which compiles to a
   * thrown NonLocalReturnControl; this version short-circuits with `forall`
   * instead, preserving the same early-exit evaluation order.
   */
  def allowRestartExecutor(executorId: Int): Boolean = {
    executorToTaskIds.get(executorId).forall { taskIds =>
      taskIds.forall { taskId =>
        taskRestartPolicies.get(taskId).forall(_.allowRestart)
      }
    }
  }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/executor/TaskLauncher.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/executor/TaskLauncher.scala 
b/streaming/src/main/scala/io/gearpump/streaming/executor/TaskLauncher.scala
deleted file mode 100644
index d377ea4..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/executor/TaskLauncher.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.executor
-
-import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
-
-import io.gearpump.cluster.{ExecutorContext, UserConfig}
-import io.gearpump.serializer.SerializationFramework
-import io.gearpump.streaming.ProcessorDescription
-import io.gearpump.streaming.executor.TaskLauncher.TaskArgument
-import io.gearpump.streaming.task._
-import io.gearpump.streaming.util.ActorPathUtil
-
/** Abstraction over task launching, so Executor can be tested with a stub. */
trait ITaskLauncher {

  /**
   * Launch a list of task actors.
   *
   * @param taskIds ids of the tasks to launch
   * @param argument dag version, processor description and downstream subscribers
   * @param context actor factory the task actors are created in
   * @param serializer serialization framework shared by the launched tasks
   * @param dispatcher name of the akka dispatcher the tasks should run on
   * @return the launched task actors, keyed by task id
   */
  def launch(taskIds: List[TaskId], argument: TaskArgument,
      context: ActorRefFactory, serializer: SerializationFramework, dispatcher: String)
    : Map[TaskId, ActorRef]
}
-
/**
 * Default ITaskLauncher implementation: creates one `taskActorClass` actor per
 * task id, wrapping the user's task class in a TaskWrapper.
 */
class TaskLauncher(
    appId: Int,
    appName: String,
    executorId: Int,
    appMaster: ActorRef,
    userConf: UserConfig,
    taskActorClass: Class[_ <: Actor])
  extends ITaskLauncher {

  /**
   * Creates the task actors on the given dispatcher and returns them keyed by
   * task id.
   */
  override def launch(
      taskIds: List[TaskId], argument: TaskArgument,
      context: ActorRefFactory, serializer: SerializationFramework, dispatcher: String)
    : Map[TaskId, ActorRef] = {
    import argument.{processorDescription, subscribers}

    // Processor-level task config overrides the application user config.
    val taskConf = userConf.withConfig(processorDescription.taskConf)

    // Context data shared by all tasks launched in this call.
    val taskContext = TaskContextData(executorId,
      appId, appName, appMaster,
      processorDescription.parallelism,
      processorDescription.life, subscribers)

    val taskClass = TaskUtil.loadClass(processorDescription.taskClass)

    // Idiomatic replacement for the original `var tasks` + foreach accumulation.
    taskIds.map { taskId =>
      val task = new TaskWrapper(taskId, taskClass, taskContext, taskConf)
      val taskActor = context.actorOf(
        Props(taskActorClass, taskId, taskContext, userConf, task, serializer)
          .withDispatcher(dispatcher),
        ActorPathUtil.taskActorName(taskId))
      taskId -> taskActor
    }.toMap
  }
}
-
object TaskLauncher {

  /** Everything needed to (re)create a task: dag version, processor, subscribers. */
  case class TaskArgument(
      dagVersion: Int, processorDescription: ProcessorDescription,
      subscribers: List[Subscriber])

  /** Builds the production launcher, which instantiates TaskActor actors. */
  def apply(executorContext: ExecutorContext, userConf: UserConfig): TaskLauncher = {
    import executorContext.{appId, appMaster, appName, executorId}
    new TaskLauncher(appId, appName, executorId, appMaster, userConf, classOf[TaskActor])
  }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/metrics/ProcessorAggregator.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/metrics/ProcessorAggregator.scala
 
b/streaming/src/main/scala/io/gearpump/streaming/metrics/ProcessorAggregator.scala
deleted file mode 100644
index 8be2e72..0000000
--- 
a/streaming/src/main/scala/io/gearpump/streaming/metrics/ProcessorAggregator.scala
+++ /dev/null
@@ -1,301 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.metrics
-
-import java.util
-
-import com.typesafe.config.Config
-
-import io.gearpump.TimeStamp
-import io.gearpump.cluster.ClientToMaster.ReadOption
-import io.gearpump.cluster.MasterToClient.HistoryMetricsItem
-import io.gearpump.google.common.collect.Iterators
-import io.gearpump.metrics.Metrics.{Histogram, Meter}
-import io.gearpump.metrics.MetricsAggregator
-import io.gearpump.streaming.metrics.ProcessorAggregator._
-import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
-
-/**
- *
- * Does aggregation on metrics after grouping by these three attributes:
- *  1. processorId
- *  2. time section(represented as a index integer)
- *  3. metricName(like sendThroughput)
- *
- * It assumes that for each 
[[io.gearpump.cluster.MasterToClient.HistoryMetricsItem]], the name
- * follow the format 
app(appId).processor(processorId).task(taskId).(metricName)
- *
- * It parses the name to get processorId and metricName. If the parsing fails, 
then current
- * [[io.gearpump.cluster.MasterToClient.HistoryMetricsItem]] will be skipped.
- *
- * NOTE: this class is optimized for performance.
- */
class ProcessorAggregator(historyMetricConfig: HistoryMetricsConfig) extends MetricsAggregator {

  /** Convenience constructor reading the history-metrics settings from `config`. */
  def this(config: Config) = {
    this(HistoryMetricsConfig(config))
  }

  private val aggregatorFactory: AggregatorFactory = new AggregatorFactory()

  /**
   * Accepts options:
   * key: "readOption", value: one of "readLatest", "readRecent", "readHistory"
   */
  override def aggregate(options: Map[String, String],
      inputs: Iterator[HistoryMetricsItem]): List[HistoryMetricsItem] = {
    val readOption = options.get(ReadOption.Key).getOrElse(ReadOption.ReadLatest)
    aggregate(readOption, inputs, System.currentTimeMillis())
  }

  /**
   * Groups Meter/Histogram items by (time slot, processor+metric group) and
   * reduces each group with one Aggregator. Items outside [start, end) or
   * whose names cannot be parsed are skipped.
   */
  def aggregate(
      readOption: ReadOption.ReadOption, inputs: Iterator[HistoryMetricsItem], now: TimeStamp)
    : List[HistoryMetricsItem] = {
    val (start, end, interval) = getTimeRange(readOption, now)
    val timeSlotsCount = ((end - start - 1) / interval + 1).toInt
    val map = new MultiLayerMap[Aggregator](timeSlotsCount)

    // Reused scratch object to avoid an allocation per item (hot path).
    val taskIdentity = new TaskIdentity(0, null)
    while (inputs.hasNext) {
      val item = inputs.next()

      // Only Meter and Histogram metrics are aggregated here.
      if (item.value.isInstanceOf[Meter] || item.value.isInstanceOf[Histogram]) {
        if (item.time >= start && item.time < end) {
          val timeIndex = ((item.time - start) / interval).toInt

          if (parseName(item.value.name, taskIdentity)) {
            // Lazily create one aggregator per (time slot, group).
            var op = map.get(timeIndex, taskIdentity.group)
            if (op == null) {
              op = aggregatorFactory.create(item, taskIdentity.group)
              map.put(timeIndex, taskIdentity.group, op)
            }
            op.aggregate(item)
          }
        }
      }
    }

    // Drain every aggregator into the flat result list.
    val result = new Array[HistoryMetricsItem](map.size)
    val iterator = map.valueIterator
    var index = 0
    while (iterator.hasNext()) {
      val op = iterator.next()
      result(index) = op.result
      index += 1
    }

    result.toList
  }

  // Returns (start, end, interval)
  private def getTimeRange(readOption: ReadOption.ReadOption, now: TimeStamp)
    : (TimeStamp, TimeStamp, TimeStamp) = {
    readOption match {
      case ReadOption.ReadRecent =>
        val end = now
        val start = end - (historyMetricConfig.retainRecentDataSeconds) * 1000
        val interval = historyMetricConfig.retainRecentDataIntervalMs
        (floor(start, interval), floor(end, interval), interval)
      case ReadOption.ReadHistory =>
        val end = now
        val start = end - (historyMetricConfig.retainHistoryDataHours) * 3600 * 1000
        val interval = historyMetricConfig.retainHistoryDataIntervalMs
        (floor(start, interval), floor(end, interval), interval)
      case _ =>
        // All data points are aggregated together.
        (0L, Long.MaxValue, Long.MaxValue)
    }
  }

  // The original metrics data is divided by interval points:
  // time series (0, interval, 2*interval, 3*interval....)
  // floor(..) make sure the Aggregator use the same set of interval points.
  private def floor(value: Long, interval: Long): Long = {
    (value / interval) * interval
  }

  // Returns "app0.processor0:sendThroughput" as the group Id.
  // Expects names shaped like "app0.processor0.task0:sendThroughput": fills
  // result.task and result.group and returns true, or returns false when the
  // ".task" / ":" markers are missing.
  // NOTE(review): the taskId substring is parsed with toShort and would throw
  // on a non-numeric id — assumes well-formed metric names; verify upstream.
  private def parseName(name: String, result: TaskIdentity): Boolean = {
    val taskIndex = name.indexOf(TASK_TAG)
    if (taskIndex > 0) {
      val processor = name.substring(0, taskIndex)
      val typeIndex = name.indexOf(":", taskIndex + 1)
      if (typeIndex > 0) {
        result.task = (name.substring(taskIndex + TASK_TAG.length, typeIndex)).toShort
        // metricName keeps the leading ':', e.g. ":sendThroughput".
        val metricName = name.substring(typeIndex)
        result.group = processor + metricName
        true
      } else {
        false
      }
    } else {
      false
    }
  }
}
-
-object ProcessorAggregator {
-  val readOption = ReadOption.Key
-
-  private val TASK_TAG = ".task"
-
-  private class TaskIdentity(var task: Short, var group: String)
-
-  /**
-   *
-   * MultiLayerMap has multiple layers. For each layer, there
-   * is a hashMap.
-   *
-   * To access a value with get, user need to specify first layer Id, then key.
-   *
-   * This class is optimized for performance.
-   */
-  class MultiLayerMap[Value](layers: Int) {
-
-    private var _size: Int = 0
-    private val map: Array[java.util.HashMap[String, Value]] = 
createMap(layers)
-
-    /**
-     * @param key key in current layer
-     * @return return null if key is not found
-     */
-    def get(layer: Int, key: String): Value = {
-      if (layer < layers) {
-        map(layer).get(key)
-      } else {
-        null.asInstanceOf[Value]
-      }
-    }
-
-    def put(layer: Int, key: String, value: Value): Unit = {
-      if (layer < layers) {
-        map(layer).put(key, value)
-        _size += 1
-      }
-    }
-
-    def size: Int = _size
-
-    def valueIterator: util.Iterator[Value] = {
-      val iterators = new Array[util.Iterator[Value]](layers)
-      var layer = 0
-      while (layer < layers) {
-        iterators(layer) = map(layer).values().iterator()
-        layer += 1
-      }
-
-      Iterators.concat(iterators: _*)
-    }
-
-    private def createMap(layers: Int) = {
-      val map = new Array[java.util.HashMap[String, Value]](layers)
-      var index = 0
-      val length = map.length
-      while (index < length) {
-        map(index) = new java.util.HashMap[String, Value]()
-        index += 1
-      }
-      map
-    }
-  }
-
-  trait Aggregator {
-    def aggregate(item: HistoryMetricsItem): Unit
-    def result: HistoryMetricsItem
-  }
-
-  class HistogramAggregator(name: String) extends Aggregator {
-
-    var count: Long = 0
-    var mean: Double = 0
-    var stddev: Double = 0
-    var median: Double = 0
-    var p95: Double = 0
-    var p99: Double = 0
-    var p999: Double = 0
-
-    var startTime: TimeStamp = Long.MaxValue
-
-    override def aggregate(item: HistoryMetricsItem): Unit = {
-      val input = item.value.asInstanceOf[Histogram]
-      count += 1
-      mean += input.mean
-      stddev += input.stddev
-      median += input.median
-      p95 += input.p95
-      p99 += input.p99
-      p999 += input.p999
-
-      if (item.time < startTime) {
-        startTime = item.time
-      }
-    }
-
-    override def result: HistoryMetricsItem = {
-      if (count > 0) {
-        HistoryMetricsItem(startTime, Histogram(name, mean / count, stddev / 
count,
-          median / count, p95 / count, p99 / count, p999 / count))
-      } else {
-        HistoryMetricsItem(0, Histogram(name, 0, 0, 0, 0, 0, 0))
-      }
-    }
-  }
-
-  class MeterAggregator(name: String) extends Aggregator {
-
-    var count: Long = 0
-    var meanRate: Double = 0
-    var m1: Double = 0
-    var rateUnit: String = null
-
-    var startTime: TimeStamp = Long.MaxValue
-
-    override def aggregate(item: HistoryMetricsItem): Unit = {
-
-      val input = item.value.asInstanceOf[Meter]
-      count += input.count
-
-      meanRate += input.meanRate
-      m1 += input.m1
-
-      if (null == rateUnit) {
-        rateUnit = input.rateUnit
-      }
-
-      if (item.time < startTime) {
-        startTime = item.time
-      }
-    }
-
-    override def result: HistoryMetricsItem = {
-      HistoryMetricsItem(startTime, Meter(name, count, meanRate,
-        m1, rateUnit))
-    }
-  }
-
-  class AggregatorFactory {
-    def create(item: HistoryMetricsItem, name: String): Aggregator = {
-      item.value match {
-        case meter: Meter => new MeterAggregator(name)
-        case histogram: Histogram => new HistogramAggregator(name)
-        case _ => null // not supported
-      }
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/metrics/TaskFilterAggregator.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/metrics/TaskFilterAggregator.scala
 
b/streaming/src/main/scala/io/gearpump/streaming/metrics/TaskFilterAggregator.scala
deleted file mode 100644
index dbf79ec..0000000
--- 
a/streaming/src/main/scala/io/gearpump/streaming/metrics/TaskFilterAggregator.scala
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.metrics
-
-import scala.collection.mutable.ListBuffer
-import scala.util.{Failure, Success, Try}
-
-import com.typesafe.config.Config
-
-import io.gearpump.cluster.ClientToMaster.ReadOption
-import io.gearpump.cluster.MasterToClient.HistoryMetricsItem
-import io.gearpump.metrics.MetricsAggregator
-import io.gearpump.util.{Constants, LogUtil}
-
-/**
- * Filters the latest metrics data by specifying a
- * processor Id range, and taskId range.
- */
-class TaskFilterAggregator(maxLimit: Int) extends MetricsAggregator {
-
-  import io.gearpump.streaming.metrics.TaskFilterAggregator._
-
-  def this(config: Config) = {
-    this(config.getInt(Constants.GEARPUMP_METRICS_MAX_LIMIT))
-  }
-  override def aggregate(options: Map[String, String], inputs: 
Iterator[HistoryMetricsItem])
-    : List[HistoryMetricsItem] = {
-
-    if (options.get(ReadOption.Key) != Some(ReadOption.ReadLatest)) {
-      // Returns empty set
-      List.empty[HistoryMetricsItem]
-    } else {
-      val parsed = Options.parse(options)
-      if (parsed != null) {
-        aggregate(parsed, inputs)
-      } else {
-        List.empty[HistoryMetricsItem]
-      }
-    }
-  }
-
-  def aggregate(options: Options, inputs: Iterator[HistoryMetricsItem])
-    : List[HistoryMetricsItem] = {
-
-    val result = new ListBuffer[HistoryMetricsItem]
-    val effectiveLimit = Math.min(options.limit, maxLimit)
-    var count = 0
-
-    val taskIdentity = new TaskIdentity(0, 0)
-
-    while (inputs.hasNext && count < effectiveLimit) {
-      val item = inputs.next()
-      if (parseName(item.value.name, taskIdentity)) {
-        if (taskIdentity.processor >= options.startProcessor &&
-          taskIdentity.processor < options.endProcessor &&
-          taskIdentity.task >= options.startTask &&
-          taskIdentity.task < options.endTask) {
-          result.prepend(item)
-          count += 1
-        }
-      }
-    }
-    result.toList
-  }
-
-  // Assume the name format is: "app0.processor0.task0:sendThroughput", returns
-  // (processorId, taskId)
-  //
-  // returns true if success
-  private def parseName(name: String, result: TaskIdentity): Boolean = {
-    val processorStart = name.indexOf(PROCESSOR_TAG)
-    if (processorStart != -1) {
-      val taskStart = name.indexOf(TASK_TAG, processorStart + 1)
-      if (taskStart != -1) {
-        val processorId = name.substring(processorStart, 
taskStart).substring(PROCESSOR_TAG.length)
-          .toInt
-        result.processor = processorId
-        val taskEnd = name.indexOf(":", taskStart + 1)
-        if (taskEnd != -1) {
-          val taskId = name.substring(taskStart, 
taskEnd).substring(TASK_TAG.length).toInt
-          result.task = taskId
-          true
-        } else {
-          false
-        }
-      } else {
-        false
-      }
-    } else {
-      false
-    }
-  }
-}
-
-object TaskFilterAggregator {
-  val StartTask = "startTask"
-  val EndTask = "endTask"
-  val StartProcessor = "startProcessor"
-  val EndProcessor = "endProcessor"
-  val Limit = "limit"
-
-  val TASK_TAG = ".task"
-  val PROCESSOR_TAG = ".processor"
-
-  private class TaskIdentity(var processor: Int, var task: Int)
-
-  case class Options(
-      limit: Int, startTask: Int, endTask: Int, startProcessor: Int, 
endProcessor: Int)
-
-  private val LOG = LogUtil.getLogger(getClass)
-
-  object Options {
-
-    def acceptAll: Options = {
-      new Options(Int.MaxValue, 0, Int.MaxValue, 0, Int.MaxValue)
-    }
-
-    def parse(options: Map[String, String]): Options = {
-      // Do sanity check
-      val optionTry = Try {
-        val startTask = options.get(StartTask).map(_.toInt).getOrElse(0)
-        val endTask = 
options.get(EndTask).map(_.toInt).getOrElse(Integer.MAX_VALUE)
-        val startProcessor = 
options.get(StartProcessor).map(_.toInt).getOrElse(0)
-        val endProcessor = 
options.get(EndProcessor).map(_.toInt).getOrElse(Integer.MAX_VALUE)
-        val limit = options.get(Limit).map(_.toInt).getOrElse(DEFAULT_LIMIT)
-        new Options(limit, startTask, endTask, startProcessor, endProcessor)
-      }
-
-      optionTry match {
-        case Success(options) => options
-        case Failure(ex) =>
-          LOG.error("Failed to parse the options in TaskFilterAggregator. 
Error msg: " +
-            ex.getMessage)
-          null
-      }
-    }
-  }
-
-  val DEFAULT_LIMIT = 1000
-  val MAX_LIMIT = 1000
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/package.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/package.scala 
b/streaming/src/main/scala/io/gearpump/streaming/package.scala
deleted file mode 100644
index 95d51f0..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/package.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump
-
-package object streaming {
-  type ProcessorId = Int
-  type TaskIndex = Int
-  type ExecutorId = Int
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/sink/DataSink.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/sink/DataSink.scala 
b/streaming/src/main/scala/io/gearpump/streaming/sink/DataSink.scala
deleted file mode 100644
index b036619..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/sink/DataSink.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.sink
-
-import io.gearpump.Message
-import io.gearpump.streaming.task.TaskContext
-
-/**
- * Interface to implement custom data sink where result of a DAG is typically 
written
- * a DataSink could be a data store like HBase or simply a console
- *
- * An example would be like:
- * {{{
- *  class ConsoleSink extends DataSink[String] {
- *
- *    def open(context: TaskContext): Unit = {}
- *
- *    def write(s: String): Unit = {
- *      Console.println(s)
- *    }
- *
- *    def close(): Unit = {}
- *  }
- * }}}
- *
- * Subclass is required to be serializable
- */
-trait DataSink extends java.io.Serializable {
-
-  /**
-   * Opens connection to data sink
-   * invoked at onStart() method of [[io.gearpump.streaming.task.Task]]
-   * @param context is the task context at runtime
-   */
-  def open(context: TaskContext): Unit
-
-  /**
-   * Writes message into data sink
-   * invoked at onNext() method of [[io.gearpump.streaming.task.Task]]
-   * @param message wraps data to be written out
-   */
-  def write(message: Message): Unit
-
-  /**
-   * Closes connection to data sink
-   * invoked at onClose() method of [[io.gearpump.streaming.task.Task]]
-   */
-  def close(): Unit
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/sink/DataSinkProcessor.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/sink/DataSinkProcessor.scala 
b/streaming/src/main/scala/io/gearpump/streaming/sink/DataSinkProcessor.scala
deleted file mode 100644
index d753cc2..0000000
--- 
a/streaming/src/main/scala/io/gearpump/streaming/sink/DataSinkProcessor.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.sink
-
-import akka.actor.ActorSystem
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.Processor
-
-/**
- * Utility that helps user to create a DAG ending in [[DataSink]]
- * user should pass in a [[DataSink]].
- *
- * here is an example to build a DAG that does word count and write to 
KafkaSink
- * {{{
- *    val split = Processor[Split](1)
- *    val sum = Processor[Sum](1)
- *    val sink = new KafkaSink()
- *    val sinkProcessor = DataSinkProcessor(sink, 1)
- *    val dag = split ~> sum ~> sink
- * }}}
- */
-object DataSinkProcessor {
-  def apply(
-      dataSink: DataSink,
-      parallelism: Int,
-      description: String = "",
-      taskConf: UserConfig = UserConfig.empty)(implicit system: ActorSystem)
-    : Processor[DataSinkTask] = {
-    Processor[DataSinkTask](parallelism, description = description,
-      taskConf.withValue[DataSink](DataSinkTask.DATA_SINK, dataSink))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/sink/DataSinkTask.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/sink/DataSinkTask.scala 
b/streaming/src/main/scala/io/gearpump/streaming/sink/DataSinkTask.scala
deleted file mode 100644
index 7436617..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/sink/DataSinkTask.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.sink
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
-
-object DataSinkTask {
-  val DATA_SINK = "data_sink"
-}
-
-/**
- * General task that runs any [[DataSink]]
- */
-class DataSinkTask(context: TaskContext, conf: UserConfig) extends 
Task(context, conf) {
-  import io.gearpump.streaming.sink.DataSinkTask._
-
-  private val sink = conf.getValue[DataSink](DATA_SINK).get
-
-  override def onStart(startTime: StartTime): Unit = {
-    LOG.info("opening data sink...")
-    sink.open(context)
-  }
-
-  override def onNext(message: Message): Unit = {
-    sink.write(message)
-  }
-
-  override def onStop(): Unit = {
-    LOG.info("closing data sink...")
-    sink.close()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/source/DataSource.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/source/DataSource.scala 
b/streaming/src/main/scala/io/gearpump/streaming/source/DataSource.scala
deleted file mode 100644
index e145079..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/source/DataSource.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.source
-
-import io.gearpump.streaming.task.TaskContext
-import io.gearpump.Message
-
-import scala.util.Random
-
-/**
- * Interface to implement custom source where data is read into the system.
- * a DataSource could be a message queue like kafka or simply data generation 
source.
- *
- * An example would be like
- * {{{
- *  GenMsgSource extends DataSource {
- *
- *    def open(context: TaskContext, startTime: TimeStamp): Unit = {}
- *
- *    def read(context: TaskContext): Message = {
- *      Message("message")
- *    }
- *
- *    def close(): Unit = {}
- *  }
- * }}}
- *
- * subclass is required to be serializable
- */
-trait DataSource extends java.io.Serializable {
-
-  /**
-   * Opens connection to data source
-   * invoked in onStart() method of 
[[io.gearpump.streaming.source.DataSourceTask]]
-   *
-   * @param context is the task context at runtime
-   * @param startTime is the start time of system
-   */
-  def open(context: TaskContext, startTime: Long): Unit
-
-  /**
-   * Reads next message from data source and
-   * returns null if no message is available
-   *
-   * @return a [[io.gearpump.Message]] or null
-   */
-  def read(): Message
-
-  /**
-   * Closes connection to data source.
-   * invoked in onStop() method of 
[[io.gearpump.streaming.source.DataSourceTask]]
-   */
-  def close(): Unit
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceConfig.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceConfig.scala 
b/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceConfig.scala
deleted file mode 100644
index 6ca939f..0000000
--- 
a/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceConfig.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.source
-
-object DataSourceConfig {
-
-  val SOURCE_READ_BATCH_SIZE = "gearpump.source.read.batch.size"
-  val SOURCE_TIMESTAMP_FILTER = "gearpump.source.timestamp.filter.class"
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceProcessor.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceProcessor.scala
 
b/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceProcessor.scala
deleted file mode 100644
index 384b86a..0000000
--- 
a/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceProcessor.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.source
-
-import akka.actor.ActorSystem
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.Processor
-
-/**
- * Utility that helps user to create a DAG starting with [[DataSourceTask]]
- * user should pass in a [[DataSource]]
- *
- * Here is an example to build a DAG that reads from Kafka source followed by 
word count
- * {{{
- *    val source = new KafkaSource()
- *    val sourceProcessor =  DataSourceProcessor(source, 1)
- *    val split = Processor[Split](1)
- *    val sum = Processor[Sum](1)
- *    val dag = sourceProcessor ~> split ~> sum
- * }}}
- */
-object DataSourceProcessor {
-  def apply(
-      dataSource: DataSource,
-      parallelism: Int,
-      description: String = "",
-      taskConf: UserConfig = UserConfig.empty)(implicit system: ActorSystem)
-    : Processor[DataSourceTask] = {
-    Processor[DataSourceTask](parallelism, description = description,
-      taskConf.withValue[DataSource](DataSourceTask.DATA_SOURCE, dataSource))
-  }
-}


Reply via email to