http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceTask.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceTask.scala 
b/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceTask.scala
deleted file mode 100644
index eab74aa..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/source/DataSourceTask.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.source
-
-import io.gearpump._
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
-
-object DataSourceTask {
-  val DATA_SOURCE = "data_source"
-}
-
-/**
- * Default Task container for [[io.gearpump.streaming.source.DataSource]] that
- * reads from DataSource in batch
- * See [[io.gearpump.streaming.source.DataSourceProcessor]] for its usage
- *
- * DataSourceTask calls:
- *  - `DataSource.open()` in `onStart` and pass in 
[[io.gearpump.streaming.task.TaskContext]]
- * and application start time
- *  - `DataSource.read()` in each `onNext`, which reads a batch of messages
- *  - `DataSource.close()` in `onStop`
- */
-class DataSourceTask(context: TaskContext, conf: UserConfig) extends 
Task(context, conf) {
-  import io.gearpump.streaming.source.DataSourceTask._
-
-  private val source = conf.getValue[DataSource](DATA_SOURCE).get
-  private val batchSize = 
conf.getInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE).getOrElse(1000)
-  private var startTime = 0L
-
-  override def onStart(newStartTime: StartTime): Unit = {
-    startTime = newStartTime.startTime
-    LOG.info(s"opening data source at $startTime")
-    source.open(context, startTime)
-    self ! Message("start", System.currentTimeMillis())
-  }
-
-  override def onNext(message: Message): Unit = {
-    0.until(batchSize).foreach { _ =>
-      Option(source.read()).foreach(context.output)
-    }
-    self ! Message("continue", System.currentTimeMillis())
-  }
-
-  override def onStop(): Unit = {
-    LOG.info("closing data source...")
-    source.close()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/source/DefaultTimeStampFilter.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/source/DefaultTimeStampFilter.scala
 
b/streaming/src/main/scala/io/gearpump/streaming/source/DefaultTimeStampFilter.scala
deleted file mode 100644
index b7d4e90..0000000
--- 
a/streaming/src/main/scala/io/gearpump/streaming/source/DefaultTimeStampFilter.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.source
-
-import io.gearpump.streaming.transaction.api.TimeStampFilter
-import io.gearpump.{Message, TimeStamp}
-
-/**
- * TimeStampFilter filters out messages which have obsolete (smaller) 
timestamp.
- */
-class DefaultTimeStampFilter extends TimeStampFilter {
-  override def filter(msg: Message, predicate: TimeStamp): Option[Message] = {
-    Option(msg).find(_.timestamp >= predicate)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/state/api/Monoid.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/state/api/Monoid.scala 
b/streaming/src/main/scala/io/gearpump/streaming/state/api/Monoid.scala
deleted file mode 100644
index dfe3e93..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/state/api/Monoid.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.state.api
-
-trait Monoid[T] extends java.io.Serializable {
-  def plus(l: T, r: T): T
-  def zero: T
-}
-
-trait Group[T] extends Monoid[T] {
-  def minus(l: T, r: T): T
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/state/api/MonoidState.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/state/api/MonoidState.scala 
b/streaming/src/main/scala/io/gearpump/streaming/state/api/MonoidState.scala
deleted file mode 100644
index 238eab4..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/state/api/MonoidState.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.state.api
-
-import io.gearpump.TimeStamp
-
-/**
- * MonoidState uses Algebird Monoid to aggregate state
- *
- * on start, state value is initialized to monoid.zero
- * on each new message, existing state value is aggregated with
- * the incoming value using monoid.plus to get a new state value
- */
-abstract class MonoidState[T](monoid: Monoid[T]) extends PersistentState[T] {
-  // Left state updated by messages before checkpoint time
-  private[state] var left: T = monoid.zero
-  // Right state updated by message after checkpoint time
-  private[state] var right: T = monoid.zero
-
-  protected var checkpointTime = Long.MaxValue
-
-  override def get: Option[T] = Option(monoid.plus(left, right))
-
-  override def setNextCheckpointTime(nextCheckpointTime: TimeStamp): Unit = {
-    checkpointTime = nextCheckpointTime
-  }
-
-  protected def updateState(timestamp: TimeStamp, t: T): Unit = {
-    if (timestamp < checkpointTime) {
-      left = monoid.plus(left, t)
-    } else {
-      right = monoid.plus(right, t)
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentState.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentState.scala
 
b/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentState.scala
deleted file mode 100644
index f1b923a..0000000
--- 
a/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentState.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.state.api
-
-import io.gearpump._
-
-/**
- * PersistentState is part of the transaction API
- *
- * Users could get transaction support from the framework by
- * conforming to PersistentState APIs and extending PersistentTask
- * to manage the state
- */
-trait PersistentState[T] {
-
-  /**
-   * Recovers state to a previous checkpoint
-   * usually invoked by the framework
-   */
-  def recover(timestamp: TimeStamp, bytes: Array[Byte]): Unit
-
-  /**
-   * Updates state on a new message
-   * this is invoked by user
-   */
-  def update(timestamp: TimeStamp, t: T): Unit
-
-  /**
-   * Sets next checkpoint time
-   * should be invoked by the framework
-   */
-  def setNextCheckpointTime(timeStamp: TimeStamp): Unit
-
-  /**
-   * Gets a binary snapshot of state
-   * usually invoked by the framework
-   */
-  def checkpoint(): Array[Byte]
-
-  /**
-   * Unwraps the raw value of state
-   */
-  def get: Option[T]
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentTask.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentTask.scala 
b/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentTask.scala
deleted file mode 100644
index a8a5d5d..0000000
--- 
a/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentTask.scala
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.state.api
-
-import java.util.concurrent.TimeUnit
-import scala.concurrent.duration.FiniteDuration
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.state.impl.{CheckpointManager, 
PersistentStateConfig}
-import io.gearpump.streaming.task.{ReportCheckpointClock, StartTime, Task, 
TaskContext}
-import io.gearpump.streaming.transaction.api.CheckpointStoreFactory
-import io.gearpump.util.LogUtil
-import io.gearpump.{Message, TimeStamp}
-
-object PersistentTask {
-  val CHECKPOINT = Message("checkpoint")
-  val LOG = LogUtil.getLogger(getClass)
-}
-
-/**
- * PersistentTask is part of the transaction API
- *
- * Users should extend this task if they want to get transaction support
- * from the framework
- */
-abstract class PersistentTask[T](taskContext: TaskContext, conf: UserConfig)
-  extends Task(taskContext, conf) {
-  import taskContext._
-
-  import io.gearpump.streaming.state.api.PersistentTask._
-
-  val checkpointStoreFactory = conf.getValue[CheckpointStoreFactory](
-    PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY).get
-  val checkpointStore = checkpointStoreFactory.getCheckpointStore(conf, 
taskContext)
-  val checkpointInterval = 
conf.getLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS).get
-  val checkpointManager = new CheckpointManager(checkpointInterval, 
checkpointStore)
-  // System time interval to attempt checkpoint
-  private val checkpointAttemptInterval = 1000L
-
-  /**
-   * Subclass should override this method to pass in a PersistentState. the 
framework has already
-   * offered two states:
-   *  - NonWindowState: state with no time or other boundary
-   *  - WindowState:  each state is bounded by a time window
-   */
-  def persistentState: PersistentState[T]
-
-  /**
-   * Subclass should override this method to specify how a new message should 
update state
-   */
-  def processMessage(state: PersistentState[T], message: Message): Unit
-
-  /** Persistent state that will be stored (by checkpointing) automatically to 
storage like HDFS */
-  val state = persistentState
-
-  final override def onStart(startTime: StartTime): Unit = {
-    val timestamp = startTime.startTime
-    checkpointManager
-      .recover(timestamp)
-      .foreach(state.recover(timestamp, _))
-
-    reportCheckpointClock(timestamp)
-    scheduleCheckpoint(checkpointAttemptInterval)
-  }
-
-  final override def onNext(message: Message): Unit = {
-    message match {
-      case CHECKPOINT =>
-        val upstreamMinClock = taskContext.upstreamMinClock
-        if (checkpointManager.shouldCheckpoint(upstreamMinClock)) {
-          checkpointManager.getCheckpointTime.foreach { checkpointTime =>
-            val serialized = state.checkpoint()
-            checkpointManager.checkpoint(checkpointTime, serialized)
-              .foreach(state.setNextCheckpointTime)
-            taskContext.output(Message(serialized, checkpointTime))
-            reportCheckpointClock(checkpointTime)
-          }
-        }
-        scheduleCheckpoint(checkpointAttemptInterval)
-      case _ =>
-        checkpointManager.update(message.timestamp)
-          .foreach(state.setNextCheckpointTime)
-        processMessage(state, message)
-    }
-  }
-
-  final override def onStop(): Unit = {
-    checkpointManager.close()
-  }
-
-  private def scheduleCheckpoint(interval: Long): Unit = {
-    scheduleOnce(new FiniteDuration(interval, TimeUnit.MILLISECONDS))(self ! 
CHECKPOINT)
-  }
-
-  private def reportCheckpointClock(timestamp: TimeStamp): Unit = {
-    appMaster ! ReportCheckpointClock(taskContext.taskId, timestamp)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/state/api/Serializer.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/state/api/Serializer.scala 
b/streaming/src/main/scala/io/gearpump/streaming/state/api/Serializer.scala
deleted file mode 100644
index f87b224..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/state/api/Serializer.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.state.api
-
-import scala.util.Try
-
-trait Serializer[T] extends java.io.Serializable {
-  def serialize(t: T): Array[Byte]
-  def deserialize(bytes: Array[Byte]): Try[T]
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/state/impl/CheckpointManager.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/state/impl/CheckpointManager.scala
 
b/streaming/src/main/scala/io/gearpump/streaming/state/impl/CheckpointManager.scala
deleted file mode 100644
index 76c91c7..0000000
--- 
a/streaming/src/main/scala/io/gearpump/streaming/state/impl/CheckpointManager.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.state.impl
-
-import io.gearpump.TimeStamp
-import io.gearpump.streaming.transaction.api.CheckpointStore
-
-/** Manage physical checkpoints to persitent storage like HDFS */
-class CheckpointManager(checkpointInterval: Long,
-    checkpointStore: CheckpointStore) {
-
-  private var maxMessageTime: Long = 0L
-  private var checkpointTime: Option[Long] = None
-
-  def recover(timestamp: TimeStamp): Option[Array[Byte]] = {
-    checkpointStore.recover(timestamp)
-  }
-
-  def checkpoint(timestamp: TimeStamp, checkpoint: Array[Byte]): 
Option[TimeStamp] = {
-    checkpointStore.persist(timestamp, checkpoint)
-    checkpointTime = checkpointTime.collect { case time if maxMessageTime > 
time =>
-      time + (1 + (maxMessageTime - time) / checkpointInterval) * 
checkpointInterval
-    }
-
-    checkpointTime
-  }
-
-  def update(messageTime: TimeStamp): Option[TimeStamp] = {
-    maxMessageTime = Math.max(maxMessageTime, messageTime)
-    if (checkpointTime.isEmpty) {
-      checkpointTime = Some((1 + messageTime / checkpointInterval) * 
checkpointInterval)
-    }
-
-    checkpointTime
-  }
-
-  def shouldCheckpoint(upstreamMinClock: TimeStamp): Boolean = {
-    checkpointTime.exists(time => upstreamMinClock >= time)
-  }
-
-  def getCheckpointTime: Option[TimeStamp] = checkpointTime
-
-  def close(): Unit = {
-    checkpointStore.close()
-  }
-
-  private[impl] def getMaxMessageTime: TimeStamp = maxMessageTime
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala
 
b/streaming/src/main/scala/io/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala
deleted file mode 100644
index 21623e3..0000000
--- 
a/streaming/src/main/scala/io/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.state.impl
-
-import io.gearpump.TimeStamp
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.task.TaskContext
-import io.gearpump.streaming.transaction.api.{CheckpointStore, 
CheckpointStoreFactory}
-
-/**
- * an in memory store provided for test
- * should not be used in real cases
- */
-class InMemoryCheckpointStore extends CheckpointStore {
-  private var checkpoints = Map.empty[TimeStamp, Array[Byte]]
-
-  override def persist(timestamp: TimeStamp, checkpoint: Array[Byte]): Unit = {
-    checkpoints += timestamp -> checkpoint
-  }
-
-  override def recover(timestamp: TimeStamp): Option[Array[Byte]] = {
-    checkpoints.get(timestamp)
-  }
-
-  override def close(): Unit = {
-    checkpoints = Map.empty[TimeStamp, Array[Byte]]
-  }
-}
-
-class InMemoryCheckpointStoreFactory extends CheckpointStoreFactory {
-  override def getCheckpointStore(conf: UserConfig, taskContext: TaskContext): 
CheckpointStore = {
-    new InMemoryCheckpointStore
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/state/impl/NonWindowState.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/state/impl/NonWindowState.scala
 
b/streaming/src/main/scala/io/gearpump/streaming/state/impl/NonWindowState.scala
deleted file mode 100644
index dcd3918..0000000
--- 
a/streaming/src/main/scala/io/gearpump/streaming/state/impl/NonWindowState.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.state.impl
-
-import org.slf4j.Logger
-
-import io.gearpump.TimeStamp
-import io.gearpump.streaming.state.api.{Monoid, MonoidState, Serializer}
-import io.gearpump.streaming.state.impl.NonWindowState._
-import io.gearpump.util.LogUtil
-
-object NonWindowState {
-  val LOG: Logger = LogUtil.getLogger(classOf[NonWindowState[_]])
-}
-
-/**
- * a MonoidState storing non-window state
- */
-class NonWindowState[T](monoid: Monoid[T], serializer: Serializer[T])
-  extends MonoidState[T](monoid) {
-
-  override def recover(timestamp: TimeStamp, bytes: Array[Byte]): Unit = {
-    serializer.deserialize(bytes).foreach(left = _)
-  }
-
-  override def update(timestamp: TimeStamp, t: T): Unit = {
-    updateState(timestamp, t)
-  }
-
-  override def checkpoint(): Array[Byte] = {
-    val serialized = serializer.serialize(left)
-    LOG.debug(s"checkpoint time: $checkpointTime; checkpoint value: 
($checkpointTime, $left)")
-    left = monoid.plus(left, right)
-    right = monoid.zero
-    serialized
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/state/impl/PersistentStateConfig.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/state/impl/PersistentStateConfig.scala
 
b/streaming/src/main/scala/io/gearpump/streaming/state/impl/PersistentStateConfig.scala
deleted file mode 100644
index d7488d7..0000000
--- 
a/streaming/src/main/scala/io/gearpump/streaming/state/impl/PersistentStateConfig.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.state.impl
-
-object PersistentStateConfig {
-
-  val STATE_CHECKPOINT_ENABLE = "state.checkpoint.enable"
-  val STATE_CHECKPOINT_INTERVAL_MS = "state.checkpoint.interval.ms"
-  val STATE_CHECKPOINT_STORE_FACTORY = "state.checkpoint.store.factory"
-  val STATE_WINDOW_SIZE = "state.window.size"
-  val STATE_WINDOW_STEP = "state.window.step"
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/state/impl/Window.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/state/impl/Window.scala 
b/streaming/src/main/scala/io/gearpump/streaming/state/impl/Window.scala
deleted file mode 100644
index 63cdf06..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/state/impl/Window.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.state.impl
-
-import io.gearpump.TimeStamp
-
-/**
- * Used in window applications
- * it keeps the current window and slide ahead when the window expires
- */
-class Window(val windowSize: Long, val windowStep: Long) {
-
-  def this(windowConfig: WindowConfig) = {
-    this(windowConfig.windowSize, windowConfig.windowStep)
-  }
-
-  private var clock: TimeStamp = 0L
-  private var startTime = 0L
-
-  def update(clock: TimeStamp): Unit = {
-    this.clock = clock
-  }
-
-  def slideOneStep(): Unit = {
-    startTime += windowStep
-  }
-
-  def slideTo(timestamp: TimeStamp): Unit = {
-    startTime = timestamp / windowStep * windowStep
-  }
-
-  def shouldSlide: Boolean = {
-    clock >= (startTime + windowSize)
-  }
-
-  def range: (TimeStamp, TimeStamp) = {
-    startTime -> (startTime + windowSize)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowConfig.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowConfig.scala 
b/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowConfig.scala
deleted file mode 100644
index d7d3776..0000000
--- 
a/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowConfig.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.state.impl
-
-object WindowConfig {
-  val NAME = "window_config"
-}
-
-case class WindowConfig(windowSize: Long, windowStep: Long)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowState.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowState.scala 
b/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowState.scala
deleted file mode 100644
index 30382c0..0000000
--- 
a/streaming/src/main/scala/io/gearpump/streaming/state/impl/WindowState.scala
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.state.impl
-
-import scala.collection.immutable.TreeMap
-
-import org.slf4j.Logger
-
-import io.gearpump.TimeStamp
-import io.gearpump.streaming.state.api.{Group, MonoidState, Serializer}
-import io.gearpump.streaming.state.impl.WindowState._
-import io.gearpump.streaming.task.TaskContext
-import io.gearpump.util.LogUtil
-
-/**
- * an interval is a dynamic time range that is divided by window boundary and 
checkpoint time
- */
-case class Interval(startTime: TimeStamp, endTime: TimeStamp) extends 
Ordered[Interval] {
-  override def compare(that: Interval): Int = {
-    if (startTime < that.startTime) -1
-    else if (startTime > that.startTime) 1
-    else 0
-  }
-}
-
-object WindowState {
-  val LOG: Logger = LogUtil.getLogger(classOf[WindowState[_]])
-}
-
-/**
- * this is a list of states, each of which is bounded by a time window
- * state of each window doesn't affect each other
- *
- * WindowState requires a Algebird Group to be passed in
- * Group augments Monoid with a minus function which makes it
- * possible to undo the update by messages that have left the window
- */
-class WindowState[T](group: Group[T],
-    serializer: Serializer[TreeMap[Interval, T]],
-    taskContext: TaskContext,
-    window: Window) extends MonoidState[T](group) {
-  /**
-   * each interval has a state updated by message with timestamp in
-   * [interval.startTime, interval.endTime)
-   */
-  private var intervalStates = TreeMap.empty[Interval, T]
-
-  private var lastCheckpointTime = 0L
-
-  override def recover(timestamp: TimeStamp, bytes: Array[Byte]): Unit = {
-    window.slideTo(timestamp)
-    serializer.deserialize(bytes)
-      .foreach { states =>
-        intervalStates = states
-        left = states.foldLeft(left) { case (accum, iter) =>
-          group.plus(accum, iter._2)
-        }
-      }
-  }
-
-  override def update(timestamp: TimeStamp, t: T): Unit = {
-    val (startTime, endTime) = window.range
-    if (timestamp >= startTime && timestamp < endTime) {
-      updateState(timestamp, t)
-    }
-
-    updateIntervalStates(timestamp, t, checkpointTime)
-
-    val upstreamMinClock = taskContext.upstreamMinClock
-    window.update(upstreamMinClock)
-
-    if (window.shouldSlide) {
-      window.slideOneStep()
-
-      val (newStartTime, newEndTime) = window.range
-      getIntervalStates(startTime, newStartTime).foreach { case (_, st) =>
-        left = group.minus(left, st)
-      }
-      if (checkpointTime > endTime) {
-        getIntervalStates(endTime, checkpointTime).foreach { case (_, st) =>
-          left = group.plus(left, st)
-        }
-      } else {
-        getIntervalStates(endTime, newEndTime).foreach { case (_, st) =>
-          right = group.plus(right, st)
-        }
-      }
-    }
-  }
-
-  override def checkpoint(): Array[Byte] = {
-    left = group.plus(left, right)
-    right = group.zero
-
-    val states = getIntervalStates(window.range._1, checkpointTime)
-    lastCheckpointTime = checkpointTime
-    LOG.debug(s"checkpoint time: $checkpointTime; checkpoint value: 
($checkpointTime, $states)")
-    serializer.serialize(states)
-  }
-
-  /**
-   * Each message will update state in corresponding Interval[StartTime, 
endTime),
-   *
-   * which is decided by the message's timestamp t where
-   * {{{
-   * startTime = Math.max(lowerBound1, lowerBound2, checkpointTime)
-   * endTime = Math.min(upperBound1, upperBound2, checkpointTime)
-   * lowerBound1 = step * Nmax1 <= t
-   * lowerBound2 = step * Nmax2 + size <= t
-   * upperBound1 = step * Nmin1 > t
-   * upperBound2 = step * Nmin2 + size > t
-   * }}}
-   */
-  private[impl] def getInterval(timestamp: TimeStamp, checkpointTime: 
TimeStamp): Interval = {
-    val windowSize = window.windowSize
-    val windowStep = window.windowStep
-    val lowerBound1 = timestamp / windowStep * windowStep
-    val lowerBound2 =
-      if (timestamp < windowSize) 0L
-      else (timestamp - windowSize) / windowStep * windowStep + windowSize
-    val upperBound1 = (timestamp / windowStep + 1) * windowStep
-    val upperBound2 =
-      if (timestamp < windowSize) windowSize
-      else ((timestamp - windowSize) / windowStep + 1) * windowStep + 
windowSize
-    val lowerBound = Math.max(lowerBound1, lowerBound2)
-    val upperBound = Math.min(upperBound1, upperBound2)
-    if (checkpointTime > timestamp) {
-      Interval(Math.max(lowerBound, lastCheckpointTime), Math.min(upperBound, 
checkpointTime))
-    } else {
-      Interval(Math.max(lowerBound, checkpointTime), upperBound)
-    }
-  }
-
-  private[impl] def updateIntervalStates(timestamp: TimeStamp, t: T, 
checkpointTime: TimeStamp)
-  : Unit = {
-    val interval = getInterval(timestamp, checkpointTime)
-    intervalStates.get(interval) match {
-      case Some(st) =>
-        intervalStates += interval -> group.plus(st, t)
-      case None =>
-        intervalStates += interval -> group.plus(group.zero, t)
-    }
-  }
-
-  private[impl] def getIntervalStates(startTime: TimeStamp, endTime: TimeStamp)
-  : TreeMap[Interval, T] = {
-    intervalStates.dropWhile(_._1.endTime <= startTime).takeWhile(_._1.endTime 
<= endTime)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/storage/AppDataStore.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/storage/AppDataStore.scala 
b/streaming/src/main/scala/io/gearpump/streaming/storage/AppDataStore.scala
deleted file mode 100644
index 962f48f..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/storage/AppDataStore.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.storage
-
-import scala.concurrent._
-
-/**
- * Generic storage to store KV Data.
- */
-trait AppDataStore {
-  def put(key: String, value: Any): Future[Any]
-
-  def get(key: String): Future[Any]
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMaster.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMaster.scala
 
b/streaming/src/main/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMaster.scala
deleted file mode 100644
index f1a19a8..0000000
--- 
a/streaming/src/main/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMaster.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.storage
-
-import scala.concurrent.Future
-
-import akka.actor.ActorRef
-import akka.pattern.ask
-
-import io.gearpump.cluster.AppMasterToMaster.{GetAppData, GetAppDataResult, 
SaveAppData}
-import io.gearpump.util.Constants
-
-/**
- * In memory application storage located on master nodes
- */
-class InMemoryAppStoreOnMaster(appId: Int, master: ActorRef) extends 
AppDataStore {
-  implicit val timeout = Constants.FUTURE_TIMEOUT
-  import scala.concurrent.ExecutionContext.Implicits.global
-
-  override def put(key: String, value: Any): Future[Any] = {
-    master.ask(SaveAppData(appId, key, value))
-  }
-
-  override def get(key: String): Future[Any] = {
-    master.ask(GetAppData(appId, 
key)).asInstanceOf[Future[GetAppDataResult]].map { result =>
-      if (result.key.equals(key)) {
-        result.value
-      } else {
-        null
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala 
b/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala
deleted file mode 100644
index a3cb6e1..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/task/ExpressTransport.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.task
-
-import akka.actor.{ActorRef, ExtendedActorSystem}
-
-import io.gearpump.Message
-import io.gearpump.transport.netty.TaskMessage
-import io.gearpump.transport.{Express, HostPort}
-import io.gearpump.util.AkkaHelper
-/**
- * ExpressTransport wire the networking function from default akka
- * networking to customized implementation [[io.gearpump.transport.Express]].
- *
- * See [[io.gearpump.transport.Express]] for more information.
- */
-trait ExpressTransport {
-  this: TaskActor =>
-
-  final val express = Express(context.system)
-  implicit val system = context.system.asInstanceOf[ExtendedActorSystem]
-
-  final def local: HostPort = express.localHost
-  lazy val sourceId = TaskId.toLong(taskId)
-
-  lazy val sessionRef: ActorRef = {
-    AkkaHelper.actorFor(system, s"/session#$sessionId")
-  }
-
-  def transport(msg: AnyRef, remotes: TaskId*): Unit = {
-    var serializedMessage: AnyRef = null
-
-    remotes.foreach { remote =>
-      val transportId = TaskId.toLong(remote)
-      val localActor = express.lookupLocalActor(transportId)
-      if (localActor.isDefined) {
-        localActor.get.tell(msg, sessionRef)
-      } else {
-        if (null == serializedMessage) {
-          msg match {
-            case message: Message =>
-              val bytes = serializerPool.get().serialize(message.msg)
-              serializedMessage = SerializedMessage(message.timestamp, bytes)
-            case _ => serializedMessage = msg
-          }
-        }
-        val taskMessage = new TaskMessage(sessionId, transportId, sourceId, 
serializedMessage)
-
-        val remoteAddress = express.lookupRemoteAddress(transportId)
-        if (remoteAddress.isDefined) {
-          express.transport(taskMessage, remoteAddress.get)
-        } else {
-          LOG.error(
-            s"Can not find target task $remote, maybe the application is 
undergoing recovery")
-        }
-      }
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/task/SerializedMessage.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/task/SerializedMessage.scala 
b/streaming/src/main/scala/io/gearpump/streaming/task/SerializedMessage.scala
deleted file mode 100644
index 9f9bf1b..0000000
--- 
a/streaming/src/main/scala/io/gearpump/streaming/task/SerializedMessage.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.task
-
-import java.io.{DataInput, DataOutput}
-
-import io.gearpump.TimeStamp
-
-case class SerializedMessage(timeStamp: TimeStamp, bytes: Array[Byte])
-
-class SerializedMessageSerializer extends 
TaskMessageSerializer[SerializedMessage] {
-  override def getLength(obj: SerializedMessage): Int = 12 + obj.bytes.length
-
-  override def write(dataOutput: DataOutput, obj: SerializedMessage): Unit = {
-    dataOutput.writeLong(obj.timeStamp)
-    dataOutput.writeInt(obj.bytes.length)
-    dataOutput.write(obj.bytes)
-  }
-
-  override def read(dataInput: DataInput): SerializedMessage = {
-    val timestamp = dataInput.readLong()
-    val length = dataInput.readInt()
-    val bytes = new Array[Byte](length)
-    dataInput.readFully(bytes)
-    SerializedMessage(timestamp, bytes)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/task/SerializerResolver.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/task/SerializerResolver.scala 
b/streaming/src/main/scala/io/gearpump/streaming/task/SerializerResolver.scala
deleted file mode 100644
index 72bc7db..0000000
--- 
a/streaming/src/main/scala/io/gearpump/streaming/task/SerializerResolver.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.task
-
-import io.gearpump.esotericsoftware.kryo.util.{IntMap, ObjectMap}
-import io.gearpump.streaming.task.SerializerResolver.Registration
-
-private[task] class SerializerResolver {
-  private var classId = 0
-  private val idToRegistration = new IntMap[Registration]()
-  private val classToRegistration = new ObjectMap[Class[_], Registration]()
-
-  def register[T](clazz: Class[T], serializer: TaskMessageSerializer[T]): Unit 
= {
-    val registration = new Registration(classId, clazz, serializer)
-    idToRegistration.put(classId, registration)
-    classToRegistration.put(clazz, registration)
-    classId += 1
-  }
-
-  def getRegistration(clazz: Class[_]): Registration = {
-    classToRegistration.get(clazz)
-  }
-
-  def getRegistration(clazzId: Int): Registration = {
-    idToRegistration.get(clazzId)
-  }
-}
-
-object SerializerResolver {
-  class Registration(val id: Int, val clazz: Class[_], val serializer: 
TaskMessageSerializer[_])
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/task/StartTime.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/task/StartTime.scala 
b/streaming/src/main/scala/io/gearpump/streaming/task/StartTime.scala
deleted file mode 100644
index 6bc8b15..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/task/StartTime.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.task
-
-import io.gearpump.TimeStamp
-
-/** Start time of streaming application. All message older than start time 
will be dropped */
-case class StartTime(startTime: TimeStamp = 0)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/task/StreamingTransportSerializer.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/task/StreamingTransportSerializer.scala
 
b/streaming/src/main/scala/io/gearpump/streaming/task/StreamingTransportSerializer.scala
deleted file mode 100644
index 17d0b1b..0000000
--- 
a/streaming/src/main/scala/io/gearpump/streaming/task/StreamingTransportSerializer.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.task
-
-import java.io.{DataInput, DataOutput}
-
-import org.slf4j.Logger
-
-import io.gearpump.streaming.{AckRequestSerializer, AckSerializer, 
InitialAckRequestSerializer, LatencyProbeSerializer}
-import io.gearpump.transport.netty.ITransportMessageSerializer
-import io.gearpump.util.LogUtil
-
-class StreamingTransportSerializer extends ITransportMessageSerializer {
-  private val log: Logger = LogUtil.getLogger(getClass)
-  private val serializers = new SerializerResolver
-
-  serializers.register(classOf[Ack], new AckSerializer)
-  serializers.register(classOf[AckRequest], new AckRequestSerializer)
-  serializers.register(classOf[InitialAckRequest], new 
InitialAckRequestSerializer)
-  serializers.register(classOf[LatencyProbe], new LatencyProbeSerializer)
-  serializers.register(classOf[SerializedMessage], new 
SerializedMessageSerializer)
-
-  override def serialize(dataOutput: DataOutput, obj: Object): Unit = {
-    val registration = serializers.getRegistration(obj.getClass)
-    if (registration != null) {
-      dataOutput.writeInt(registration.id)
-      
registration.serializer.asInstanceOf[TaskMessageSerializer[AnyRef]].write(dataOutput,
 obj)
-    } else {
-      log.error(s"Can not find serializer for class type ${obj.getClass}")
-    }
-  }
-
-  override def deserialize(dataInput: DataInput, length: Int): Object = {
-    val classID = dataInput.readInt()
-    val registration = serializers.getRegistration(classID)
-    if (registration != null) {
-      
registration.serializer.asInstanceOf[TaskMessageSerializer[AnyRef]].read(dataInput)
-    } else {
-      log.error(s"Can not find serializer for class id $classID")
-      null
-    }
-  }
-
-  override def getLength(obj: Object): Int = {
-    val registration = serializers.getRegistration(obj.getClass)
-    if (registration != null) {
-      
registration.serializer.asInstanceOf[TaskMessageSerializer[AnyRef]].getLength(obj)
 + 4
-    } else {
-      log.error(s"Can not find serializer for class type ${obj.getClass}")
-      0
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/task/Subscriber.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/task/Subscriber.scala 
b/streaming/src/main/scala/io/gearpump/streaming/task/Subscriber.scala
deleted file mode 100644
index d20074b..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/task/Subscriber.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.task
-
-import io.gearpump.partitioner.PartitionerDescription
-import io.gearpump.streaming.{DAG, LifeTime}
-
-/**
- * Each processor can have multiple downstream subscribers.
- *
- * For example: When processor A subscribe to processor B, then the output of 
B will be
- * pushed to processor A.
- *
- * @param processorId subscriber processor Id
- * @param partitionerDescription subscriber partitioner
- */
-case class Subscriber(processorId: Int, partitionerDescription: 
PartitionerDescription,
-    parallelism: Int, lifeTime: LifeTime)
-
-object Subscriber {
-
-  /**
-   *
-   * List subscriptions of a processor.
-   * The topology information is retrieved from dag
-   *
-   * @param processorId   the processor to list
-   * @param dag     the DAG
-   * @return   the subscribers of this processor
-   */
-  def of(processorId: Int, dag: DAG): List[Subscriber] = {
-    val edges = dag.graph.outgoingEdgesOf(processorId)
-
-    edges.foldLeft(List.empty[Subscriber]) { (list, nodeEdgeNode) =>
-      val (_, partitioner, downstreamProcessorId) = nodeEdgeNode
-      val downstreamProcessor = dag.processors(downstreamProcessorId)
-      list :+ Subscriber(downstreamProcessorId, partitioner,
-        downstreamProcessor.parallelism, downstreamProcessor.life)
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala 
b/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala
deleted file mode 100644
index 155edf4..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/task/Subscription.scala
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.task
-
-import org.slf4j.Logger
-
-import io.gearpump.google.common.primitives.Shorts
-import io.gearpump.partitioner.{MulticastPartitioner, Partitioner, 
UnicastPartitioner}
-import io.gearpump.streaming.AppMasterToExecutor.MsgLostException
-import io.gearpump.streaming.LifeTime
-import io.gearpump.streaming.task.Subscription._
-import io.gearpump.util.LogUtil
-import io.gearpump.{Message, TimeStamp}
-
-/**
- * Manges the output and message clock for single downstream processor
- *
- * @param subscriber downstream processor
- * @param maxPendingMessageCount trigger flow control. Should be bigger than
- *                               maxPendingMessageCountPerAckRequest
- * @param ackOnceEveryMessageCount send on AckRequest to the target
- */
-class Subscription(
-    appId: Int,
-    executorId: Int,
-    taskId: TaskId,
-    subscriber: Subscriber, sessionId: Int,
-    transport: ExpressTransport,
-    maxPendingMessageCount: Int = MAX_PENDING_MESSAGE_COUNT,
-    ackOnceEveryMessageCount: Int = ONE_ACKREQUEST_EVERY_MESSAGE_COUNT) {
-
-  assert(maxPendingMessageCount >= ackOnceEveryMessageCount)
-  assert(maxPendingMessageCount < Short.MaxValue / 2)
-
-  private val LOG: Logger = LogUtil.getLogger(getClass, app = appId,
-    executor = executorId, task = taskId)
-
-  import subscriber.{parallelism, partitionerDescription, processorId}
-
-  // Don't worry if this store negative number. We will wrap the Short
-  private val messageCount: Array[Short] = new Array[Short](parallelism)
-  private val pendingMessageCount: Array[Short] = new Array[Short](parallelism)
-  private val candidateMinClockSince: Array[Short] = new 
Array[Short](parallelism)
-
-  private val minClockValue: Array[TimeStamp] = 
Array.fill(parallelism)(Long.MaxValue)
-  private val candidateMinClock: Array[TimeStamp] = 
Array.fill(parallelism)(Long.MaxValue)
-
-  private var maxPendingCount: Short = 0
-
-  private var life = subscriber.lifeTime
-
-  private val partitioner = 
partitionerDescription.partitionerFactory.partitioner
-  private val sendFn = partitioner match {
-    case up: UnicastPartitioner =>
-      (msg: Message) => {
-        val partition = up.getPartition(msg, parallelism, taskId.index)
-        sendMessage(msg, partition)
-      }
-    case mp: MulticastPartitioner =>
-      (msg: Message) => {
-        val partitions = mp.getPartitions(msg, parallelism, taskId.index)
-        partitions.map(partition => sendMessage(msg, partition)).sum
-      }
-  }
-
-  def changeLife(life: LifeTime): Unit = {
-    this.life = life
-  }
-
-  def start(): Unit = {
-    val ackRequest = InitialAckRequest(taskId, sessionId)
-    transport.transport(ackRequest, allTasks: _*)
-  }
-
-  def sendMessage(msg: Message): Int = {
-    sendFn(msg)
-  }
-
-  /**
-   * Returns how many message is actually sent by this subscription
-   *
-   * @param msg  the message to send
-   * @param partition  the target partition to send message to
-   * @return 1 if success
-   */
-  def sendMessage(msg: Message, partition: Int): Int = {
-
-    var count = 0
-    // Only sends message whose timestamp matches the lifeTime
-    if (partition != Partitioner.UNKNOWN_PARTITION_ID && 
life.contains(msg.timestamp)) {
-
-      val targetTask = TaskId(processorId, partition)
-      transport.transport(msg, targetTask)
-
-      this.minClockValue(partition) = Math.min(this.minClockValue(partition), 
msg.timestamp)
-      this.candidateMinClock(partition) = 
Math.min(this.candidateMinClock(partition), msg.timestamp)
-
-      incrementMessageCount(partition, 1)
-
-      if (messageCount(partition) % ackOnceEveryMessageCount == 0) {
-        sendAckRequest(partition)
-      }
-
-      if (messageCount(partition) / maxPendingMessageCount !=
-        (messageCount(partition) + ackOnceEveryMessageCount) / 
maxPendingMessageCount) {
-        sendLatencyProbe(partition)
-      }
-      count = 1
-      count
-    } else {
-      if (needFlush) {
-        flush()
-      }
-      count = 0
-      count
-    }
-  }
-
-  private var lastFlushTime: Long = 0L
-  private val FLUSH_INTERVAL = 5 * 1000 // ms
-  private def needFlush: Boolean = {
-    System.currentTimeMillis() - lastFlushTime > FLUSH_INTERVAL &&
-      Shorts.max(pendingMessageCount: _*) > 0
-  }
-
-  private def flush(): Unit = {
-    lastFlushTime = System.currentTimeMillis()
-    allTasks.foreach { targetTaskId =>
-      sendAckRequest(targetTaskId.index)
-    }
-  }
-
-  private def allTasks: scala.collection.Seq[TaskId] = {
-    (0 until parallelism).map { taskIndex =>
-      TaskId(processorId, taskIndex)
-    }
-  }
-
-  /**
-   * Handles acknowledge message. Throw MessageLossException if required.
-   *
-   * @param ack acknowledge message received
-   */
-  def receiveAck(ack: Ack): Unit = {
-
-    val index = ack.taskId.index
-
-    if (ack.sessionId == sessionId) {
-      if (ack.actualReceivedNum == ack.seq) {
-        if ((ack.seq - candidateMinClockSince(index)).toShort >= 0) {
-          if (ack.seq == messageCount(index)) {
-            // All messages have been acked.
-            minClockValue(index) = Long.MaxValue
-          } else {
-            minClockValue(index) = candidateMinClock(index)
-          }
-          candidateMinClock(index) = Long.MaxValue
-          candidateMinClockSince(index) = messageCount(index)
-        }
-
-        pendingMessageCount(ack.taskId.index) = 
(messageCount(ack.taskId.index) - ack.seq).toShort
-        updateMaxPendingCount()
-      } else {
-        LOG.error(s"Failed! received ack: $ack, received: 
${ack.actualReceivedNum}, " +
-          s"sent: ${ack.seq}, try to replay...")
-        throw new MsgLostException
-      }
-    }
-  }
-
-  def minClock: TimeStamp = {
-    minClockValue.min
-  }
-
-  def allowSendingMoreMessages(): Boolean = {
-    maxPendingCount < maxPendingMessageCount
-  }
-
-  def sendAckRequestOnStallingTime(stallingTime: TimeStamp): Unit = {
-    minClockValue.indices.foreach { i =>
-      if (minClockValue(i) == stallingTime && pendingMessageCount(i) > 0
-        && allowSendingMoreMessages) {
-        sendAckRequest(i)
-        sendLatencyProbe(i)
-      }
-    }
-  }
-
-  private def sendAckRequest(partition: Int): Unit = {
-    // Increments more count for each AckRequest
-    // to throttle the number of unacked AckRequest
-    incrementMessageCount(partition, ackOnceEveryMessageCount)
-    val targetTask = TaskId(processorId, partition)
-    val ackRequest = AckRequest(taskId, messageCount(partition), sessionId)
-    transport.transport(ackRequest, targetTask)
-  }
-
-  private def incrementMessageCount(partition: Int, count: Int): Unit = {
-    messageCount(partition) = (messageCount(partition) + count).toShort
-    pendingMessageCount(partition) = (pendingMessageCount(partition) + 
count).toShort
-    updateMaxPendingCount()
-  }
-
-  private def updateMaxPendingCount(): Unit = {
-    maxPendingCount = Shorts.max(pendingMessageCount: _*)
-  }
-
-  private def sendLatencyProbe(partition: Int): Unit = {
-    val probeLatency = LatencyProbe(System.currentTimeMillis())
-    val targetTask = TaskId(processorId, partition)
-    transport.transport(probeLatency, targetTask)
-  }
-}
-
-object Subscription {
-  // Makes sure it is smaller than MAX_PENDING_MESSAGE_COUNT
-  final val ONE_ACKREQUEST_EVERY_MESSAGE_COUNT = 100
-  final val MAX_PENDING_MESSAGE_COUNT = 1000
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/task/Task.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/Task.scala 
b/streaming/src/main/scala/io/gearpump/streaming/task/Task.scala
deleted file mode 100644
index 212a659..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/task/Task.scala
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.task
-
-import scala.concurrent.duration.FiniteDuration
-
-import akka.actor.Actor.Receive
-import akka.actor.{ActorRef, ActorSystem, Cancellable, Props}
-import org.slf4j.Logger
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.util.LogUtil
-import io.gearpump.{Message, TimeStamp}
-
-/**
- * This provides context information for a task.
- */
-trait TaskContext {
-
-  def taskId: TaskId
-
-  def executorId: Int
-
-  def appId: Int
-
-  def appName: String
-
-  /**
-   * The actorRef of AppMaster
-   * @return application master's actor reference
-   */
-  def appMaster: ActorRef
-
-  /**
-   * The task parallelism
-   *
-   * For example, we can create 3 source tasks, and 3 sink tasks,
-   * the task parallelism is 3 for each.
-   *
-   * This can be useful when reading from partitioned data source.
-   * For example, for kafka, there may be 10 partitions, if we have
-   * parallelism of 2 for this task, then each task will be responsible
-   * to read data from 5 partitions.
-   *
-   * @return  the parallelism level
-   */
-  def parallelism: Int
-
-  /**
-   * Please don't use this if possible.
-   * @return  self actor ref
-   */
-  // TODO: We should remove the self from TaskContext
-  def self: ActorRef
-
-  /**
-   * Please don't use this if possible
-   * @return the actor system
-   */
-  // TODO: we should remove this in future
-  def system: ActorSystem
-
-  /**
-   * This can be used to output messages to downstream tasks. The data 
shuffling rule
-   * can be decided by Partitioner.
-   *
-   * @param msg message to output
-   */
-  def output(msg: Message): Unit
-
-  def actorOf(props: Props): ActorRef
-
-  def actorOf(props: Props, name: String): ActorRef
-
-  def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: => 
Unit): Cancellable
-
-  /**
-   * akka.actor.ActorRefProvider.scheduleOnce
-   *
-   * @param initialDelay  the initial delay
-   * @param f  the function to execute after initial delay
-   * @return the executable
-   */
-  def scheduleOnce(initialDelay: FiniteDuration)(f: => Unit): Cancellable
-
-  /**
-   * For managed message(type of Message), the sender only serve as a unique 
Id,
-   * It's address is not something meaningful, you should not use this directly
-   *
-   * For unmanaged message, the sender represent the sender ActorRef
-   * @return sender
-   */
-  def sender: ActorRef
-
-  /**
-   * Retrieves upstream min clock from TaskActor
-   *
-   * @return the min clock
-   */
-  def upstreamMinClock: TimeStamp
-
-  /**
-   * Logger is environment dependant, it should be provided by
-   * containing environment.
-   */
-  def logger: Logger
-}
-
-/**
- * Streaming Task interface
- */
-trait TaskInterface {
-
-  /**
-   * Method called with the task is initialized.
-   * @param startTime startTime that can be used to decide from when a source 
producer task should
-   *                  replay the data source, or from when a processor task 
should recover its
-   *                  checkpoint data in to in-memory state.
-   */
-  def onStart(startTime: StartTime): Unit
-
-  /**
-   * Method called for each message received.
-   *
-   * @param msg Message send by upstream tasks
-   */
-  def onNext(msg: Message): Unit
-
-  /**
-   * Method called when task is under clean up.
-   *
-   * This can be used to cleanup resource when the application finished.
-   */
-  def onStop(): Unit
-
-  /**
-   * Handlers unmanaged messages
-   *
-   * @return the handler
-   */
-  def receiveUnManagedMessage: Receive = null
-}
-
-abstract class Task(taskContext: TaskContext, userConf: UserConfig) extends 
TaskInterface {
-
-  import taskContext.{appId, executorId, taskId}
-
-  val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = 
executorId, task = taskId)
-
-  protected implicit val system = taskContext.system
-
-  implicit val self = taskContext.self
-
-  /**
-   * For managed message(type of Message), the sender mean nothing,
-   * you should not use this directory
-   *
-   * For unmanaged message, the sender represent the sender actor
-   * @return the sender
-   */
-  protected def sender: ActorRef = taskContext.sender
-
-  def onStart(startTime: StartTime): Unit = {}
-
-  def onNext(msg: Message): Unit = {}
-
-  def onStop(): Unit = {}
-
-  override def receiveUnManagedMessage: Receive = {
-    case msg =>
-      LOG.error("Failed! Received unknown message " + "taskId: " + taskId + ", 
" + msg.toString)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala 
b/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala
deleted file mode 100644
index b9b8829..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskActor.scala
+++ /dev/null
@@ -1,397 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.task
-
-import java.util
-import java.util.concurrent.TimeUnit
-
-import akka.actor._
-import org.slf4j.Logger
-
-import io.gearpump.cluster.UserConfig
-import io.gearpump.gs.collections.impl.map.mutable.primitive.IntShortHashMap
-import io.gearpump.metrics.Metrics
-import io.gearpump.serializer.SerializationFramework
-import io.gearpump.streaming.AppMasterToExecutor._
-import io.gearpump.streaming.ExecutorToAppMaster._
-import io.gearpump.streaming.{Constants, ProcessorId}
-import io.gearpump.util.{LogUtil, TimeOutScheduler}
-import io.gearpump.{Message, TimeStamp}
-
-/**
- *
- * All tasks of Gearpump runs inside a Actor. TaskActor is the Actor container 
for a task.
- */
-class TaskActor(
-    val taskId: TaskId,
-    val taskContextData: TaskContextData,
-    userConf: UserConfig,
-    val task: TaskWrapper,
-    inputSerializerPool: SerializationFramework)
-    extends Actor with ExpressTransport with TimeOutScheduler {
-  var upstreamMinClock: TimeStamp = 0L
-  private var _minClock: TimeStamp = 0L
-
-  def serializerPool: SerializationFramework = inputSerializerPool
-
-  import taskContextData._
-
-  import io.gearpump.streaming.Constants._
-  import io.gearpump.streaming.task.TaskActor._
-  val config = context.system.settings.config
-
-  val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = 
executorId, task = taskId)
-
-  // Metrics
-  private val metricName = 
s"app${appId}.processor${taskId.processorId}.task${taskId.index}"
-  private val receiveLatency = Metrics(context.system).histogram(
-    s"$metricName:receiveLatency", sampleRate = 1)
-  private val processTime = 
Metrics(context.system).histogram(s"$metricName:processTime")
-  private val sendThroughput = 
Metrics(context.system).meter(s"$metricName:sendThroughput")
-  private val receiveThroughput = 
Metrics(context.system).meter(s"$metricName:receiveThroughput")
-
-  private val maxPendingMessageCount = 
config.getInt(GEARPUMP_STREAMING_MAX_PENDING_MESSAGE_COUNT)
-  private val ackOnceEveryMessageCount = config.getInt(
-    GEARPUMP_STREAMING_ACK_ONCE_EVERY_MESSAGE_COUNT)
-
-  private val executor = context.parent
-  private var life = taskContextData.life
-
-  // Latency probe
-  import scala.concurrent.duration._
-
-  import context.dispatcher
-  final val LATENCY_PROBE_INTERVAL = FiniteDuration(1, TimeUnit.SECONDS)
-
-  // Clock report interval
-  final val CLOCK_REPORT_INTERVAL = FiniteDuration(1, TimeUnit.SECONDS)
-
-  // Flush interval
-  final val FLUSH_INTERVAL = FiniteDuration(100, TimeUnit.MILLISECONDS)
-
-  private val queue = new util.LinkedList[AnyRef]()
-
-  private var subscriptions = List.empty[(Int, Subscription)]
-
-  // SecurityChecker will be responsible of dropping messages from
-  // unknown sources
-  private val securityChecker = new SecurityChecker(taskId, self)
-  private[task] var sessionId = NONE_SESSION
-
-  // Reports to appMaster with my address
-  express.registerLocalActor(TaskId.toLong(taskId), self)
-
-  final def receive: Receive = null
-
-  task.setTaskActor(this)
-
-  def onStart(startTime: StartTime): Unit = {
-    task.onStart(startTime)
-  }
-
-  def onNext(msg: Message): Unit = task.onNext(msg)
-
-  def onUnManagedMessage(msg: Any): Unit = 
task.receiveUnManagedMessage.apply(msg)
-
-  def onStop(): Unit = task.onStop()
-
-  /**
-   * output to a downstream by specifying a arrayIndex
-   * @param arrayIndex this is not same as ProcessorId
-   */
-  def output(arrayIndex: Int, msg: Message): Unit = {
-    var count = 0
-    count += this.subscriptions(arrayIndex)._2.sendMessage(msg)
-    sendThroughput.mark(count)
-  }
-
-  def output(msg: Message): Unit = {
-    var count = 0
-    this.subscriptions.foreach { subscription =>
-      count += subscription._2.sendMessage(msg)
-    }
-    sendThroughput.mark(count)
-  }
-
-  final override def postStop(): Unit = {
-    onStop()
-  }
-
-  final override def preStart(): Unit = {
-    val register = RegisterTask(taskId, executorId, local)
-    LOG.info(s"$register")
-    executor ! register
-    context.become(waitForTaskRegistered)
-  }
-
-  private def allowSendingMoreMessages(): Boolean = {
-    subscriptions.forall(_._2.allowSendingMoreMessages())
-  }
-
-  private def doHandleMessage(): Unit = {
-    var done = false
-
-    var count = 0
-    val start = System.currentTimeMillis()
-
-    while (allowSendingMoreMessages() && !done) {
-      val msg = queue.poll()
-      if (msg != null) {
-        msg match {
-          case SendAck(ack, targetTask) =>
-            transport(ack, targetTask)
-          case m: Message =>
-            count += 1
-            onNext(m)
-          case other =>
-            // un-managed message
-            onUnManagedMessage(other)
-        }
-      } else {
-        done = true
-      }
-    }
-
-    receiveThroughput.mark(count)
-    if (count > 0) {
-      processTime.update((System.currentTimeMillis() - start) / count)
-    }
-  }
-
-  private def onStartClock(): Unit = {
-    LOG.info(s"received start, clock: $upstreamMinClock, sessionId: 
$sessionId")
-    subscriptions = subscribers.map { subscriber =>
-      (subscriber.processorId,
-        new Subscription(appId, executorId, taskId, subscriber, sessionId, 
this,
-          maxPendingMessageCount, ackOnceEveryMessageCount))
-    }.sortBy(_._1)
-
-    subscriptions.foreach(_._2.start())
-
-    import scala.collection.JavaConverters._
-    stashQueue.asScala.foreach { item =>
-      handleMessages(item.sender).apply(item.msg)
-    }
-    stashQueue.clear()
-
-    // Put this as the last step so that the subscription is already 
initialized.
-    // Message sending in current Task before onStart will not be delivered to
-    // target
-    onStart(new StartTime(upstreamMinClock))
-
-    appMaster ! GetUpstreamMinClock(taskId)
-    context.become(handleMessages(sender))
-  }
-
-  def waitForTaskRegistered: Receive = {
-    case start@TaskRegistered(_, sessionId, startClock) =>
-      this.sessionId = sessionId
-      this.upstreamMinClock = startClock
-      context.become(waitForStartClock)
-  }
-
-  private val stashQueue = new util.LinkedList[MessageAndSender]()
-
-  def waitForStartClock: Receive = {
-    case start: StartTask =>
-      onStartClock()
-    case other: AnyRef =>
-      stashQueue.add(MessageAndSender(other, sender()))
-  }
-
-  def handleMessages(sender: => ActorRef): Receive = {
-    case ackRequest: InitialAckRequest =>
-      val ackResponse = securityChecker.handleInitialAckRequest(ackRequest)
-      if (null != ackResponse) {
-        queue.add(SendAck(ackResponse, ackRequest.taskId))
-        doHandleMessage()
-      }
-    case ackRequest: AckRequest =>
-      // Enqueue to handle the ackRequest and send back ack later
-      val ackResponse = securityChecker.generateAckResponse(ackRequest, sender,
-        ackOnceEveryMessageCount)
-      if (null != ackResponse) {
-        queue.add(SendAck(ackResponse, ackRequest.taskId))
-        doHandleMessage()
-      }
-    case ack: Ack =>
-      subscriptions.find(_._1 == 
ack.taskId.processorId).foreach(_._2.receiveAck(ack))
-      doHandleMessage()
-    case inputMessage: SerializedMessage =>
-      val message = 
Message(serializerPool.get().deserialize(inputMessage.bytes),
-        inputMessage.timeStamp)
-      receiveMessage(message, sender)
-    case inputMessage: Message =>
-      receiveMessage(inputMessage, sender)
-    case upstream@UpstreamMinClock(upstreamClock) =>
-      this.upstreamMinClock = upstreamClock
-
-      val subMinClock = subscriptions.foldLeft(Long.MaxValue) { (min, sub) =>
-        val subMin = sub._2.minClock
-        // A subscription is holding back the _minClock;
-        // we send AckRequest to its tasks to push _minClock forward
-        if (subMin == _minClock) {
-          sub._2.sendAckRequestOnStallingTime(_minClock)
-        }
-        Math.min(min, subMin)
-      }
-
-      _minClock = Math.max(life.birth, Math.min(upstreamMinClock, subMinClock))
-
-      val update = UpdateClock(taskId, _minClock)
-      context.system.scheduler.scheduleOnce(CLOCK_REPORT_INTERVAL) {
-        appMaster ! update
-      }
-
-      // Checks whether current task is dead.
-      if (_minClock > life.death) {
-        // There will be no more message received...
-        val unRegister = UnRegisterTask(taskId, executorId)
-        executor ! unRegister
-
-        LOG.info(s"Sending $unRegister, current minclock: ${_minClock}, life: 
$life")
-      }
-
-    case ChangeTask(_, dagVersion, life, subscribers) =>
-      this.life = life
-      subscribers.foreach { subscriber =>
-        val processorId = subscriber.processorId
-        val subscription = getSubscription(processorId)
-        subscription match {
-          case Some(subscription) =>
-            subscription.changeLife(subscriber.lifeTime cross this.life)
-          case None =>
-            val subscription = new Subscription(appId, executorId, taskId, 
subscriber,
-              sessionId, this, maxPendingMessageCount, 
ackOnceEveryMessageCount)
-            subscription.start()
-            subscriptions :+=(subscriber.processorId, subscription)
-            // Sorting, keep the order
-            subscriptions = subscriptions.sortBy(_._1)
-        }
-      }
-      sender ! TaskChanged(taskId, dagVersion)
-    case LatencyProbe(timeStamp) =>
-      receiveLatency.update(System.currentTimeMillis() - timeStamp)
-    case send: SendMessageLoss =>
-      LOG.info("received SendMessageLoss")
-      throw new MsgLostException
-    case other: AnyRef =>
-      queue.add(other)
-      doHandleMessage()
-  }
-
-  /**
-   * Returns min clock of this task
-   */
-  def minClock: TimeStamp = _minClock
-
-  /**
-   * Returns min clock of upstream task
-   */
-  def getUpstreamMinClock: TimeStamp = upstreamMinClock
-
-  private def receiveMessage(msg: Message, sender: ActorRef): Unit = {
-    val messageAfterCheck = securityChecker.checkMessage(msg, sender)
-    messageAfterCheck match {
-      case Some(msg) =>
-        queue.add(msg)
-        doHandleMessage()
-      case None =>
-      // TODO: Indicate the error and avoid the LOG flood
-      // LOG.error(s"Task $taskId drop message $msg")
-    }
-  }
-
-  private def getSubscription(processorId: ProcessorId): Option[Subscription] 
= {
-    subscriptions.find(_._1 == processorId).map(_._2)
-  }
-}
-
-object TaskActor {
-  // 3 seconds
-  val CLOCK_SYNC_TIMEOUT_INTERVAL = 3 * 1000
-
-  // If the message comes from an unknown source, securityChecker will drop it
-  class SecurityChecker(task_id: TaskId, self: ActorRef) {
-
-    private val LOG: Logger = LogUtil.getLogger(getClass, task = task_id)
-
-    // Uses mutable HashMap for performance optimization
-    private val receivedMsgCount = new IntShortHashMap()
-
-    // Tricky performance optimization to save memory.
-    // We store the session Id in the uid of ActorPath
-    // ActorPath.hashCode is same as uid.
-    private def getSessionId(actor: ActorRef): Int = {
-      // TODO: As method uid is protected in [akka] package. We
-      // are using hashCode instead of uid.
-      actor.hashCode()
-    }
-
-    def handleInitialAckRequest(ackRequest: InitialAckRequest): Ack = {
-      LOG.debug(s"Handle InitialAckRequest for session $ackRequest")
-      val sessionId = ackRequest.sessionId
-      if (sessionId == NONE_SESSION) {
-        LOG.error(s"SessionId is not initialized, ackRequest: $ackRequest")
-        null
-      } else {
-        receivedMsgCount.put(sessionId, 0)
-        Ack(task_id, 0, 0, sessionId)
-      }
-    }
-
-    def generateAckResponse(ackRequest: AckRequest, sender: ActorRef, 
incrementCount: Int): Ack = {
-      val sessionId = ackRequest.sessionId
-      if (receivedMsgCount.containsKey(sessionId)) {
-        // Increments more count for each AckRequest
-        // to throttle the number of unacked AckRequest
-        receivedMsgCount.put(sessionId, (receivedMsgCount.get(sessionId) + 
incrementCount).toShort)
-        Ack(task_id, ackRequest.seq, receivedMsgCount.get(sessionId), 
ackRequest.sessionId)
-      } else {
-        LOG.error(s"get unknown AckRequest $ackRequest from 
${sender.toString()}")
-        null
-      }
-    }
-
-    // If the message comes from an unknown source, then drop it
-    def checkMessage(message: Message, sender: ActorRef): Option[Message] = {
-      if (sender.equals(self)) {
-        Some(message)
-      } else {
-        val sessionId = getSessionId(sender)
-        if (receivedMsgCount.containsKey(sessionId)) {
-          receivedMsgCount.put(sessionId, (receivedMsgCount.get(sessionId) + 
1).toShort)
-          Some(message)
-        } else {
-          // This is an illegal message,
-          LOG.debug(s"received message before receive the first AckRequest, 
session $sessionId")
-          None
-        }
-      }
-    }
-  }
-
-  case class SendAck(ack: Ack, targetTask: TaskId)
-
-  case object FLUSH
-
-  val NONE_SESSION = -1
-
-  case class MessageAndSender(msg: AnyRef, sender: ActorRef)
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/main/scala/io/gearpump/streaming/task/TaskContextData.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/task/TaskContextData.scala 
b/streaming/src/main/scala/io/gearpump/streaming/task/TaskContextData.scala
deleted file mode 100644
index 28605cf..0000000
--- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskContextData.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.task
-
-import akka.actor.ActorRef
-
-import io.gearpump.streaming.LifeTime
-
-case class TaskContextData(
-    executorId: Int,
-    appId: Int,
-    appName: String,
-    appMaster: ActorRef,
-    parallelism: Int,
-    life: LifeTime,
-    subscribers: List[Subscriber])

Reply via email to