http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/Serializer.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/Serializer.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/Serializer.scala
new file mode 100644
index 0000000..22aa897
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/Serializer.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.state.api
+
+import scala.util.Try
+
/**
 * Converts a state value of type T to and from bytes for checkpointing.
 * Marked java.io.Serializable, so concrete serializers can themselves be
 * shipped as part of serialized configuration.
 */
trait Serializer[T] extends java.io.Serializable {
  /** Encodes `t` into a byte array. */
  def serialize(t: T): Array[Byte]
  /** Decodes `bytes`; any decoding failure is captured in the returned Try. */
  def deserialize(bytes: Array[Byte]): Try[T]
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/CheckpointManager.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/CheckpointManager.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/CheckpointManager.scala
new file mode 100644
index 0000000..82b7952
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/CheckpointManager.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.state.impl
+
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.streaming.transaction.api.CheckpointStore
+
/** Manages physical checkpoints persisted to durable storage like HDFS. */
class CheckpointManager(checkpointInterval: Long,
    checkpointStore: CheckpointStore) {

  // Largest message timestamp seen so far through update().
  private var maxMessageTime: Long = 0L
  // Next planned checkpoint time; None until the first message arrives.
  private var checkpointTime: Option[Long] = None

  /** Loads the checkpoint persisted at `timestamp`, if one exists. */
  def recover(timestamp: TimeStamp): Option[Array[Byte]] =
    checkpointStore.recover(timestamp)

  /**
   * Persists `checkpoint` at `timestamp`, then advances the planned
   * checkpoint time past `maxMessageTime` by whole multiples of
   * `checkpointInterval`. Returns the new checkpoint time, or None when no
   * message newer than the previous checkpoint time has been observed.
   */
  def checkpoint(timestamp: TimeStamp, checkpoint: Array[Byte]): Option[TimeStamp] = {
    checkpointStore.persist(timestamp, checkpoint)
    checkpointTime = checkpointTime
      .filter(time => maxMessageTime > time)
      .map(time => time + (1 + (maxMessageTime - time) / checkpointInterval) * checkpointInterval)
    checkpointTime
  }

  /**
   * Records a message timestamp; on the very first message this schedules
   * the initial checkpoint at the next interval boundary after it.
   */
  def update(messageTime: TimeStamp): Option[TimeStamp] = {
    maxMessageTime = math.max(maxMessageTime, messageTime)
    checkpointTime = checkpointTime
      .orElse(Some((1 + messageTime / checkpointInterval) * checkpointInterval))
    checkpointTime
  }

  /** True once the upstream minimum clock has reached the planned checkpoint time. */
  def shouldCheckpoint(upstreamMinClock: TimeStamp): Boolean =
    checkpointTime.exists(upstreamMinClock >= _)

  def getCheckpointTime: Option[TimeStamp] = checkpointTime

  /** Releases the underlying checkpoint store. */
  def close(): Unit = checkpointStore.close()

  private[impl] def getMaxMessageTime: TimeStamp = maxMessageTime
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala
new file mode 100644
index 0000000..753050e
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.state.impl
+
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.streaming.transaction.api.{CheckpointStore, 
CheckpointStoreFactory}
+
/**
 * An in-memory checkpoint store provided for tests.
 * It loses everything on close() and must not be used in real cases.
 */
class InMemoryCheckpointStore extends CheckpointStore {
  // Checkpoint bytes keyed by the timestamp they were persisted at.
  private var checkpoints = Map.empty[TimeStamp, Array[Byte]]

  /** Stores (or overwrites) the checkpoint taken at `timestamp`. */
  override def persist(timestamp: TimeStamp, checkpoint: Array[Byte]): Unit = {
    checkpoints = checkpoints.updated(timestamp, checkpoint)
  }

  /** Looks up the checkpoint taken exactly at `timestamp`. */
  override def recover(timestamp: TimeStamp): Option[Array[Byte]] =
    checkpoints.get(timestamp)

  /** Discards every stored checkpoint. */
  override def close(): Unit = {
    checkpoints = Map.empty[TimeStamp, Array[Byte]]
  }
}
+
/** Factory producing [[InMemoryCheckpointStore]] instances; intended for tests. */
class InMemoryCheckpointStoreFactory extends CheckpointStoreFactory {
  // conf and taskContext are ignored: every call returns a fresh, empty store.
  override def getCheckpointStore(conf: UserConfig, taskContext: TaskContext): CheckpointStore = {
    new InMemoryCheckpointStore
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/NonWindowState.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/NonWindowState.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/NonWindowState.scala
new file mode 100644
index 0000000..b161713
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/NonWindowState.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.state.impl
+
+import org.slf4j.Logger
+
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.streaming.state.api.{Monoid, MonoidState, 
Serializer}
+import org.apache.gearpump.streaming.state.impl.NonWindowState._
+import org.apache.gearpump.util.LogUtil
+
object NonWindowState {
  // Logger shared by all NonWindowState instances.
  val LOG: Logger = LogUtil.getLogger(classOf[NonWindowState[_]])
}
+
/**
 * a MonoidState storing non-window state
 *
 * NOTE(review): `left`, `right`, `checkpointTime` and `updateState` are
 * inherited from MonoidState; `left` appears to hold the state up to the
 * checkpoint time and `right` the state after it — confirm against MonoidState.
 */
class NonWindowState[T](monoid: Monoid[T], serializer: Serializer[T])
  extends MonoidState[T](monoid) {

  /** Restores `left` from checkpointed bytes. */
  override def recover(timestamp: TimeStamp, bytes: Array[Byte]): Unit = {
    // Try.foreach is a no-op on Failure, so undecodable bytes silently
    // leave the state unchanged.
    serializer.deserialize(bytes).foreach(left = _)
  }

  /** Folds a new value into the state; routing is delegated to MonoidState.updateState. */
  override def update(timestamp: TimeStamp, t: T): Unit = {
    updateState(timestamp, t)
  }

  /** Serializes the pre-checkpoint state, then merges `right` into `left`. */
  override def checkpoint(): Array[Byte] = {
    // Serialize BEFORE merging so the snapshot covers only `left`,
    // i.e. the portion of the state preceding the checkpoint time.
    val serialized = serializer.serialize(left)
    LOG.debug(s"checkpoint time: $checkpointTime; checkpoint value: ($checkpointTime, $left)")
    left = monoid.plus(left, right)
    right = monoid.zero
    serialized
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/PersistentStateConfig.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/PersistentStateConfig.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/PersistentStateConfig.scala
new file mode 100644
index 0000000..de2991f
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/PersistentStateConfig.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.state.impl
+
/** User-config key names for persistent state handling. */
object PersistentStateConfig {

  // Whether state checkpointing is enabled.
  val STATE_CHECKPOINT_ENABLE = "state.checkpoint.enable"
  // Interval between checkpoints, in milliseconds.
  val STATE_CHECKPOINT_INTERVAL_MS = "state.checkpoint.interval.ms"
  // Factory used to create the checkpoint store
  // (presumably a fully qualified CheckpointStoreFactory class name — confirm at the reader).
  val STATE_CHECKPOINT_STORE_FACTORY = "state.checkpoint.store.factory"
  // Window size and step for windowed state
  // (NOTE(review): units presumed milliseconds — confirm against readers of these keys).
  val STATE_WINDOW_SIZE = "state.window.size"
  val STATE_WINDOW_STEP = "state.window.step"
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/Window.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/Window.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/Window.scala
new file mode 100644
index 0000000..c1f647e
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/Window.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.state.impl
+
+import org.apache.gearpump.TimeStamp
+
/**
 * Used in window applications.
 * Tracks the current window [startTime, startTime + windowSize) and slides
 * it forward once the observed clock passes the window's end.
 */
class Window(val windowSize: Long, val windowStep: Long) {

  def this(windowConfig: WindowConfig) = this(windowConfig.windowSize, windowConfig.windowStep)

  // Latest clock value reported through update().
  private var clock: TimeStamp = 0L
  // Inclusive lower bound of the current window.
  private var startTime = 0L

  /** Records the latest clock value. */
  def update(clock: TimeStamp): Unit = {
    this.clock = clock
  }

  /** Advances the window by exactly one step. */
  def slideOneStep(): Unit = {
    startTime = startTime + windowStep
  }

  /** Jumps the window start to the step boundary at or before `timestamp`. */
  def slideTo(timestamp: TimeStamp): Unit = {
    startTime = (timestamp / windowStep) * windowStep
  }

  /** True once the clock has passed the end of the current window. */
  def shouldSlide: Boolean = clock >= startTime + windowSize

  /** The current window as a (start, end) pair, end exclusive. */
  def range: (TimeStamp, TimeStamp) = (startTime, startTime + windowSize)
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/WindowConfig.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/WindowConfig.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/WindowConfig.scala
new file mode 100644
index 0000000..b053837
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/WindowConfig.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.state.impl
+
object WindowConfig {
  // Key under which a WindowConfig is stored and looked up
  // (presumably in the user configuration — confirm at usage sites).
  val NAME = "window_config"
}

/**
 * Immutable window parameters.
 * @param windowSize length of each window
 * @param windowStep distance between the start times of consecutive windows
 */
case class WindowConfig(windowSize: Long, windowStep: Long)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/WindowState.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/WindowState.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/WindowState.scala
new file mode 100644
index 0000000..348f09e
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/WindowState.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.state.impl
+
+import scala.collection.immutable.TreeMap
+
+import org.slf4j.Logger
+
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.streaming.state.api.{Group, MonoidState, Serializer}
+import org.apache.gearpump.streaming.state.impl.WindowState._
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.util.LogUtil
+
/**
 * An interval is a dynamic time range that is divided by window boundaries
 * and checkpoint times.
 */
case class Interval(startTime: TimeStamp, endTime: TimeStamp) extends Ordered[Interval] {
  // Ordering considers startTime only; endTime is deliberately ignored
  // (NOTE(review): assumes managed intervals never share a start time — confirm).
  override def compare(that: Interval): Int =
    java.lang.Long.compare(startTime, that.startTime)
}
+
object WindowState {
  // Logger shared by all WindowState instances.
  val LOG: Logger = LogUtil.getLogger(classOf[WindowState[_]])
}
+
/**
 * this is a list of states, each of which is bounded by a time window
 * state of each window doesn't affect each other
 *
 * WindowState requires a Algebird Group to be passed in
 * Group augments Monoid with a minus function which makes it
 * possible to undo the update by messages that have left the window
 *
 * NOTE(review): `left`, `right`, `checkpointTime` and `updateState` come from
 * MonoidState; `left` appears to track the pre-checkpoint running state and
 * `right` the post-checkpoint one — confirm against MonoidState.
 */
class WindowState[T](group: Group[T],
    serializer: Serializer[TreeMap[Interval, T]],
    taskContext: TaskContext,
    window: Window) extends MonoidState[T](group) {
  /**
   * each interval has a state updated by message with timestamp in
   * [interval.startTime, interval.endTime)
   */
  private var intervalStates = TreeMap.empty[Interval, T]

  // Time of the last completed checkpoint; lower bound for newly created intervals.
  private var lastCheckpointTime = 0L

  /** Re-positions the window at `timestamp` and rebuilds state from checkpointed bytes. */
  override def recover(timestamp: TimeStamp, bytes: Array[Byte]): Unit = {
    window.slideTo(timestamp)
    // Try.foreach is a no-op on Failure: undecodable bytes leave the state empty.
    serializer.deserialize(bytes)
      .foreach { states =>
        intervalStates = states
        // The running state is the group-sum of every recovered interval state.
        left = states.foldLeft(left) { case (accum, iter) =>
          group.plus(accum, iter._2)
        }
      }
  }

  /**
   * Folds a message into the running state and its interval bucket, then
   * slides the window when the upstream min clock has passed its end.
   */
  override def update(timestamp: TimeStamp, t: T): Unit = {
    val (startTime, endTime) = window.range
    // Only messages inside the current window contribute to the running state.
    if (timestamp >= startTime && timestamp < endTime) {
      updateState(timestamp, t)
    }

    // Always record the message in its fine-grained interval bucket.
    updateIntervalStates(timestamp, t, checkpointTime)

    val upstreamMinClock = taskContext.upstreamMinClock
    window.update(upstreamMinClock)

    if (window.shouldSlide) {
      window.slideOneStep()

      val (newStartTime, newEndTime) = window.range
      // Subtract intervals that just left the window — this undo is why a
      // Group (with minus), not merely a Monoid, is required.
      getIntervalStates(startTime, newStartTime).foreach { case (_, st) =>
        left = group.minus(left, st)
      }
      if (checkpointTime > endTime) {
        // Intervals up to the checkpoint time still belong to the pre-checkpoint state.
        getIntervalStates(endTime, checkpointTime).foreach { case (_, st) =>
          left = group.plus(left, st)
        }
      } else {
        // Otherwise they accumulate in the post-checkpoint buffer.
        getIntervalStates(endTime, newEndTime).foreach { case (_, st) =>
          right = group.plus(right, st)
        }
      }
    }
  }

  /** Snapshots the interval states inside the current window up to the checkpoint time. */
  override def checkpoint(): Array[Byte] = {
    // Fold the post-checkpoint buffer into the main state before snapshotting.
    left = group.plus(left, right)
    right = group.zero

    val states = getIntervalStates(window.range._1, checkpointTime)
    lastCheckpointTime = checkpointTime
    LOG.debug(s"checkpoint time: $checkpointTime; checkpoint value: ($checkpointTime, $states)")
    serializer.serialize(states)
  }

  /**
   * Each message will update state in corresponding Interval[StartTime, endTime),
   *
   * which is decided by the message's timestamp t where
   * {{{
   * startTime = Math.max(lowerBound1, lowerBound2, checkpointTime)
   * endTime = Math.min(upperBound1, upperBound2, checkpointTime)
   * lowerBound1 = step * Nmax1 <= t
   * lowerBound2 = step * Nmax2 + size <= t
   * upperBound1 = step * Nmin1 > t
   * upperBound2 = step * Nmin2 + size > t
   * }}}
   */
  private[impl] def getInterval(timestamp: TimeStamp, checkpointTime: TimeStamp): Interval = {
    val windowSize = window.windowSize
    val windowStep = window.windowStep
    // Tightest step/size-aligned boundaries around `timestamp`.
    val lowerBound1 = timestamp / windowStep * windowStep
    val lowerBound2 =
      if (timestamp < windowSize) 0L
      else (timestamp - windowSize) / windowStep * windowStep + windowSize
    val upperBound1 = (timestamp / windowStep + 1) * windowStep
    val upperBound2 =
      if (timestamp < windowSize) windowSize
      else ((timestamp - windowSize) / windowStep + 1) * windowStep + windowSize
    val lowerBound = Math.max(lowerBound1, lowerBound2)
    val upperBound = Math.min(upperBound1, upperBound2)
    // Clamp the interval so it never spans a checkpoint boundary.
    if (checkpointTime > timestamp) {
      Interval(Math.max(lowerBound, lastCheckpointTime), Math.min(upperBound, checkpointTime))
    } else {
      Interval(Math.max(lowerBound, checkpointTime), upperBound)
    }
  }

  /** Merges `t` into the bucket for its interval, creating the bucket on first use. */
  private[impl] def updateIntervalStates(timestamp: TimeStamp, t: T, checkpointTime: TimeStamp)
  : Unit = {
    val interval = getInterval(timestamp, checkpointTime)
    intervalStates.get(interval) match {
      case Some(st) =>
        intervalStates += interval -> group.plus(st, t)
      case None =>
        intervalStates += interval -> group.plus(group.zero, t)
    }
  }

  /** Intervals whose endTime lies in (startTime, endTime]; relies on TreeMap's sorted order. */
  private[impl] def getIntervalStates(startTime: TimeStamp, endTime: TimeStamp)
  : TreeMap[Interval, T] = {
    intervalStates.dropWhile(_._1.endTime <= startTime).takeWhile(_._1.endTime <= endTime)
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/storage/AppDataStore.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/storage/AppDataStore.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/storage/AppDataStore.scala
new file mode 100644
index 0000000..1db9f6a
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/storage/AppDataStore.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.storage
+
+import scala.concurrent._
+
/**
 * Generic storage to store KV Data.
 */
trait AppDataStore {
  /** Asynchronously stores `value` under `key`; the Future completes when the write is done. */
  def put(key: String, value: Any): Future[Any]

  /** Asynchronously retrieves the value stored under `key`. */
  def get(key: String): Future[Any]
}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/storage/InMemoryAppStoreOnMaster.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/storage/InMemoryAppStoreOnMaster.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/storage/InMemoryAppStoreOnMaster.scala
new file mode 100644
index 0000000..ce98216
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/storage/InMemoryAppStoreOnMaster.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.storage
+
+import scala.concurrent.Future
+
+import akka.actor.ActorRef
+import akka.pattern.ask
+
+import org.apache.gearpump.cluster.AppMasterToMaster.{GetAppData, 
GetAppDataResult, SaveAppData}
+import org.apache.gearpump.util.Constants
+
/**
 * In memory application storage located on master nodes.
 * Delegates put/get to the master actor via ask.
 */
class InMemoryAppStoreOnMaster(appId: Int, master: ActorRef) extends AppDataStore {
  implicit val timeout = Constants.FUTURE_TIMEOUT
  // NOTE(review): uses the global execution context; consider injecting an
  // ExecutionContext so callers can choose the pool.
  import scala.concurrent.ExecutionContext.Implicits.global

  /** Asynchronously saves the key/value pair for this application on the master. */
  override def put(key: String, value: Any): Future[Any] = {
    master.ask(SaveAppData(appId, key, value))
  }

  /**
   * Asynchronously fetches the value stored under `key`.
   * Completes with null when the master's reply is for a different key.
   */
  override def get(key: String): Future[Any] = {
    // mapTo performs a checked cast inside the Future, instead of the
    // unchecked asInstanceOf on the Future itself; a wrong reply type now
    // fails the Future instead of blowing up later in map().
    master.ask(GetAppData(appId, key)).mapTo[GetAppDataResult].map { result =>
      // null kept for backward compatibility with existing callers;
      // an Option would be the idiomatic return in a future API revision.
      if (result.key == key) result.value else null
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/task/ExpressTransport.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/ExpressTransport.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/ExpressTransport.scala
new file mode 100644
index 0000000..7629cf9
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/ExpressTransport.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.task
+
+import akka.actor.{ActorRef, ExtendedActorSystem}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.transport.netty.TaskMessage
+import org.apache.gearpump.transport.{Express, HostPort}
+import org.apache.gearpump.util.AkkaHelper
/**
 * ExpressTransport wires the networking function from default akka
 * networking to the customized implementation [[org.apache.gearpump.transport.Express]].
 *
 * See [[org.apache.gearpump.transport.Express]] for more information.
 */
trait ExpressTransport {
  this: TaskActor =>

  // Express networking extension of this actor system.
  final val express = Express(context.system)
  implicit val system = context.system.asInstanceOf[ExtendedActorSystem]

  /** Host and port of the local Express transport endpoint. */
  final def local: HostPort = express.localHost
  // Numeric transport id of this task (TaskId packed into a Long).
  lazy val sourceId = TaskId.toLong(taskId)

  // Sender used for locally-delivered messages, resolved by session id
  // (NOTE(review): confirm the "/session#<id>" path format against AkkaHelper).
  lazy val sessionRef: ActorRef = {
    AkkaHelper.actorFor(system, s"/session#$sessionId")
  }

  /**
   * Sends `msg` to each target task: delivered directly to local actors,
   * and through Express to remote ones. The message is serialized at most
   * once, lazily, and only when at least one target is remote.
   */
  def transport(msg: AnyRef, remotes: TaskId*): Unit = {
    // Lazily-built serialized form, shared across all remote targets.
    var serializedMessage: AnyRef = null

    remotes.foreach { remote =>
      val transportId = TaskId.toLong(remote)
      val localActor = express.lookupLocalActor(transportId)
      if (localActor.isDefined) {
        // Local target: no serialization needed.
        localActor.get.tell(msg, sessionRef)
      } else {
        if (null == serializedMessage) {
          msg match {
            case message: Message =>
              // Only the user payload is serialized; the timestamp travels alongside.
              val bytes = serializerPool.get().serialize(message.msg)
              serializedMessage = SerializedMessage(message.timestamp, bytes)
            case _ => serializedMessage = msg
          }
        }
        val taskMessage = new TaskMessage(sessionId, transportId, sourceId, serializedMessage)

        val remoteAddress = express.lookupRemoteAddress(transportId)
        if (remoteAddress.isDefined) {
          express.transport(taskMessage, remoteAddress.get)
        } else {
          // Unknown target: log and drop rather than failing the task.
          LOG.error(
            s"Can not find target task $remote, maybe the application is undergoing recovery")
        }
      }
    }
  }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializedMessage.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializedMessage.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializedMessage.scala
new file mode 100644
index 0000000..675d5cc
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializedMessage.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.task
+
+import java.io.{DataInput, DataOutput}
+
+import org.apache.gearpump.TimeStamp
+
// A message whose user payload has already been serialized to bytes,
// paired with the original message timestamp.
case class SerializedMessage(timeStamp: TimeStamp, bytes: Array[Byte])
+
/** Wire codec for [[SerializedMessage]]: 8-byte timestamp, 4-byte length, then payload. */
class SerializedMessageSerializer extends TaskMessageSerializer[SerializedMessage] {

  // 8 bytes (long timestamp) + 4 bytes (int length prefix) + payload bytes.
  override def getLength(obj: SerializedMessage): Int = obj.bytes.length + 12

  /** Writes timestamp, payload length, then the payload itself. */
  override def write(dataOutput: DataOutput, obj: SerializedMessage): Unit = {
    val payload = obj.bytes
    dataOutput.writeLong(obj.timeStamp)
    dataOutput.writeInt(payload.length)
    dataOutput.write(payload)
  }

  /** Reads back exactly what write() produced. */
  override def read(dataInput: DataInput): SerializedMessage = {
    val timestamp = dataInput.readLong()
    val payload = new Array[Byte](dataInput.readInt())
    dataInput.readFully(payload)
    SerializedMessage(timestamp, payload)
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializerResolver.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializerResolver.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializerResolver.scala
new file mode 100644
index 0000000..4c7867b
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializerResolver.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.task
+
+import io.gearpump.esotericsoftware.kryo.util.{IntMap, ObjectMap}
+import org.apache.gearpump.streaming.task.SerializerResolver.Registration
+
+private[task] class SerializerResolver {
+  private var classId = 0
+  private val idToRegistration = new IntMap[Registration]()
+  private val classToRegistration = new ObjectMap[Class[_], Registration]()
+
+  def register[T](clazz: Class[T], serializer: TaskMessageSerializer[T]): Unit 
= {
+    val registration = new Registration(classId, clazz, serializer)
+    idToRegistration.put(classId, registration)
+    classToRegistration.put(clazz, registration)
+    classId += 1
+  }
+
+  def getRegistration(clazz: Class[_]): Registration = {
+    classToRegistration.get(clazz)
+  }
+
+  def getRegistration(clazzId: Int): Registration = {
+    idToRegistration.get(clazzId)
+  }
+}
+
+object SerializerResolver {
+  class Registration(val id: Int, val clazz: Class[_], val serializer: 
TaskMessageSerializer[_])
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/task/StartTime.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/StartTime.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/StartTime.scala
new file mode 100644
index 0000000..fb097d3
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/StartTime.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.task
+
+import org.apache.gearpump.TimeStamp
+
+/** Start time of streaming application. All messages older than start time 
will be dropped */
+case class StartTime(startTime: TimeStamp = 0)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/task/StreamingTransportSerializer.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/StreamingTransportSerializer.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/StreamingTransportSerializer.scala
new file mode 100644
index 0000000..1bdc175
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/StreamingTransportSerializer.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.task
+
+import java.io.{DataInput, DataOutput}
+
+import org.slf4j.Logger
+
+import org.apache.gearpump.streaming.{AckRequestSerializer, AckSerializer, 
InitialAckRequestSerializer, LatencyProbeSerializer}
+import org.apache.gearpump.transport.netty.ITransportMessageSerializer
+import org.apache.gearpump.util.LogUtil
+
+class StreamingTransportSerializer extends ITransportMessageSerializer {
+  private val log: Logger = LogUtil.getLogger(getClass)
+  private val serializers = new SerializerResolver
+
+  serializers.register(classOf[Ack], new AckSerializer)
+  serializers.register(classOf[AckRequest], new AckRequestSerializer)
+  serializers.register(classOf[InitialAckRequest], new 
InitialAckRequestSerializer)
+  serializers.register(classOf[LatencyProbe], new LatencyProbeSerializer)
+  serializers.register(classOf[SerializedMessage], new 
SerializedMessageSerializer)
+
+  override def serialize(dataOutput: DataOutput, obj: Object): Unit = {
+    val registration = serializers.getRegistration(obj.getClass)
+    if (registration != null) {
+      dataOutput.writeInt(registration.id)
+      
registration.serializer.asInstanceOf[TaskMessageSerializer[AnyRef]].write(dataOutput,
 obj)
+    } else {
+      log.error(s"Can not find serializer for class type ${obj.getClass}")
+    }
+  }
+
+  override def deserialize(dataInput: DataInput, length: Int): Object = {
+    val classID = dataInput.readInt()
+    val registration = serializers.getRegistration(classID)
+    if (registration != null) {
+      
registration.serializer.asInstanceOf[TaskMessageSerializer[AnyRef]].read(dataInput)
+    } else {
+      log.error(s"Can not find serializer for class id $classID")
+      null
+    }
+  }
+
+  override def getLength(obj: Object): Int = {
+    val registration = serializers.getRegistration(obj.getClass)
+    if (registration != null) {
+      
registration.serializer.asInstanceOf[TaskMessageSerializer[AnyRef]].getLength(obj)
 + 4
+    } else {
+      log.error(s"Can not find serializer for class type ${obj.getClass}")
+      0
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscriber.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscriber.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscriber.scala
new file mode 100644
index 0000000..692d7f9
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscriber.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.task
+
+import org.apache.gearpump.partitioner.PartitionerDescription
+import org.apache.gearpump.streaming.{DAG, LifeTime}
+
+/**
+ * Each processor can have multiple downstream subscribers.
+ *
+ * For example: When processor A subscribes to processor B, then the output of 
B will be
+ * pushed to processor A.
+ *
+ * @param processorId subscriber processor Id
+ * @param partitionerDescription subscriber partitioner
+ */
+case class Subscriber(processorId: Int, partitionerDescription: 
PartitionerDescription,
+    parallelism: Int, lifeTime: LifeTime)
+
+object Subscriber {
+
+  /**
+   *
+   * List subscriptions of a processor.
+   * The topology information is retrieved from dag
+   *
+   * @param processorId   the processor to list
+   * @param dag     the DAG
+   * @return   the subscribers of this processor
+   */
+  def of(processorId: Int, dag: DAG): List[Subscriber] = {
+    val edges = dag.graph.outgoingEdgesOf(processorId)
+
+    edges.foldLeft(List.empty[Subscriber]) { (list, nodeEdgeNode) =>
+      val (_, partitioner, downstreamProcessorId) = nodeEdgeNode
+      val downstreamProcessor = dag.processors(downstreamProcessorId)
+      list :+ Subscriber(downstreamProcessorId, partitioner,
+        downstreamProcessor.parallelism, downstreamProcessor.life)
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
new file mode 100644
index 0000000..9accae2
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.task
+
+import org.slf4j.Logger
+
+import io.gearpump.google.common.primitives.Shorts
+import org.apache.gearpump.partitioner.{MulticastPartitioner, Partitioner, 
UnicastPartitioner}
+import org.apache.gearpump.streaming.AppMasterToExecutor.MsgLostException
+import org.apache.gearpump.streaming.LifeTime
+import org.apache.gearpump.streaming.task.Subscription._
+import org.apache.gearpump.util.LogUtil
+import org.apache.gearpump.{Message, TimeStamp}
+
+/**
+ * Manages the output and message clock for a single downstream processor
+ *
+ * @param subscriber downstream processor
+ * @param maxPendingMessageCount trigger flow control. Should be bigger than
+ *                               maxPendingMessageCountPerAckRequest
+ * @param ackOnceEveryMessageCount send one AckRequest to the target
+ */
+class Subscription(
+    appId: Int,
+    executorId: Int,
+    taskId: TaskId,
+    subscriber: Subscriber, sessionId: Int,
+    transport: ExpressTransport,
+    maxPendingMessageCount: Int = MAX_PENDING_MESSAGE_COUNT,
+    ackOnceEveryMessageCount: Int = ONE_ACKREQUEST_EVERY_MESSAGE_COUNT) {
+
+  assert(maxPendingMessageCount >= ackOnceEveryMessageCount)
+  assert(maxPendingMessageCount < Short.MaxValue / 2)
+
+  private val LOG: Logger = LogUtil.getLogger(getClass, app = appId,
+    executor = executorId, task = taskId)
+
+  import subscriber.{parallelism, partitionerDescription, processorId}
+
+  // Don't worry if this store negative number. We will wrap the Short
+  private val messageCount: Array[Short] = new Array[Short](parallelism)
+  private val pendingMessageCount: Array[Short] = new Array[Short](parallelism)
+  private val candidateMinClockSince: Array[Short] = new 
Array[Short](parallelism)
+
+  private val minClockValue: Array[TimeStamp] = 
Array.fill(parallelism)(Long.MaxValue)
+  private val candidateMinClock: Array[TimeStamp] = 
Array.fill(parallelism)(Long.MaxValue)
+
+  private var maxPendingCount: Short = 0
+
+  private var life = subscriber.lifeTime
+
+  private val partitioner = 
partitionerDescription.partitionerFactory.partitioner
+  private val sendFn = partitioner match {
+    case up: UnicastPartitioner =>
+      (msg: Message) => {
+        val partition = up.getPartition(msg, parallelism, taskId.index)
+        sendMessage(msg, partition)
+      }
+    case mp: MulticastPartitioner =>
+      (msg: Message) => {
+        val partitions = mp.getPartitions(msg, parallelism, taskId.index)
+        partitions.map(partition => sendMessage(msg, partition)).sum
+      }
+  }
+
+  def changeLife(life: LifeTime): Unit = {
+    this.life = life
+  }
+
+  def start(): Unit = {
+    val ackRequest = InitialAckRequest(taskId, sessionId)
+    transport.transport(ackRequest, allTasks: _*)
+  }
+
+  def sendMessage(msg: Message): Int = {
+    sendFn(msg)
+  }
+
+  /**
+   * Returns how many messages are actually sent by this subscription
+   *
+   * @param msg  the message to send
+   * @param partition  the target partition to send message to
+   * @return 1 if success
+   */
+  def sendMessage(msg: Message, partition: Int): Int = {
+
+    var count = 0
+    // Only sends message whose timestamp matches the lifeTime
+    if (partition != Partitioner.UNKNOWN_PARTITION_ID && 
life.contains(msg.timestamp)) {
+
+      val targetTask = TaskId(processorId, partition)
+      transport.transport(msg, targetTask)
+
+      this.minClockValue(partition) = Math.min(this.minClockValue(partition), 
msg.timestamp)
+      this.candidateMinClock(partition) = 
Math.min(this.candidateMinClock(partition), msg.timestamp)
+
+      incrementMessageCount(partition, 1)
+
+      if (messageCount(partition) % ackOnceEveryMessageCount == 0) {
+        sendAckRequest(partition)
+      }
+
+      if (messageCount(partition) / maxPendingMessageCount !=
+        (messageCount(partition) + ackOnceEveryMessageCount) / 
maxPendingMessageCount) {
+        sendLatencyProbe(partition)
+      }
+      count = 1
+      count
+    } else {
+      if (needFlush) {
+        flush()
+      }
+      count = 0
+      count
+    }
+  }
+
+  private var lastFlushTime: Long = 0L
+  private val FLUSH_INTERVAL = 5 * 1000 // ms
+  private def needFlush: Boolean = {
+    System.currentTimeMillis() - lastFlushTime > FLUSH_INTERVAL &&
+      Shorts.max(pendingMessageCount: _*) > 0
+  }
+
+  private def flush(): Unit = {
+    lastFlushTime = System.currentTimeMillis()
+    allTasks.foreach { targetTaskId =>
+      sendAckRequest(targetTaskId.index)
+    }
+  }
+
+  private def allTasks: scala.collection.Seq[TaskId] = {
+    (0 until parallelism).map { taskIndex =>
+      TaskId(processorId, taskIndex)
+    }
+  }
+
+  /**
+   * Handles acknowledge message. Throw MessageLossException if required.
+   *
+   * @param ack acknowledge message received
+   */
+  def receiveAck(ack: Ack): Unit = {
+
+    val index = ack.taskId.index
+
+    if (ack.sessionId == sessionId) {
+      if (ack.actualReceivedNum == ack.seq) {
+        if ((ack.seq - candidateMinClockSince(index)).toShort >= 0) {
+          if (ack.seq == messageCount(index)) {
+            // All messages have been acked.
+            minClockValue(index) = Long.MaxValue
+          } else {
+            minClockValue(index) = candidateMinClock(index)
+          }
+          candidateMinClock(index) = Long.MaxValue
+          candidateMinClockSince(index) = messageCount(index)
+        }
+
+        pendingMessageCount(ack.taskId.index) = 
(messageCount(ack.taskId.index) - ack.seq).toShort
+        updateMaxPendingCount()
+      } else {
+        LOG.error(s"Failed! received ack: $ack, received: 
${ack.actualReceivedNum}, " +
+          s"sent: ${ack.seq}, try to replay...")
+        throw new MsgLostException
+      }
+    }
+  }
+
+  def minClock: TimeStamp = {
+    minClockValue.min
+  }
+
+  def allowSendingMoreMessages(): Boolean = {
+    maxPendingCount < maxPendingMessageCount
+  }
+
+  def sendAckRequestOnStallingTime(stallingTime: TimeStamp): Unit = {
+    minClockValue.indices.foreach { i =>
+      if (minClockValue(i) == stallingTime && pendingMessageCount(i) > 0
+        && allowSendingMoreMessages) {
+        sendAckRequest(i)
+        sendLatencyProbe(i)
+      }
+    }
+  }
+
+  private def sendAckRequest(partition: Int): Unit = {
+    // Increments more count for each AckRequest
+    // to throttle the number of unacked AckRequest
+    incrementMessageCount(partition, ackOnceEveryMessageCount)
+    val targetTask = TaskId(processorId, partition)
+    val ackRequest = AckRequest(taskId, messageCount(partition), sessionId)
+    transport.transport(ackRequest, targetTask)
+  }
+
+  private def incrementMessageCount(partition: Int, count: Int): Unit = {
+    messageCount(partition) = (messageCount(partition) + count).toShort
+    pendingMessageCount(partition) = (pendingMessageCount(partition) + 
count).toShort
+    updateMaxPendingCount()
+  }
+
+  private def updateMaxPendingCount(): Unit = {
+    maxPendingCount = Shorts.max(pendingMessageCount: _*)
+  }
+
+  private def sendLatencyProbe(partition: Int): Unit = {
+    val probeLatency = LatencyProbe(System.currentTimeMillis())
+    val targetTask = TaskId(processorId, partition)
+    transport.transport(probeLatency, targetTask)
+  }
+}
+
+object Subscription {
+  // Makes sure it is smaller than MAX_PENDING_MESSAGE_COUNT
+  final val ONE_ACKREQUEST_EVERY_MESSAGE_COUNT = 100
+  final val MAX_PENDING_MESSAGE_COUNT = 1000
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
new file mode 100644
index 0000000..9c76a40
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.task
+
+import scala.concurrent.duration.FiniteDuration
+
+import akka.actor.Actor.Receive
+import akka.actor.{ActorRef, ActorSystem, Cancellable, Props}
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.util.LogUtil
+import org.apache.gearpump.{Message, TimeStamp}
+
+/**
+ * This provides context information for a task.
+ */
+trait TaskContext {
+
+  def taskId: TaskId
+
+  def executorId: Int
+
+  def appId: Int
+
+  def appName: String
+
+  /**
+   * The actorRef of AppMaster
+   * @return application master's actor reference
+   */
+  def appMaster: ActorRef
+
+  /**
+   * The task parallelism
+   *
+   * For example, we can create 3 source tasks, and 3 sink tasks,
+   * the task parallelism is 3 for each.
+   *
+   * This can be useful when reading from partitioned data source.
+   * For example, for kafka, there may be 10 partitions, if we have
+   * parallelism of 2 for this task, then each task will be responsible
+   * to read data from 5 partitions.
+   *
+   * @return  the parallelism level
+   */
+  def parallelism: Int
+
+  /**
+   * Please don't use this if possible.
+   * @return  self actor ref
+   */
+  // TODO: We should remove the self from TaskContext
+  def self: ActorRef
+
+  /**
+   * Please don't use this if possible
+   * @return the actor system
+   */
+  // TODO: we should remove this in future
+  def system: ActorSystem
+
+  /**
+   * This can be used to output messages to downstream tasks. The data 
shuffling rule
+   * can be decided by Partitioner.
+   *
+   * @param msg message to output
+   */
+  def output(msg: Message): Unit
+
+  def actorOf(props: Props): ActorRef
+
+  def actorOf(props: Props, name: String): ActorRef
+
+  def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: => 
Unit): Cancellable
+
+  /**
+   * akka.actor.ActorRefProvider.scheduleOnce
+   *
+   * @param initialDelay  the initial delay
+   * @param f  the function to execute after initial delay
+   * @return the executable
+   */
+  def scheduleOnce(initialDelay: FiniteDuration)(f: => Unit): Cancellable
+
+  /**
+   * For managed messages (type of Message), the sender only serves as a unique 
Id,
+   * Its address is not something meaningful; you should not use this directly
+   *
+   * For unmanaged message, the sender represent the sender ActorRef
+   * @return sender
+   */
+  def sender: ActorRef
+
+  /**
+   * Retrieves upstream min clock from TaskActor
+   *
+   * @return the min clock
+   */
+  def upstreamMinClock: TimeStamp
+
+  /**
+   * Logger is environment dependant, it should be provided by
+   * containing environment.
+   */
+  def logger: Logger
+}
+
+/**
+ * Streaming Task interface
+ */
+trait TaskInterface {
+
+  /**
+   * Method called when the task is initialized.
+   * @param startTime startTime that can be used to decide from when a source 
producer task should
+   *                  replay the data source, or from when a processor task 
should recover its
+   *                  checkpoint data in to in-memory state.
+   */
+  def onStart(startTime: StartTime): Unit
+
+  /**
+   * Method called for each message received.
+   *
+   * @param msg Message send by upstream tasks
+   */
+  def onNext(msg: Message): Unit
+
+  /**
+   * Method called when task is under clean up.
+   *
+   * This can be used to clean up resources when the application finishes.
+   */
+  def onStop(): Unit
+
+  /**
+   * Handles unmanaged messages
+   *
+   * @return the handler
+   */
+  def receiveUnManagedMessage: Receive = null
+}
+
+abstract class Task(taskContext: TaskContext, userConf: UserConfig) extends 
TaskInterface {
+
+  import taskContext.{appId, executorId, taskId}
+
+  val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = 
executorId, task = taskId)
+
+  protected implicit val system = taskContext.system
+
+  implicit val self = taskContext.self
+
+  /**
+   * For managed messages (type of Message), the sender means nothing;
+   * you should not use this directly
+   *
+   * For unmanaged message, the sender represent the sender actor
+   * @return the sender
+   */
+  protected def sender: ActorRef = taskContext.sender
+
+  def onStart(startTime: StartTime): Unit = {}
+
+  def onNext(msg: Message): Unit = {}
+
+  def onStop(): Unit = {}
+
+  override def receiveUnManagedMessage: Receive = {
+    case msg =>
+      LOG.error("Failed! Received unknown message " + "taskId: " + taskId + ", 
" + msg.toString)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
new file mode 100644
index 0000000..14742d0
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.task
+
+import java.util
+import java.util.concurrent.TimeUnit
+
+import akka.actor._
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.UserConfig
+import io.gearpump.gs.collections.impl.map.mutable.primitive.IntShortHashMap
+import org.apache.gearpump.metrics.Metrics
+import org.apache.gearpump.serializer.SerializationFramework
+import org.apache.gearpump.streaming.AppMasterToExecutor._
+import org.apache.gearpump.streaming.ExecutorToAppMaster._
+import org.apache.gearpump.streaming.{Constants, ProcessorId}
+import org.apache.gearpump.util.{LogUtil, TimeOutScheduler}
+import org.apache.gearpump.{Message, TimeStamp}
+
+/**
+ *
+ * All tasks of Gearpump runs inside a Actor. TaskActor is the Actor container 
for a task.
+ */
+class TaskActor(
+    val taskId: TaskId,
+    val taskContextData: TaskContextData,
+    userConf: UserConfig,
+    val task: TaskWrapper,
+    inputSerializerPool: SerializationFramework)
+    extends Actor with ExpressTransport with TimeOutScheduler {
+  var upstreamMinClock: TimeStamp = 0L
+  private var _minClock: TimeStamp = 0L
+
+  def serializerPool: SerializationFramework = inputSerializerPool
+
+  import taskContextData._
+
+  import org.apache.gearpump.streaming.Constants._
+  import org.apache.gearpump.streaming.task.TaskActor._
+  val config = context.system.settings.config
+
+  val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = 
executorId, task = taskId)
+
+  // Metrics
+  private val metricName = 
s"app${appId}.processor${taskId.processorId}.task${taskId.index}"
+  private val receiveLatency = Metrics(context.system).histogram(
+    s"$metricName:receiveLatency", sampleRate = 1)
+  private val processTime = 
Metrics(context.system).histogram(s"$metricName:processTime")
+  private val sendThroughput = 
Metrics(context.system).meter(s"$metricName:sendThroughput")
+  private val receiveThroughput = 
Metrics(context.system).meter(s"$metricName:receiveThroughput")
+
+  private val maxPendingMessageCount = 
config.getInt(GEARPUMP_STREAMING_MAX_PENDING_MESSAGE_COUNT)
+  private val ackOnceEveryMessageCount = config.getInt(
+    GEARPUMP_STREAMING_ACK_ONCE_EVERY_MESSAGE_COUNT)
+
+  private val executor = context.parent
+  private var life = taskContextData.life
+
+  // Latency probe
+  import scala.concurrent.duration._
+
+  import context.dispatcher
+  final val LATENCY_PROBE_INTERVAL = FiniteDuration(1, TimeUnit.SECONDS)
+
+  // Clock report interval
+  final val CLOCK_REPORT_INTERVAL = FiniteDuration(1, TimeUnit.SECONDS)
+
+  // Flush interval
+  final val FLUSH_INTERVAL = FiniteDuration(100, TimeUnit.MILLISECONDS)
+
+  private val queue = new util.LinkedList[AnyRef]()
+
+  private var subscriptions = List.empty[(Int, Subscription)]
+
+  // SecurityChecker will be responsible for dropping messages from
+  // unknown sources
+  private val securityChecker = new SecurityChecker(taskId, self)
+  private[task] var sessionId = NONE_SESSION
+
+  // Reports to appMaster with my address
+  express.registerLocalActor(TaskId.toLong(taskId), self)
+
+  final def receive: Receive = null
+
+  task.setTaskActor(this)
+
+  def onStart(startTime: StartTime): Unit = {
+    task.onStart(startTime)
+  }
+
+  def onNext(msg: Message): Unit = task.onNext(msg)
+
+  def onUnManagedMessage(msg: Any): Unit = 
task.receiveUnManagedMessage.apply(msg)
+
+  def onStop(): Unit = task.onStop()
+
+  /**
+   * Output to a downstream by specifying an arrayIndex
+   * @param arrayIndex this is not the same as ProcessorId
+   */
+  def output(arrayIndex: Int, msg: Message): Unit = {
+    var count = 0
+    count += this.subscriptions(arrayIndex)._2.sendMessage(msg)
+    sendThroughput.mark(count)
+  }
+
+  def output(msg: Message): Unit = {
+    var count = 0
+    this.subscriptions.foreach { subscription =>
+      count += subscription._2.sendMessage(msg)
+    }
+    sendThroughput.mark(count)
+  }
+
+  final override def postStop(): Unit = {
+    onStop()
+  }
+
+  final override def preStart(): Unit = {
+    val register = RegisterTask(taskId, executorId, local)
+    LOG.info(s"$register")
+    executor ! register
+    context.become(waitForTaskRegistered)
+  }
+
+  private def allowSendingMoreMessages(): Boolean = {
+    subscriptions.forall(_._2.allowSendingMoreMessages())
+  }
+
+  private def doHandleMessage(): Unit = {
+    var done = false
+
+    var count = 0
+    val start = System.currentTimeMillis()
+
+    while (allowSendingMoreMessages() && !done) {
+      val msg = queue.poll()
+      if (msg != null) {
+        msg match {
+          case SendAck(ack, targetTask) =>
+            transport(ack, targetTask)
+          case m: Message =>
+            count += 1
+            onNext(m)
+          case other =>
+            // un-managed message
+            onUnManagedMessage(other)
+        }
+      } else {
+        done = true
+      }
+    }
+
+    receiveThroughput.mark(count)
+    if (count > 0) {
+      processTime.update((System.currentTimeMillis() - start) / count)
+    }
+  }
+
+  private def onStartClock(): Unit = {
+    LOG.info(s"received start, clock: $upstreamMinClock, sessionId: 
$sessionId")
+    subscriptions = subscribers.map { subscriber =>
+      (subscriber.processorId,
+        new Subscription(appId, executorId, taskId, subscriber, sessionId, 
this,
+          maxPendingMessageCount, ackOnceEveryMessageCount))
+    }.sortBy(_._1)
+
+    subscriptions.foreach(_._2.start())
+
+    import scala.collection.JavaConverters._
+    stashQueue.asScala.foreach { item =>
+      handleMessages(item.sender).apply(item.msg)
+    }
+    stashQueue.clear()
+
+    // Put this as the last step so that the subscription is already 
initialized.
+    // Message sending in current Task before onStart will not be delivered to
+    // target
+    onStart(new StartTime(upstreamMinClock))
+
+    appMaster ! GetUpstreamMinClock(taskId)
+    context.become(handleMessages(sender))
+  }
+
+  def waitForTaskRegistered: Receive = {
+    case start@TaskRegistered(_, sessionId, startClock) =>
+      this.sessionId = sessionId
+      this.upstreamMinClock = startClock
+      context.become(waitForStartClock)
+  }
+
+  private val stashQueue = new util.LinkedList[MessageAndSender]()
+
+  def waitForStartClock: Receive = {
+    case start: StartTask =>
+      onStartClock()
+    case other: AnyRef =>
+      stashQueue.add(MessageAndSender(other, sender()))
+  }
+
  /**
   * Main running behavior: flow-control handshakes (ack requests/acks),
   * data messages, clock updates, dynamic DAG changes, and latency probes.
   * `sender` is by-name so each case reads the current actor sender.
   */
  def handleMessages(sender: => ActorRef): Receive = {
    case ackRequest: InitialAckRequest =>
      // null response means the request was rejected (uninitialized session).
      val ackResponse = securityChecker.handleInitialAckRequest(ackRequest)
      if (null != ackResponse) {
        queue.add(SendAck(ackResponse, ackRequest.taskId))
        doHandleMessage()
      }
    case ackRequest: AckRequest =>
      // Enqueue to handle the ackRequest and send back ack later
      val ackResponse = securityChecker.generateAckResponse(ackRequest, sender,
        ackOnceEveryMessageCount)
      if (null != ackResponse) {
        queue.add(SendAck(ackResponse, ackRequest.taskId))
        doHandleMessage()
      }
    case ack: Ack =>
      // Route the ack to the subscription for the acking processor; acks may
      // free send quota, so try draining the queue afterwards.
      subscriptions.find(_._1 == ack.taskId.processorId).foreach(_._2.receiveAck(ack))
      doHandleMessage()
    case inputMessage: SerializedMessage =>
      val message = Message(serializerPool.get().deserialize(inputMessage.bytes),
        inputMessage.timeStamp)
      receiveMessage(message, sender)
    case inputMessage: Message =>
      receiveMessage(inputMessage, sender)
    case upstream@UpstreamMinClock(upstreamClock) =>
      this.upstreamMinClock = upstreamClock

      // Long.MaxValue when there are no subscriptions (sink task).
      val subMinClock = subscriptions.foldLeft(Long.MaxValue) { (min, sub) =>
        val subMin = sub._2.minClock
        // A subscription is holding back the _minClock;
        // we send AckRequest to its tasks to push _minClock forward
        if (subMin == _minClock) {
          sub._2.sendAckRequestOnStallingTime(_minClock)
        }
        Math.min(min, subMin)
      }

      _minClock = Math.max(life.birth, Math.min(upstreamMinClock, subMinClock))

      // Report after a delay, which also paces the request/report loop.
      val update = UpdateClock(taskId, _minClock)
      context.system.scheduler.scheduleOnce(CLOCK_REPORT_INTERVAL) {
        appMaster ! update
      }

      // Checks whether current task is dead.
      if (_minClock > life.death) {
        // There will be no more message received...
        val unRegister = UnRegisterTask(taskId, executorId)
        executor ! unRegister

        LOG.info(s"Sending $unRegister, current minclock: ${_minClock}, life: $life")
      }

    case ChangeTask(_, dagVersion, life, subscribers) =>
      this.life = life
      subscribers.foreach { subscriber =>
        val processorId = subscriber.processorId
        val subscription = getSubscription(processorId)
        subscription match {
          case Some(subscription) =>
            // Existing subscription: only its life time changes.
            subscription.changeLife(subscriber.lifeTime cross this.life)
          case None =>
            // New downstream processor: create and start a subscription.
            val subscription = new Subscription(appId, executorId, taskId, subscriber,
              sessionId, this, maxPendingMessageCount, ackOnceEveryMessageCount)
            subscription.start()
            subscriptions :+=(subscriber.processorId, subscription)
            // Sorting, keep the order
            subscriptions = subscriptions.sortBy(_._1)
        }
      }
      sender ! TaskChanged(taskId, dagVersion)
    case LatencyProbe(timeStamp) =>
      // Probe carries the send time; the difference is the observed latency.
      receiveLatency.update(System.currentTimeMillis() - timeStamp)
    case send: SendMessageLoss =>
      LOG.info("received SendMessageLoss")
      throw new MsgLostException
    case other: AnyRef =>
      // Anything unrecognized is queued and handled as an un-managed message.
      queue.add(other)
      doHandleMessage()
  }
+
  /**
   * Returns min clock of this task: the last value computed in the
   * UpstreamMinClock handler from upstream and subscription min clocks,
   * clamped below by `life.birth`.
   */
  def minClock: TimeStamp = _minClock

  /**
   * Returns min clock of upstream task
   */
  def getUpstreamMinClock: TimeStamp = upstreamMinClock
+
+  private def receiveMessage(msg: Message, sender: ActorRef): Unit = {
+    val messageAfterCheck = securityChecker.checkMessage(msg, sender)
+    messageAfterCheck match {
+      case Some(msg) =>
+        queue.add(msg)
+        doHandleMessage()
+      case None =>
+      // TODO: Indicate the error and avoid the LOG flood
+      // LOG.error(s"Task $taskId drop message $msg")
+    }
+  }
+
+  private def getSubscription(processorId: ProcessorId): Option[Subscription] 
= {
+    subscriptions.find(_._1 == processorId).map(_._2)
+  }
+}
+
object TaskActor {
  // 3 seconds
  val CLOCK_SYNC_TIMEOUT_INTERVAL = 3 * 1000

  // If the message comes from an unknown source, securityChecker will drop it
  class SecurityChecker(task_id: TaskId, self: ActorRef) {

    private val LOG: Logger = LogUtil.getLogger(getClass, task = task_id)

    // Uses mutable HashMap for performance optimization
    // Maps sessionId -> number of messages received on that session.
    private val receivedMsgCount = new IntShortHashMap()

    // Tricky performance optimization to save memory.
    // We store the session Id in the uid of ActorPath
    // ActorPath.hashCode is same as uid.
    private def getSessionId(actor: ActorRef): Int = {
      // TODO: As method uid is protected in [akka] package. We
      // are using hashCode instead of uid.
      actor.hashCode()
    }

    /**
     * Registers a new session and returns the initial Ack (seq 0, count 0),
     * or null if the request's sessionId is uninitialized (NONE_SESSION).
     * Callers must check for null.
     */
    def handleInitialAckRequest(ackRequest: InitialAckRequest): Ack = {
      LOG.debug(s"Handle InitialAckRequest for session $ackRequest")
      val sessionId = ackRequest.sessionId
      if (sessionId == NONE_SESSION) {
        LOG.error(s"SessionId is not initialized, ackRequest: $ackRequest")
        null
      } else {
        receivedMsgCount.put(sessionId, 0)
        Ack(task_id, 0, 0, sessionId)
      }
    }

    /**
     * Builds the Ack for a known session, bumping its received count by
     * `incrementCount` (count arithmetic wraps at Short range via .toShort).
     * Returns null for an unknown session; callers must check for null.
     */
    def generateAckResponse(ackRequest: AckRequest, sender: ActorRef, incrementCount: Int): Ack = {
      val sessionId = ackRequest.sessionId
      if (receivedMsgCount.containsKey(sessionId)) {
        // Increments more count for each AckRequest
        // to throttle the number of unacked AckRequest
        receivedMsgCount.put(sessionId, (receivedMsgCount.get(sessionId) + incrementCount).toShort)
        Ack(task_id, ackRequest.seq, receivedMsgCount.get(sessionId), ackRequest.sessionId)
      } else {
        LOG.error(s"get unknown AckRequest $ackRequest from ${sender.toString()}")
        null
      }
    }

    // If the message comes from an unknown source, then drop it
    // Messages from self always pass; others pass only after their session
    // was registered by an InitialAckRequest.
    def checkMessage(message: Message, sender: ActorRef): Option[Message] = {
      if (sender.equals(self)) {
        Some(message)
      } else {
        val sessionId = getSessionId(sender)
        if (receivedMsgCount.containsKey(sessionId)) {
          receivedMsgCount.put(sessionId, (receivedMsgCount.get(sessionId) + 1).toShort)
          Some(message)
        } else {
          // This is an illegal message,
          LOG.debug(s"received message before receive the first AckRequest, session $sessionId")
          None
        }
      }
    }
  }

  // Internal queue entry: an Ack to be transported to `targetTask`.
  case class SendAck(ack: Ack, targetTask: TaskId)

  case object FLUSH

  // Sentinel for an uninitialized session id.
  val NONE_SESSION = -1

  // A stashed message together with the sender it arrived from.
  case class MessageAndSender(msg: AnyRef, sender: ActorRef)
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskContextData.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskContextData.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskContextData.scala
new file mode 100644
index 0000000..0d12bc9
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskContextData.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.task
+
+import akka.actor.ActorRef
+
+import org.apache.gearpump.streaming.LifeTime
+
/**
 * Immutable runtime data for a task: the hosting executor and application
 * ids/name, the AppMaster reference, task parallelism, the task's life time,
 * and the downstream subscribers the task outputs to.
 */
case class TaskContextData(
    executorId: Int,
    appId: Int,
    appName: String,
    appMaster: ActorRef,
    parallelism: Int,
    life: LifeTime,
    subscribers: List[Subscriber])

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
new file mode 100644
index 0000000..e537f99
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.task
+
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.streaming.ProcessorId
+
/*
 * Initial AckRequest: the first handshake message of a session, sent before
 * any data so the receiving task can register the session.
 */
case class InitialAckRequest(taskId: TaskId, sessionId: Int)

/*
  Here the sessionId field is used to distinguish messages
    between different replays after the application restart
 */
case class AckRequest(taskId: TaskId, seq: Short, sessionId: Int)

/**
 * Ack back to sender task actor.
 *
 * @param seq The seq field represents the expected number of received
 *            messages and the actualReceivedNum field means the actual
 *            received number since start.
 */
case class Ack(taskId: TaskId, seq: Short, actualReceivedNum: Short, sessionId: Int)

/** Marker for clock-service related messages. */
sealed trait ClockEvent

/** Reports the current min clock of `taskId` (sent by TaskActor to the AppMaster). */
case class UpdateClock(taskId: TaskId, time: TimeStamp) extends ClockEvent

case object GetLatestMinClock extends ClockEvent

/** Asks for the min clock across tasks upstream of `taskId`. */
case class GetUpstreamMinClock(taskId: TaskId) extends ClockEvent

/** Reply to GetUpstreamMinClock carrying the upstream min clock. */
case class UpstreamMinClock(latestMinClock: TimeStamp)

/** Reply to GetLatestMinClock. */
case class LatestMinClock(clock: TimeStamp)

// NOTE(review): the checkpoint/start-clock messages below appear to belong to
// the recovery protocol — confirm semantics against the clock service callers.
case class ReportCheckpointClock(taskId: TaskId, clock: TimeStamp)

case object GetCheckpointClock

case class CheckpointClock(clock: Option[TimeStamp])

case object GetStartClock

case class StartClock(clock: TimeStamp)

/** Probe the latency between two upstream to downstream tasks. */
case class LatencyProbe(timestamp: Long)

/** Instructs a task to simulate message loss (TaskActor throws MsgLostException on it). */
case class SendMessageLoss()

case object GetDAG

case class CheckProcessorDeath(processorId: ProcessorId)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskId.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskId.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskId.scala
new file mode 100644
index 0000000..b44293c
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskId.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.task
+
+import org.apache.gearpump.streaming._
+
/** Globally unique task identifier: the owning processor plus the task's index within it. */
case class TaskId(processorId: ProcessorId, index: TaskIndex)

object TaskId {
  /** Packs a TaskId into one Long: processorId in the high 32 bits, index in the low 32. */
  def toLong(id: TaskId): Long = (id.processorId.toLong << 32) + id.index

  /**
   * Inverse of [[toLong]].
   *
   * The original masked with `0xFFFFFFFF`, which in Scala is the Int literal -1
   * and widens to -1L — a no-op mask. The actual bit extraction was done by
   * `.toInt` truncation, which is all that is needed.
   */
  def fromLong(id: Long): TaskId = TaskId((id >> 32).toInt, id.toInt)
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskMessageSerializer.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskMessageSerializer.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskMessageSerializer.scala
new file mode 100644
index 0000000..591dd46
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskMessageSerializer.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.task
+
+import java.io.{DataInput, DataOutput}
+
/**
 * Binary codec for task messages written to / read from Java data streams.
 *
 * Implementations must keep `write` and `read` symmetric, and `getLength`
 * must return the exact number of bytes `write` produces for `obj`.
 */
trait TaskMessageSerializer[T] {
  /** Writes `obj` to `dataOutput`. (Return type made explicit: Unit.) */
  def write(dataOutput: DataOutput, obj: T): Unit

  /** Reads one `T` back from `dataInput`. */
  def read(dataInput: DataInput): T

  /** Number of bytes `write` will produce for `obj`. */
  def getLength(obj: T): Int
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala
new file mode 100644
index 0000000..7459c64
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.task
+
object TaskUtil {

  /**
   * Resolves a class name to a Task class.
   *
   * Uses the current thread's context class loader, so it also works when the
   * task class ships in a dynamically loaded application jar.
   *
   * @param className  the class name to resolve
   * @return resolved class
   */
  def loadClass(className: String): Class[_ <: Task] = {
    Thread.currentThread()
      .getContextClassLoader
      .loadClass(className)
      .asSubclass(classOf[Task])
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
new file mode 100644
index 0000000..cd33f7e
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.task
+
+import scala.concurrent.duration.FiniteDuration
+
+import akka.actor.Actor._
+import akka.actor.{ActorRef, ActorSystem, Cancellable, Props}
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.util.LogUtil
+import org.apache.gearpump.{Message, TimeStamp}
+
+/**
+ * This provides TaskContext for user defined tasks
+ *
+ * @param taskClass task class
+ * @param context context class
+ * @param userConf user config
+ */
/**
 * This provides TaskContext for user defined tasks.
 *
 * The wrapped user task is instantiated reflectively in [[onStart]] and all
 * context operations are delegated to the hosting [[TaskActor]], which must be
 * injected via [[setTaskActor]] before any delegating method is used.
 *
 * @param taskId task id
 * @param taskClass user task class, instantiated via its
 *                  (TaskContext, UserConfig) constructor
 * @param context per-task runtime data
 * @param userConf user config passed to the task constructor
 */
class TaskWrapper(
    val taskId: TaskId, taskClass: Class[_ <: Task], context: TaskContextData,
    userConf: UserConfig) extends TaskContext with TaskInterface {

  private val LOG = LogUtil.getLogger(taskClass, task = taskId)

  // Injected by the hosting actor after construction; null until setTaskActor.
  private var actor: TaskActor = null

  // The user task instance; None until onStart creates it, None again after onStop.
  private var task: Option[Task] = None

  def setTaskActor(actor: TaskActor): Unit = this.actor = actor

  override def appId: Int = context.appId

  override def appName: String = context.appName

  override def executorId: Int = context.executorId

  override def parallelism: Int = context.parallelism

  override def appMaster: ActorRef = context.appMaster

  override def output(msg: Message): Unit = actor.output(msg)

  /**
   * See [[org.apache.gearpump.streaming.task.TaskActor]]
   *   output(arrayIndex: Int, msg: Message): Unit
   *
   * @param index array index of the target downstream, not same as ProcessorId
   */
  def output(index: Int, msg: Message): Unit = actor.output(index, msg)

  /**
   * Use with caution, output unmanaged message to target tasks
   *
   * @param msg  message to output
   * @param tasks  the tasks to output to
   */
  def outputUnManaged(msg: AnyRef, tasks: TaskId*): Unit = {
    actor.transport(msg, tasks: _*)
  }

  override def self: ActorRef = actor.context.self

  override def sender: ActorRef = actor.context.sender()

  def system: ActorSystem = actor.context.system

  override def actorOf(props: Props): ActorRef = actor.context.actorOf(props)

  override def actorOf(props: Props, name: String): ActorRef = actor.context.actorOf(props, name)

  /**
   * Instantiates the user task (taskClass's (TaskContext, UserConfig)
   * constructor) and forwards onStart to it.
   */
  override def onStart(startTime: StartTime): Unit = {
    // Fix: original tested `None != task` and logged `${task.getClass}`,
    // which prints Some's class rather than the user task class.
    if (task.isDefined) {
      LOG.error(s"Task.onStart should not be called multiple times... $taskClass")
    }
    val constructor = taskClass.getConstructor(classOf[TaskContext], classOf[UserConfig])
    task = Some(constructor.newInstance(this, userConf))
    task.foreach(_.onStart(startTime))
  }

  override def onNext(msg: Message): Unit = task.foreach(_.onNext(msg))

  /** Stops the user task and releases the reference so it can be collected. */
  override def onStop(): Unit = {
    task.foreach(_.onStop())
    task = None
  }

  /** User task's unmanaged-message handler, or a logging fallback before onStart. */
  override def receiveUnManagedMessage: Receive = {
    task.map(_.receiveUnManagedMessage).getOrElse(defaultMessageHandler)
  }

  override def upstreamMinClock: TimeStamp = {
    actor.getUpstreamMinClock
  }

  /** Schedules `f` repeatedly on the actor system's dispatcher. */
  def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: => Unit): Cancellable = {
    val dispatcher = actor.context.system.dispatcher
    actor.context.system.scheduler.schedule(initialDelay, interval)(f)(dispatcher)
  }

  /** Schedules `f` once on the actor system's dispatcher. */
  def scheduleOnce(initialDelay: FiniteDuration)(f: => Unit): Cancellable = {
    val dispatcher = actor.context.system.dispatcher
    actor.context.system.scheduler.scheduleOnce(initialDelay)(f)(dispatcher)
  }

  private def defaultMessageHandler: Receive = {
    case msg =>
      LOG.error("Failed! Received unknown message " + "taskId: " + taskId + ", " + msg.toString)
  }

  /**
   * Logger is environment dependant, it should be provided by
   * containing environment.
   */
  override def logger: Logger = LOG
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala
new file mode 100644
index 0000000..2ef8610
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.transaction.api
+
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
/**
 * CheckpointStore persistently stores a mapping of timestamp to checkpoint.
 * It's possible that two checkpoints have the same timestamp;
 * CheckpointStore needs to handle this either during write or read.
 */
trait CheckpointStore {

  /** Persists the opaque `checkpoint` bytes under `timeStamp`. */
  def persist(timeStamp: TimeStamp, checkpoint: Array[Byte]): Unit

  /** Recovers the checkpoint stored for `timestamp`, or None if absent. */
  def recover(timestamp: TimeStamp): Option[Array[Byte]]

  /** Releases any resources held by the store. */
  def close(): Unit
}

/**
 * Serializable factory that creates a CheckpointStore from user config and
 * the task's context.
 */
trait CheckpointStoreFactory extends java.io.Serializable {
  def getCheckpointStore(conf: UserConfig, taskContext: TaskContext): CheckpointStore
}

+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/MessageDecoder.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/MessageDecoder.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/MessageDecoder.scala
new file mode 100644
index 0000000..0615078
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/MessageDecoder.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.transaction.api
+
+import org.apache.gearpump.Message
+
/**
 * MessageDecoder decodes raw bytes to a Message.
 * It is usually written by the end user and passed into TimeReplayableSource.
 */
trait MessageDecoder extends java.io.Serializable {
  def fromBytes(bytes: Array[Byte]): Message
}


Reply via email to