Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 9bb9ca5d8 -> 83b36ef76


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
new file mode 100644
index 0000000..dd286de
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.streaming.dsl.plan.OpTranslator.SourceTask
+import org.mockito.Mockito.when
+import org.scalatest._
+import org.scalatest.mock.MockitoSugar
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar {

  implicit var system: ActorSystem = null

  override def beforeAll(): Unit = {
    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
  }

  override def afterAll(): Unit = {
    // Block until the actor system has fully terminated to avoid leaking threads between specs.
    system.terminate()
    Await.result(system.whenTerminated, Duration.Inf)
  }

  it should "be able to generate multiple new streams" in {
    val context: ClientContext = mock[ClientContext]
    when(context.system).thenReturn(system)

    val app = StreamApp("dsl", context)
    app.source(List("A"), 1, "")
    app.source(List("B"), 1, "")

    // Each source() call should add exactly one vertex to the application graph.
    assert(app.graph.vertices.size == 2)
  }

  it should "plan the dsl to Processor(TaskDescription) DAG" in {
    val context: ClientContext = mock[ClientContext]
    when(context.system).thenReturn(system)

    val app = StreamApp("dsl", context)
    val parallelism = 3
    app.source(List("A", "B", "C"), parallelism, "").flatMap(Array(_)).reduce(_ + _)
    // The first vertex of the planned DAG is the source processor;
    // it must carry the SourceTask class and the requested parallelism.
    val task = app.plan.dag.vertices.iterator.next()
    assert(task.taskClass == classOf[SourceTask[_, _]].getName)
    assert(task.parallelism == parallelism)
  }

  it should "produce 3 messages" in {
    val context: ClientContext = mock[ClientContext]
    when(context.system).thenReturn(system)
    val app = StreamApp("dsl", context)
    val list = List[String](
      "0",
      "1",
      "2"
    )
    app.source(list, 1, "producer").flatMap(Array(_)).reduce(_ + _)
    // The planned DAG should start with a SourceTask feeding the chained operators.
    val task = app.plan.dag.vertices.iterator.next()
    assert(task.taskClass == classOf[SourceTask[_, _]].getName)
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
new file mode 100644
index 0000000..6bdd8aa
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl
+
+import akka.actor._
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.{TestUtil, UserConfig}
+import org.apache.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner}
+import org.apache.gearpump.streaming.dsl.StreamSpec.Join
+import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
+import org.apache.gearpump.streaming.dsl.plan.OpTranslator._
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.util.Graph
+import org.apache.gearpump.util.Graph._
+import org.mockito.Mockito.when
+import org.scalatest._
+import org.scalatest.mock.MockitoSugar
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+import scala.util.{Either, Left, Right}
+
class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar {

  // Shared actor system for all tests; created in beforeAll, torn down in afterAll.
  implicit var system: ActorSystem = null

  override def beforeAll(): Unit = {
    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
  }

  override def afterAll(): Unit = {
    // Block until termination completes so the JVM does not leak actor threads.
    system.terminate()
    Await.result(system.whenTerminated, Duration.Inf)
  }

  it should "translate the DSL to a DAG" in {
    val context: ClientContext = mock[ClientContext]
    when(context.system).thenReturn(system)

    val app = StreamApp("dsl", context)

    // Word-count style pipeline: split lines into words, count per word,
    // then tag results as Left so they can be merged with the Right-tagged query stream.
    val data =
      """
        five  four three  two    one
        five  four three  two
        five  four three
        five  four
        five
      """
    val stream = app.source(data.lines.toList, 1, "").
      flatMap(line => line.split("[\\s]+")).filter(_.nonEmpty).
      map(word => (word, 1)).
      groupBy(_._1, parallelism = 2).
      reduce((left, right) => (left._1, left._2 + right._2)).
      map[Either[(String, Int), String]](Left(_))

    // A second source acts as the query stream; both streams are merged into the Join processor.
    val query = app.source(List("two"), 1, "").map[Either[(String, Int), String]](Right(_))
    stream.merge(query).process[(String, Int)](classOf[Join], 1)

    val appDescription = app.plan

    // Reduce the planned DAG to task-class-name vertices and
    // partitioner-class-name edges so it can be compared structurally.
    val dagTopology = appDescription.dag.mapVertex(_.taskClass).mapEdge { (node1, edge, node2) =>
      edge.partitionerFactory.partitioner.getClass.getName
    }
    val expectedDagTopology = getExpectedDagTopology

    assert(dagTopology.vertices.toSet.equals(expectedDagTopology.vertices.toSet))
    assert(dagTopology.edges.toSet.equals(expectedDagTopology.edges.toSet))
  }

  // Expected topology: the word-count source reaches the GroupByTask via a GroupByPartitioner,
  // which feeds the merge (TransformTask) co-located, then the Join via hash;
  // the query source feeds the merge via a hash partitioner.
  private def getExpectedDagTopology: Graph[String, String] = {
    val source = classOf[SourceTask[_, _]].getName
    val group = classOf[GroupByTask[_, _, _]].getName
    val merge = classOf[TransformTask[_, _]].getName
    val join = classOf[Join].getName

    val hash = classOf[HashPartitioner].getName
    val groupBy = classOf[GroupByPartitioner[_, _]].getName
    val colocation = classOf[CoLocationPartitioner].getName

    val expectedDagTopology = Graph(
      source ~ groupBy ~> group ~ colocation ~> merge ~ hash ~> join,
      source ~ hash ~> merge
    )
    expectedDagTopology
  }
}
+
object StreamSpec {

  // Joins the word-count stream (Left branch) with the query stream (Right branch):
  // remembers the latest query word and forwards only matching word counts downstream.
  class Join(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {

    // Latest query word received on the Right branch; null until one arrives.
    var query: String = null
    override def onStart(startTime: StartTime): Unit = {}

    override def onNext(msg: Message): Unit = {
      msg.msg match {
        case Left(wordCount: (String @unchecked, Int @unchecked)) =>
          // Only emit counts for the word currently being queried.
          if (query != null && wordCount._1 == query) {
            taskContext.output(new Message(wordCount))
          }

        case Right(query: String) =>
          this.query = query
      }
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
new file mode 100644
index 0000000..fcc646d
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.partitioner
+
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import org.apache.gearpump.Message
+import 
org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitionerSpec.People
+
class GroupByPartitionerSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
  it should "use the output of groupBy function to do partition" in {
    val mark = People("Mark", "male")
    val tom = People("Tom", "male")
    val michelle = People("Michelle", "female")

    val partitionNum = 10
    // Partition by gender: records with the same key must land in the same partition,
    // and (for these fixtures) different keys land in different partitions.
    val groupBy = new GroupByPartitioner[People, String](_.gender)
    assert(groupBy.getPartition(Message(mark), partitionNum)
      == groupBy.getPartition(Message(tom), partitionNum))

    assert(groupBy.getPartition(Message(mark), partitionNum)
      != groupBy.getPartition(Message(michelle), partitionNum))
  }
}
+
object GroupByPartitionerSpec {
  // Test fixture: people are partitioned by their gender field.
  case class People(name: String, gender: String)
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
new file mode 100644
index 0000000..ecc5352
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.plan
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+import akka.actor.ActorSystem
+import org.mockito.ArgumentCaptor
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest._
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.{TestUtil, UserConfig}
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.dsl.CollectionDataSource
+import org.apache.gearpump.streaming.dsl.plan.OpTranslator._
+import org.apache.gearpump.streaming.task.StartTime
+
class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {

  "andThen" should "chain multiple single input functions" in {
    val dummy = new DummyInputFunction[String]
    val split = new FlatMapFunction[String, String](line => line.split("\\s"), "split")

    val filter = new FlatMapFunction[String, String](word =>
      if (word.isEmpty) None else Some(word), "filter")

    val map = new FlatMapFunction[String, Int](word => Some(1), "map")

    val sum = new ReduceFunction[Int]({ (left, right) => left + right }, "sum")

    val all = dummy.andThen(split).andThen(filter).andThen(map).andThen(sum)

    // The chained description concatenates each stage's name with '.'.
    assert(all.description == "split.filter.map.sum")

    val data =
      """
      five  four three  two    one
      five  four three  two
      five  four three
      five  four
      five
      """
    // 15 words in total; reduce emits a running sum, so the last emitted value is 15.
    val count = all.process(data).toList.last
    assert(count == 15)
  }

  "Source" should "iterate over input source and apply attached operator" in {

    val taskContext = MockUtil.mockTaskContext

    val conf = UserConfig.empty
    val data = "one two three".split("\\s")

    // Source with no transformer: each onNext pulls one element and outputs it as-is.
    val source = new SourceTask[String, String](new CollectionDataSource[String](data), None,
      taskContext, conf)
    source.onStart(StartTime(0))
    source.onNext(Message("next"))
    verify(taskContext, times(1)).output(anyObject())

    // Source with transformer: 'double' emits each element twice,
    // so a single pull yields two outputs.
    val anotherTaskContext = MockUtil.mockTaskContext
    val double = new FlatMapFunction[String, String](word => List(word, word), "double")
    val another = new SourceTask(new CollectionDataSource[String](data), Some(double),
      anotherTaskContext, conf)
    another.onStart(StartTime(0))
    another.onNext(Message("next"))
    verify(anotherTaskContext, times(2)).output(anyObject())
  }

  "GroupByTask" should "group input by groupBy Function and " +
    "apply attached operator for each group" in {

    val data = "1 2  2  3 3  3"

    val concat = new ReduceFunction[String]({ (left, right) =>
      left + right
    }, "concat")

    implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
    val config = UserConfig.empty.withValue[SingleInputFunction[String, String]](
      GEARPUMP_STREAMING_OPERATOR, concat)

    val taskContext = MockUtil.mockTaskContext

    // Identity groupBy: each distinct word forms its own group.
    val task = new GroupByTask[String, String, String](input => input, taskContext, config)
    task.onStart(StartTime(0))

    val peopleCaptor = ArgumentCaptor.forClass(classOf[Message])

    data.split("\\s+").foreach { word =>
      task.onNext(Message(word))
    }
    verify(taskContext, times(6)).output(peopleCaptor.capture())

    import scala.collection.JavaConverters._

    // Each group keeps its own running reduction: "1"; then "2","22"; then "3","33","333".
    val values = peopleCaptor.getAllValues().asScala.map(input => input.msg.asInstanceOf[String])
    assert(values.mkString(",") == "1,2,22,3,33,333")
    system.terminate()
    Await.result(system.whenTerminated, Duration.Inf)
  }

  "MergeTask" should "accept two stream and apply the attached operator" in {

    // TransformTask with a doubling flatMap: every input produces two outputs.
    val taskContext = MockUtil.mockTaskContext
    val conf = UserConfig.empty
    val double = new FlatMapFunction[String, String](word => List(word, word), "double")
    val task = new TransformTask[String, String](Some(double), taskContext, conf)
    task.onStart(StartTime(0))

    val data = "1 2  2  3 3  3".split("\\s+")

    data.foreach { input =>
      task.onNext(Message(input))
    }

    verify(taskContext, times(data.length * 2)).output(anyObject())
  }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/executor/ExecutorSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/executor/ExecutorSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/executor/ExecutorSpec.scala
new file mode 100644
index 0000000..4c4b37b
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/executor/ExecutorSpec.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.executor
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+import akka.actor.{ActorSystem, Props}
+import akka.testkit.TestProbe
+import org.mockito.Matchers._
+import org.mockito.Mockito.{times, _}
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+import org.apache.gearpump.cluster.appmaster.WorkerInfo
+import org.apache.gearpump.cluster.scheduler.Resource
+import org.apache.gearpump.cluster.worker.WorkerId
+import org.apache.gearpump.cluster.{ExecutorContext, TestUtil, UserConfig}
+import org.apache.gearpump.streaming.AppMasterToExecutor._
+import org.apache.gearpump.streaming.ExecutorToAppMaster.RegisterTask
+import org.apache.gearpump.streaming.appmaster.TaskRegistry.TaskLocations
+import org.apache.gearpump.streaming.executor.TaskLauncherSpec.MockTask
+import org.apache.gearpump.streaming.task.{Subscriber, TaskId}
+import org.apache.gearpump.streaming.{LifeTime, ProcessorDescription}
+import org.apache.gearpump.transport.HostPort
+
class ExecutorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
  val appId = 0
  val executorId = 0
  val workerId = WorkerId(0, 0L)
  var appMaster: TestProbe = null
  implicit var system: ActorSystem = null
  val userConf = UserConfig.empty

  override def beforeAll(): Unit = {
    system = ActorSystem("TaskLauncherSpec", TestUtil.DEFAULT_CONFIG)
    appMaster = TestProbe()
  }

  override def afterAll(): Unit = {
    // Block until shutdown completes so the test JVM does not leak actor threads.
    system.terminate()
    Await.result(system.whenTerminated, Duration.Inf)
  }

  it should "call launcher to launch task" in {
    val worker = TestProbe()
    val workerInfo = WorkerInfo(workerId, worker.ref)
    val executorContext = ExecutorContext(executorId, workerInfo, appId, "app",
      appMaster.ref, Resource(2))
    // Mock the launcher so the test can verify the executor delegates task creation to it.
    val taskLauncher = mock(classOf[ITaskLauncher])
    val executor = system.actorOf(Props(new Executor(executorContext, userConf, taskLauncher)))
    val processor = ProcessorDescription(id = 0, taskClass = classOf[MockTask].getName,
      parallelism = 2)
    val taskIds = List(TaskId(0, 0), TaskId(0, 1))
    val launchTasks = LaunchTasks(taskIds, dagVersion = 0, processor, List.empty[Subscriber])

    // Every launched task id maps to the same probe so message flow can be observed.
    val task = TestProbe()
    when(taskLauncher.launch(any(), any(), any(), any(), any()))
      .thenReturn(taskIds.map((_, task.ref)).toMap)

    val client = TestProbe()
    client.send(executor, launchTasks)
    client.expectMsg(TasksLaunched)

    verify(taskLauncher, times(1)).launch(any(), any(), any(), any(), any())

    // Registration round-trip: the executor forwards TaskRegistered to each task actor.
    executor ! RegisterTask(TaskId(0, 0), executorId, HostPort("localhost:80"))
    executor ! RegisterTask(TaskId(0, 1), executorId, HostPort("localhost:80"))

    executor ! TaskRegistered(TaskId(0, 0), 0, 0)

    task.expectMsgType[TaskRegistered]

    executor ! TaskRegistered(TaskId(0, 1), 0, 0)

    task.expectMsgType[TaskRegistered]

    // Once task locations are known, starting DAG version 0 sends StartTask to each task.
    executor ! TaskLocationsReady(TaskLocations(Map.empty), dagVersion = 0)
    executor ! StartAllTasks(dagVersion = 0)

    task.expectMsgType[StartTask]
    task.expectMsgType[StartTask]

    // Dynamic DAG update: version 1 changes the existing tasks instead of relaunching them.
    val changeTasks = ChangeTasks(taskIds, dagVersion = 1, life = LifeTime(0, Long.MaxValue),
      List.empty[Subscriber])

    client.send(executor, changeTasks)
    client.expectMsgType[TasksChanged]

    executor ! TaskLocationsReady(TaskLocations(Map.empty), 1)
    executor ! StartAllTasks(dagVersion = 1)

    task.expectMsgType[ChangeTask]
    task.expectMsgType[ChangeTask]
  }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/executor/TaskArgumentStoreSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/executor/TaskArgumentStoreSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/executor/TaskArgumentStoreSpec.scala
new file mode 100644
index 0000000..bd45121
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/executor/TaskArgumentStoreSpec.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.executor
+
+import org.scalatest._
+
+import org.apache.gearpump.streaming.executor.Executor.TaskArgumentStore
+import org.apache.gearpump.streaming.executor.TaskLauncher.TaskArgument
+import org.apache.gearpump.streaming.task.TaskId
+
class TaskArgumentStoreSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
  it should "retain all history of taskArgument" in {
    // Two argument versions for the same task: the original (version 0)
    // and an update tagged with DAG version 2.
    val initialArgument = TaskArgument(0, null, null)
    val updatedArgument = initialArgument.copy(dagVersion = 2)
    val taskId = TaskId(0, 0)

    val store = new TaskArgumentStore
    store.add(taskId, initialArgument)
    store.add(taskId, updatedArgument)

    // Lookup returns the newest stored argument whose version is <= the requested version.
    assert(store.get(dagVersion = 1, taskId).contains(initialArgument))
    assert(store.get(dagVersion = 0, taskId).contains(initialArgument))
    assert(store.get(dagVersion = 2, taskId).contains(updatedArgument))

    // Pruning drops every argument except those of the newest version.
    store.removeObsoleteVersion()
    assert(store.get(dagVersion = 1, taskId).isEmpty)
    assert(store.get(dagVersion = 0, taskId).isEmpty)
    assert(store.get(dagVersion = 2, taskId).contains(updatedArgument))
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/executor/TaskLauncherSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/executor/TaskLauncherSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/executor/TaskLauncherSpec.scala
new file mode 100644
index 0000000..074866a
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/executor/TaskLauncherSpec.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.executor
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+import akka.actor.{Actor, ActorSystem}
+import akka.testkit.TestProbe
+import org.scalatest._
+
+import org.apache.gearpump.cluster.{TestUtil, UserConfig}
+import org.apache.gearpump.serializer.SerializationFramework
+import org.apache.gearpump.streaming.ProcessorDescription
+import org.apache.gearpump.streaming.executor.TaskLauncher.TaskArgument
+import org.apache.gearpump.streaming.executor.TaskLauncherSpec.{MockTask, 
MockTaskActor}
+import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskContextData, 
TaskId, TaskWrapper}
+
class TaskLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
  val appId = 0
  val executorId = 0
  var appMaster: TestProbe = null
  implicit var system: ActorSystem = null
  val userConf = UserConfig.empty

  override def beforeAll(): Unit = {
    system = ActorSystem("TaskLauncherSpec", TestUtil.DEFAULT_CONFIG)
    appMaster = TestProbe()
  }

  override def afterAll(): Unit = {
    // Block until shutdown completes so the test JVM does not leak actor threads.
    system.terminate()
    Await.result(system.whenTerminated, Duration.Inf)
  }

  it should "able to launch tasks" in {
    // MockTaskActor stands in for the real task actor class, so no task logic runs.
    val launcher = new TaskLauncher(appId, "app", executorId, appMaster.ref,
      userConf, classOf[MockTaskActor])
    val taskIds = List(TaskId(0, 0), TaskId(0, 1))
    val processor = ProcessorDescription(id = 0, taskClass = classOf[MockTask].getName,
      parallelism = 2)
    val argument = TaskArgument(0, processor, null)

    val tasks = launcher.launch(taskIds, argument, system, null,
      "gearpump.shared-thread-pool-dispatcher")
    // One launched actor per requested task id.
    tasks.keys.toSet shouldBe taskIds.toSet
  }
}
+
object TaskLauncherSpec {
  // Minimal actor exposing the constructor signature TaskLauncher instantiates;
  // its receive is never invoked in this spec.
  class MockTaskActor(
      val taskId: TaskId,
      val taskContextData : TaskContextData,
      userConf : UserConfig,
      val task: TaskWrapper,
      serializer: SerializationFramework) extends Actor {
    def receive: Receive = null
  }

  // Placeholder Task implementation used only for its class name in ProcessorDescription.
  class MockTask(taskContext: TaskContext, userConf: UserConfig)
    extends Task(taskContext, userConf) {
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregatorSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregatorSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregatorSpec.scala
new file mode 100644
index 0000000..5804b00
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregatorSpec.scala
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.metrics
+
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+import org.scalatest.{FlatSpec, Matchers}
+
+import org.apache.gearpump.cluster.ClientToMaster.ReadOption
+import org.apache.gearpump.cluster.MasterToClient.HistoryMetricsItem
+import org.apache.gearpump.metrics.Metrics.{Gauge, Histogram, Meter}
+import 
org.apache.gearpump.streaming.metrics.ProcessorAggregator.{AggregatorFactory, 
HistogramAggregator, MeterAggregator, MultiLayerMap}
+import org.apache.gearpump.streaming.task.TaskId
+import org.apache.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
+
class ProcessorAggregatorSpec extends FlatSpec with Matchers {

  // Tolerance for comparing aggregated floating-point metric fields.
  private val Epsilon = 0.01

  "MultiLayerMap" should "maintain multiple layers HashMap" in {
    val layers = 3
    val map = new MultiLayerMap[String](layers)

    assert(map.get(layer = 0, "key") == null)

    // Out-of-range layer reads are handled safely and return null.
    assert(map.get(layer = 10, "key") == null)

    map.put(layer = 0, "key", "value")
    assert(map.get(layer = 0, "key") == "value")

    map.put(layer = 1, "key2", "value2")
    map.put(layer = 2, "key3", "value3")
    map.put(layer = 2, "key4", "value4")

    // Out-of-range layer writes are silently ignored.
    map.put(layer = 4, "key5", "value5")

    assert(map.size == 4)
    assert(map.valueIterator.asScala.toSet == Set("value", "value2", "value3", "value4"))
  }

  "HistogramAggregator" should "aggregate by calculating average" in {
    val aggregator = new HistogramAggregator("processor")

    val a = Histogram("processor.task1", 1, 2, 3, 4, 5, 6)
    val b = Histogram("processor.task2", 5, 6, 7, 8, 9, 10)
    val expect = Histogram("processor.task2", 3, 4, 5, 6, 7, 8)

    val olderTime = 100
    val newerTime = 200

    aggregator.aggregate(HistoryMetricsItem(time = newerTime, a))
    aggregator.aggregate(HistoryMetricsItem(time = olderTime, b))

    val result = aggregator.result

    // Picks the older time as the aggregated time.
    assert(result.time == olderTime)

    // Fields are averaged. Use math.abs so the check is two-sided; the
    // original `check.mean - expect.mean < 0.01` passed for ANY actual
    // value smaller than expected.
    val check = result.value.asInstanceOf[Histogram]

    assert(math.abs(check.mean - expect.mean) < Epsilon)
    assert(math.abs(check.stddev - expect.stddev) < Epsilon)
    assert(math.abs(check.median - expect.median) < Epsilon)
    assert(math.abs(check.p95 - expect.p95) < Epsilon)
    assert(math.abs(check.p99 - expect.p99) < Epsilon)
    assert(math.abs(check.p999 - expect.p999) < Epsilon)
  }

  "MeterAggregator" should "aggregate by summing" in {
    val aggregator = new MeterAggregator("processor")

    val a = Meter("processor.task1", count = 1, 1, 3, "s")
    val b = Meter("processor.task2", count = 2, 5, 7, "s")
    val expect = Meter("processor.task2", count = 3, 6, 10, "s")

    val olderTime = 100
    val newerTime = 200

    aggregator.aggregate(HistoryMetricsItem(time = newerTime, a))
    aggregator.aggregate(HistoryMetricsItem(time = olderTime, b))

    val result = aggregator.result

    // Picks the older time as the aggregated time.
    assert(result.time == olderTime)

    // Fields are summed; math.abs makes the tolerance check two-sided.
    val check = result.value.asInstanceOf[Meter]

    assert(check.count == expect.count)
    assert(math.abs(check.m1 - expect.m1) < Epsilon)
    assert(math.abs(check.meanRate - expect.meanRate) < Epsilon)
    assert(check.rateUnit == expect.rateUnit)
  }

  "AggregatorFactory" should "create aggregator" in {
    val factory = new AggregatorFactory()

    val a = Meter("processor.task1", count = 1, 1, 3, "s")
    val b = Histogram("processor.task1", 1, 2, 3, 4, 5, 6)

    // The factory selects the aggregator implementation from the metric type.
    val aggregator1 = factory.create(HistoryMetricsItem(time = 0, a), "name1")
    assert(aggregator1.isInstanceOf[MeterAggregator])

    val aggregator2 = factory.create(HistoryMetricsItem(time = 0, b), "name2")
    assert(aggregator2.isInstanceOf[HistogramAggregator])
  }

  "ProcessorAggregator" should "aggregate on different read options" in {
    val hours = 2 // Maintains 2 hours history
    val seconds = 2 // Maintains 2 seconds recent data
    val taskCount = 5 // For each processor
    val metricCount = 100 // For each task, have metricCount metrics
    val range = new HistoryMetricsConfig(hours, hours / 2 * 3600 * 1000,
      seconds, seconds / 2 * 1000)

    val aggregator = new ProcessorAggregator(range)

    // Identity helper; keeps the expected-count maps below self-documenting.
    def count(value: Int): Int = value

    // Generates metricCount random metrics for every combination of
    // (processor 0-1, task 0..taskCount-1, metric name), with timestamps
    // spread over [0, timeRange).
    def inputs(timeRange: Long): List[HistoryMetricsItem] = {
      def tasks(processorId: Int): Seq[TaskId] =
        (0 until taskCount).map(TaskId(processorId, _))

      tasks(0).flatMap(histogram(_, "receiveLatency", timeRange, metricCount)).toList ++
        tasks(0).flatMap(histogram(_, "processTime", timeRange, metricCount)) ++
        tasks(1).flatMap(histogram(_, "receiveLatency", timeRange, metricCount)) ++
        tasks(1).flatMap(histogram(_, "processTime", timeRange, metricCount)) ++
        tasks(0).flatMap(meter(_, "sendThroughput", timeRange, metricCount)) ++
        tasks(0).flatMap(meter(_, "receiveThroughput", timeRange, metricCount)) ++
        tasks(1).flatMap(meter(_, "sendThroughput", timeRange, metricCount)) ++
        tasks(1).flatMap(meter(_, "receiveThroughput", timeRange, metricCount))
    }

    // Verifies the aggregated items occur with exactly the expected per-name
    // counts. Compares with == instead of `sameElements`, which walks the
    // unordered maps in iteration order and is therefore flaky.
    def check(list: List[HistoryMetricsItem], countMap: Map[String, Int]): Boolean = {
      val nameCount = list.groupBy(_.value.name).map { case (name, items) => name -> items.size }
      nameCount == countMap
    }

    // Aggregates on processor and metric names.
    val input = inputs(Long.MaxValue)
    val readLatest = aggregator.aggregate(ReadOption.ReadLatest,
      input.iterator, now = Long.MaxValue)
    assert(readLatest.size == 8) // 2 processors * 4 metric types
    assert(check(readLatest, Map(
      "app0.processor0:receiveLatency" -> count(1),
      "app0.processor0:processTime" -> count(1),
      "app0.processor0:sendThroughput" -> count(1),
      "app0.processor0:receiveThroughput" -> count(1),
      "app0.processor1:receiveLatency" -> count(1),
      "app0.processor1:processTime" -> count(1),
      "app0.processor1:sendThroughput" -> count(1),
      "app0.processor1:receiveThroughput" -> count(1)
    )))

    // Aggregates on processor, metric names and time range.
    val readRecent = aggregator.aggregate(ReadOption.ReadRecent,
      inputs(seconds * 1000).iterator, now = seconds * 1000)
    assert(readRecent.size == 16) // 2 processors * 4 metric types * 2 time ranges
    assert(check(readRecent, Map(
      "app0.processor0:receiveLatency" -> count(2),
      "app0.processor0:processTime" -> count(2),
      "app0.processor0:sendThroughput" -> count(2),
      "app0.processor0:receiveThroughput" -> count(2),
      "app0.processor1:receiveLatency" -> count(2),
      "app0.processor1:processTime" -> count(2),
      "app0.processor1:sendThroughput" -> count(2),
      "app0.processor1:receiveThroughput" -> count(2)
    )))

    // Aggregates on processor, metric names and time range.
    val readHistory = aggregator.aggregate(ReadOption.ReadHistory,
      inputs(hours * 3600 * 1000).iterator, now = hours * 3600 * 1000)
    assert(readHistory.size == 16) // 2 processors * 4 metric types * 2 time ranges
    assert(check(readHistory, Map(
      "app0.processor0:receiveLatency" -> count(2),
      "app0.processor0:processTime" -> count(2),
      "app0.processor0:sendThroughput" -> count(2),
      "app0.processor0:receiveThroughput" -> count(2),
      "app0.processor1:receiveLatency" -> count(2),
      "app0.processor1:processTime" -> count(2),
      "app0.processor1:sendThroughput" -> count(2),
      "app0.processor1:receiveThroughput" -> count(2)
    )))
  }

  /** Produces `repeat` random histogram metrics named
   *  "app0.processor<P>.task<T>:<metricName>" with timestamps in [0, timeRange).
   */
  private def histogram(
      taskId: TaskId, metricName: String = "latency", timeRange: Long = Long.MaxValue,
      repeat: Int = 1): List[HistoryMetricsItem] = {
    val random = new Random()
    (0 until repeat).map { _ =>
      new HistoryMetricsItem(Math.abs(random.nextLong() % timeRange),
        new Histogram(s"app0.processor${taskId.processorId}.task${taskId.index}:$metricName",
          Math.abs(random.nextDouble()),
          Math.abs(random.nextDouble()),
          Math.abs(random.nextDouble()),
          Math.abs(random.nextDouble()),
          Math.abs(random.nextDouble()),
          Math.abs(random.nextDouble())
        ))
    }.toList
  }

  /** Produces `repeat` random meter metrics, analogous to [[histogram]]. */
  private def meter(taskId: TaskId, metricName: String, timeRange: Long, repeat: Int)
    : List[HistoryMetricsItem] = {
    val random = new Random()
    (0 until repeat).map { _ =>
      new HistoryMetricsItem(Math.abs(random.nextLong() % timeRange),
        new Meter(s"app0.processor${taskId.processorId}.task${taskId.index}:$metricName",
          Math.abs(random.nextInt()),
          Math.abs(random.nextDouble()),
          Math.abs(random.nextDouble()),
          "event/s")
      )
    }.toList
  }

  "ProcessorAggregator" should "handle smoothly for unsupported metric type and " +
    "error formatted metric name" in {
    val invalid = List(
      // Unsupported metric type
      HistoryMetricsItem(0, new Gauge("app0.processor0.task0:gauge", 100)),

      // Wrong format: should be app0.processor0.task0:throughput
      HistoryMetricsItem(0, new Meter("app0.processor0.task0/throughput", 100, 0, 0, ""))
    )

    val valid = histogram(TaskId(0, 0), repeat = 10)

    val aggregator = new ProcessorAggregator(new HistoryMetricsConfig(0, 0, 0, 0))
    val result = aggregator.aggregate(ReadOption.ReadLatest, (valid ++ invalid).toIterator,
      now = Long.MaxValue)

    // For one taskId, will only use one data point.
    assert(result.size == 1)
    assert(result.head.value.name == "app0.processor0:latency")
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/metrics/TaskFilterAggregatorSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/metrics/TaskFilterAggregatorSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/metrics/TaskFilterAggregatorSpec.scala
new file mode 100644
index 0000000..ee9402b
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/metrics/TaskFilterAggregatorSpec.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.metrics
+
+import scala.util.Random
+
+import org.scalatest.{FlatSpec, Matchers}
+
+import org.apache.gearpump.cluster.MasterToClient.HistoryMetricsItem
+import org.apache.gearpump.metrics.Metrics.{Histogram, Meter}
+import org.apache.gearpump.streaming.metrics.TaskFilterAggregator.Options
+import org.apache.gearpump.streaming.task.TaskId
+
class TaskFilterAggregatorSpec extends FlatSpec with Matchers {

  /** Builds a zero-valued histogram item named after the given task. */
  def metric(taskId: TaskId): HistoryMetricsItem = {
    val rand = new Random()
    val name = s"app0.processor${taskId.processorId}.task${taskId.index}:latency"
    new HistoryMetricsItem(Math.abs(rand.nextLong()),
      new Histogram(name, 0, 0, 0, 0, 0, 0))
  }

  it should "filter data on processor range, task range combination" in {
    // A 10 x 10 matrix of metrics: one per (processor, task) pair.
    val allMetrics = (for {
      processor <- 0 until 10
      task <- 0 until 10
    } yield metric(TaskId(processor, task))).toList

    val globalLimit = 10
    val aggregator = new TaskFilterAggregator(globalLimit)

    // Limit not reached: every match in the selected sub-matrix is returned.
    val wideLimit = new Options(limit = 20, startTask = 3, endTask = 6,
      startProcessor = 3, endProcessor = 6)
    assert(aggregator.aggregate(wideLimit, allMetrics.iterator).size == 9)

    // The user-supplied limit caps the result size.
    val userLimited = new Options(limit = 3, startTask = 3, endTask = 5,
      startProcessor = 3, endProcessor = 5)
    assert(aggregator.aggregate(userLimited, allMetrics.iterator).size == 3)

    // The hard global limit wins over a larger user limit.
    val overGlobal = new Options(limit = 20, startTask = 3, endTask = 8,
      startProcessor = 3, endProcessor = 8)
    assert(aggregator.aggregate(overGlobal, allMetrics.iterator).size == globalLimit)
  }

  it should "reject wrong format options" in {
    val badOptions = Map(TaskFilterAggregator.StartTask -> "not a number")
    assert(Options.parse(badOptions) == null)
  }

  it should "skip wrong format metrics" in {
    // Wrong format: should be app0.processor0.task0:throughput
    val badName = new Meter("app0.processor0.task0/throughput", 100, 0, 0, "")
    val invalid = List(HistoryMetricsItem(0, badName))

    val aggregator = new TaskFilterAggregator(Int.MaxValue)
    assert(aggregator.aggregate(Options.acceptAll, invalid.iterator).size == 0)
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/source/DefaultTimeStampFilterSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DefaultTimeStampFilterSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DefaultTimeStampFilterSpec.scala
new file mode 100644
index 0000000..d6fa443
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DefaultTimeStampFilterSpec.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.source
+
+import org.scalacheck.Gen
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import org.apache.gearpump.{Message, TimeStamp}
+
class DefaultTimeStampFilterSpec extends PropSpec with PropertyChecks with Matchers {
  // Property name fixed: "give timestamp" -> "given timestamp".
  property("DefaultTimeStampFilter should filter message against given timestamp") {
    val timestampGen = Gen.chooseNum[Long](0L, 1000L)
    val messageGen = for {
      msg <- Gen.alphaStr
      time <- timestampGen
    } yield Message(msg, time)

    val filter = new DefaultTimeStampFilter()

    forAll(timestampGen, messageGen) {
      (predicate: TimeStamp, message: Message) =>
        // Messages at or after the cut-off pass through; earlier ones are dropped.
        if (message.timestamp >= predicate) {
          filter.filter(message, predicate) shouldBe Some(message)
        } else {
          filter.filter(message, predicate) shouldBe None
        }

        // A null message is always filtered out.
        filter.filter(null, predicate) shouldBe None
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala
new file mode 100644
index 0000000..9e42e85
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.state.impl
+
+import org.mockito.Mockito._
+import org.mockito.{Matchers => MockitoMatchers}
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.streaming.transaction.api.CheckpointStore
+
class CheckpointManagerSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {

  val timestampGen = Gen.chooseNum[Long](0L, 1000L)
  val checkpointIntervalGen = Gen.chooseNum[Long](100L, 10000L)

  property("CheckpointManager should recover from CheckpointStore") {
    forAll(timestampGen, checkpointIntervalGen) { (timestamp: TimeStamp, interval: Long) =>
      val store = mock[CheckpointStore]
      val manager = new CheckpointManager(interval, store)

      manager.recover(timestamp)

      // Recovery is delegated straight to the underlying store.
      verify(store).recover(timestamp)
    }
  }

  property("CheckpointManager should write checkpoint to CheckpointStore") {
    val checkpointGen = Gen.alphaStr.map(_.getBytes("UTF-8"))
    forAll(timestampGen, checkpointIntervalGen, checkpointGen) {
      (timestamp: TimeStamp, interval: Long, checkpoint: Array[Byte]) =>
        val store = mock[CheckpointStore]
        val manager = new CheckpointManager(interval, store)

        manager.checkpoint(timestamp, checkpoint)

        // Checkpointing persists the bytes at the given time.
        verify(store).persist(timestamp, checkpoint)
    }
  }

  property("CheckpointManager should close CheckpointStore") {
    forAll(checkpointIntervalGen) { (interval: Long) =>
      val store = mock[CheckpointStore]
      val manager = new CheckpointManager(interval, store)

      manager.close()

      verify(store).close()
    }
  }

  property("CheckpointManager should update checkpoint time according to max message timestamp") {
    forAll(timestampGen, checkpointIntervalGen) { (timestamp: TimeStamp, interval: Long) =>
      val store = mock[CheckpointStore]
      val manager = new CheckpointManager(interval, store)

      manager.update(timestamp)
      manager.getMaxMessageTime shouldBe timestamp

      // The pending checkpoint time lies within one interval past the timestamp.
      val checkpointTime = manager.getCheckpointTime.get
      timestamp should (be < checkpointTime and be >= (checkpointTime - interval))

      manager.checkpoint(checkpointTime, Array.empty[Byte])
      verify(store).persist(MockitoMatchers.eq(checkpointTime),
        MockitoMatchers.anyObject[Array[Byte]]())
      // Performing the checkpoint consumes the pending checkpoint time.
      manager.getCheckpointTime shouldBe empty
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStoreSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStoreSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStoreSpec.scala
new file mode 100644
index 0000000..8d2f515
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStoreSpec.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.state.impl
+
+import org.scalacheck.Gen
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
class InMemoryCheckpointStoreSpec extends PropSpec with PropertyChecks with Matchers {

  property("InMemoryCheckpointStore should provide read / write checkpoint") {
    val timeGen = Gen.chooseNum[Long](1, 1000)
    val dataGen = Gen.alphaStr.map(_.getBytes("UTF-8"))
    forAll(timeGen, dataGen) { (time: Long, data: Array[Byte]) =>
      val store = new InMemoryCheckpointStore

      // Nothing persisted yet, so recovery yields no checkpoint.
      store.recover(time) shouldBe None

      // A persisted checkpoint is read back unchanged.
      store.persist(time, data)
      store.recover(time) shouldBe Some(data)
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala
new file mode 100644
index 0000000..4cdff95
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.state.impl
+
+import scala.util.Success
+
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.streaming.state.api.{Monoid, Serializer}
+
class NonWindowStateSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {

  // Arbitrary positive timestamps for the property checks.
  val longGen = Gen.chooseNum[Long](100L, System.currentTimeMillis())

  property("NonWindowState should recover checkpointed state at given timestamp") {
    forAll(longGen) {
      (timestamp: TimeStamp) =>
        val monoid = mock[Monoid[AnyRef]]
        val serializer = mock[Serializer[AnyRef]]
        val bytes = Array.empty[Byte]
        val checkpoint = mock[AnyRef]
        val zero = mock[AnyRef]
        // The (value, Nil: _*) / (v1, v2) argument forms select Mockito's
        // varargs thenReturn overload, which scalac cannot resolve otherwise.
        when(monoid.zero).thenReturn(zero, zero)
        when(monoid.plus(zero, zero)).thenReturn(zero, Nil: _*)
        when(monoid.plus(checkpoint, zero)).thenReturn(checkpoint, Nil: _*)

        // A fresh state starts from the monoid's zero on both halves.
        val state = new NonWindowState[AnyRef](monoid, serializer)
        state.left shouldBe zero
        state.right shouldBe zero
        state.get shouldBe Some(zero)

        // Recovery deserializes the checkpoint bytes into the left half.
        when(serializer.deserialize(bytes)).thenReturn(Success(checkpoint))
        state.recover(timestamp, bytes)

        state.left shouldBe checkpoint
        state.right shouldBe zero
        state.get shouldBe Some(checkpoint)
    }
  }

  property("NonWindowState checkpoints state") {
    forAll(longGen) {
      (checkpointTime: TimeStamp) =>
        val monoid = mock[Monoid[AnyRef]]
        val serializer = mock[Serializer[AnyRef]]

        val left = mock[AnyRef]
        val right = mock[AnyRef]
        val zero = mock[AnyRef]
        val plus = mock[AnyRef]

        when(monoid.zero).thenReturn(zero, zero)
        when(monoid.plus(zero, zero)).thenReturn(zero, Nil: _*)

        val state = new NonWindowState[AnyRef](monoid, serializer)
        state.left shouldBe zero
        state.right shouldBe zero
        state.get shouldBe Some(zero)

        // Pre-load both halves directly to simulate accumulated updates.
        state.left = left
        state.right = right

        // Re-stub for the post-checkpoint transition: left + right collapses
        // into `plus` and the right half is reset to zero. NOTE: the stubbing
        // order matters; do not reorder these when() calls.
        when(monoid.zero).thenReturn(zero, Nil: _*)
        when(monoid.plus(left, right)).thenReturn(plus, Nil: _*)
        when(monoid.plus(plus, zero)).thenReturn(plus, Nil: _*)
        state.checkpoint()

        // Only the pre-checkpoint (left) half is serialized.
        verify(serializer).serialize(left)
        state.left shouldBe plus
        state.right shouldBe zero
        state.get shouldBe Some(plus)
    }
  }

  property("NonWindowState updates state") {
    forAll(longGen) {
      (checkpointTime: TimeStamp) =>
        val monoid = mock[Monoid[AnyRef]]
        val serializer = mock[Serializer[AnyRef]]

        val left = mock[AnyRef]
        val right = mock[AnyRef]
        val zero = mock[AnyRef]
        val plus = mock[AnyRef]

        when(monoid.zero).thenReturn(zero, zero)
        when(monoid.plus(zero, zero)).thenReturn(zero, Nil: _*)

        val state = new NonWindowState[AnyRef](monoid, serializer)
        state.left shouldBe zero
        state.right shouldBe zero
        state.get shouldBe Some(zero)

        // An update BEFORE the next checkpoint time accumulates into the left half.
        when(monoid.plus(zero, left)).thenReturn(left, Nil: _*)
        when(monoid.plus(left, zero)).thenReturn(left, Nil: _*)
        state.setNextCheckpointTime(checkpointTime)
        state.update(checkpointTime - 1, left)
        state.left shouldBe left
        state.right shouldBe zero
        state.get shouldBe Some(left)

        // An update AFTER the checkpoint time goes to the right half;
        // get then returns the combination of both halves.
        when(monoid.plus(zero, right)).thenReturn(right, Nil: _*)
        when(monoid.plus(left, right)).thenReturn(plus, Nil: _*)
        state.setNextCheckpointTime(checkpointTime)
        state.update(checkpointTime + 1, right)
        state.left shouldBe left
        state.right shouldBe right
        state.get shouldBe Some(plus)
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala
new file mode 100644
index 0000000..d9282ae
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.state.impl
+
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import org.apache.gearpump.TimeStamp
+
class WindowSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {

  val windowSizeGen = Gen.chooseNum[Long](1L, 1000L)
  val windowStepGen = Gen.chooseNum[Long](1L, 1000L)
  val timestampGen = Gen.chooseNum[Long](0L, 1000L)

  property("Window should only slide when time passes window end") {
    forAll(timestampGen, windowSizeGen, windowStepGen) {
      (clock: TimeStamp, size: Long, step: Long) =>
        val window = new Window(size, step)
        // A fresh window has observed no time yet, so it must not slide.
        window.shouldSlide shouldBe false
        window.update(clock)
        // It slides only once the clock reaches the end of the initial window.
        window.shouldSlide shouldBe clock >= size
    }
  }

  property("Window should slide by one or to given timestamp") {
    forAll(timestampGen, windowSizeGen, windowStepGen) {
      (clock: TimeStamp, size: Long, step: Long) =>
        val window = new Window(size, step)
        // The initial range covers [0, size).
        window.range shouldBe(0L, size)

        // A single step shifts the whole range forward by the step width.
        window.slideOneStep()
        window.range shouldBe(step, size + step)

        // After sliding to a timestamp, the timestamp falls inside the window
        // (or inside the step span when the step is wider than the window).
        window.slideTo(clock)
        val (startTime, endTime) = window.range
        if (step > size) {
          clock should (be >= startTime and be < (startTime + step))
        } else {
          clock should (be >= startTime and be < endTime)
        }
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala
new file mode 100644
index 0000000..299a626
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.state.impl
+
+import scala.collection.immutable.TreeMap
+import scala.util.Success
+
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import org.apache.gearpump._
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.state.api.{Group, Serializer}
+
/**
 * Property tests for WindowState, which keeps per-interval partial aggregates
 * (a TreeMap[Interval, AnyRef]) and combines them through the Group algebra
 * (zero / plus / minus).
 *
 * All collaborators (Window, Group, Serializer, TaskContext) are Mockito
 * mocks, so these tests pin down the exact call/stubbing contract that
 * WindowState has with them.
 */
class WindowStateSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {

  // Timestamps and durations drawn from [100, 10000].
  val longGen = Gen.chooseNum[Long](100L, 10000L)

  // Non-empty interval: endTime is strictly greater than startTime.
  val intervalGen = for {
    st <- longGen
    et <- Gen.chooseNum[Long](st + 1, 100000L)
  } yield Interval(st, et)

  property("WindowState init should recover checkpointed state") {
    forAll(intervalGen) {
      (interval: Interval) =>
        val window = mock[Window]
        val taskContext = MockUtil.mockTaskContext
        val group = mock[Group[AnyRef]]
        val serializer = mock[Serializer[TreeMap[Interval, AnyRef]]]

        val timestamp = interval.startTime
        val zero = mock[AnyRef]
        val bytes = Array.empty[Byte]
        val data = mock[AnyRef]
        val checkpoint = TreeMap(interval -> data)
        // Stub the Group algebra so that zero acts as an identity element.
        when(group.zero).thenReturn(zero, zero)
        when(group.plus(zero, data)).thenReturn(data, Nil: _*)
        when(group.plus(data, zero)).thenReturn(data, Nil: _*)
        when(group.plus(zero, zero)).thenReturn(zero, Nil: _*)
        when(serializer.deserialize(bytes)).thenReturn(Success(checkpoint))

        val state = new WindowState[AnyRef](group, serializer, taskContext, window)
        // A fresh state holds only the identity element on both sides.
        state.left shouldBe zero
        state.right shouldBe zero
        state.get shouldBe Some(zero)

        state.recover(timestamp, bytes)

        // After recovery the checkpointed aggregate lands on the left side
        // while the right side stays at identity.
        state.left shouldBe data
        state.right shouldBe zero
        state.get shouldBe Some(data)
        state.getIntervalStates(interval.startTime, interval.endTime) shouldBe checkpoint
    }
  }

  property("WindowState checkpoints") {
    forAll(longGen) { (checkpointTime: TimeStamp) =>
      val window = mock[Window]
      val taskContext = MockUtil.mockTaskContext
      val group = mock[Group[AnyRef]]
      val serializer = mock[Serializer[TreeMap[Interval, AnyRef]]]

      val zero = mock[AnyRef]
      val left = mock[AnyRef]
      val right = mock[AnyRef]
      val plus = mock[AnyRef]

      when(group.zero).thenReturn(zero, zero)
      when(group.plus(zero, zero)).thenReturn(zero, Nil: _*)
      val state = new WindowState[AnyRef](group, serializer, taskContext, window)
      state.left shouldBe zero
      state.right shouldBe zero
      state.get shouldBe Some(zero)

      // Window straddles the checkpoint: [checkpointTime - 1, checkpointTime + 1).
      val start = checkpointTime - 1
      val end = checkpointTime + 1
      val size = end - start
      val step = 1L

      when(window.range).thenReturn((start, end))
      when(window.windowSize).thenReturn(size)
      when(window.windowStep).thenReturn(step)
      when(group.zero).thenReturn(zero, zero)
      when(group.plus(zero, left)).thenReturn(left, Nil: _*)
      when(group.plus(zero, right)).thenReturn(right, Nil: _*)
      when(group.plus(left, right)).thenReturn(plus, Nil: _*)

      // left accumulates data before checkpointTime, right at/after it.
      state.left = left
      state.updateIntervalStates(start, left, checkpointTime)
      state.right = right
      state.updateIntervalStates(checkpointTime, right, checkpointTime)

      state.setNextCheckpointTime(checkpointTime)
      state.checkpoint()

      // On checkpoint: left absorbs right (left + right), right resets, and
      // only the pre-checkpoint interval states are serialized.
      state.left shouldBe plus
      state.right shouldBe zero
      verify(serializer).serialize(TreeMap(Interval(start, checkpointTime) -> left))
    }
  }

  property("WindowState updates state") {
    forAll(longGen) { (checkpointTime: TimeStamp) =>
      val window = mock[Window]
      val taskContext = MockUtil.mockTaskContext
      val group = mock[Group[AnyRef]]
      val serializer = mock[Serializer[TreeMap[Interval, AnyRef]]]

      val zero = mock[AnyRef]
      val left = mock[AnyRef]
      val right = mock[AnyRef]
      val plus = mock[AnyRef]

      when(group.zero).thenReturn(zero, zero)
      val state = new WindowState[AnyRef](group, serializer, taskContext, window)

      // Window straddles the checkpoint: [checkpointTime - 1, checkpointTime + 1).
      val start = checkpointTime - 1
      val end = checkpointTime + 1
      val size = end - start
      val step = 1L

      when(window.range).thenReturn((start, end))
      when(window.windowSize).thenReturn(size)
      when(window.windowStep).thenReturn(step)
      when(window.shouldSlide).thenReturn(false)
      when(group.plus(zero, left)).thenReturn(left, left)
      when(group.plus(left, zero)).thenReturn(left, Nil: _*)
      when(taskContext.upstreamMinClock).thenReturn(0L)

      // Time < checkpointTime
      // Update left in current window
      state.setNextCheckpointTime(checkpointTime)
      state.update(start, left)

      verify(window).update(0L)
      state.left shouldBe left
      state.right shouldBe zero
      state.get shouldBe Some(left)
      state.getIntervalStates(start, end) shouldBe TreeMap(Interval(start, checkpointTime) -> left)

      when(window.range).thenReturn((start, end))
      when(window.windowSize).thenReturn(size)
      when(window.windowStep).thenReturn(step)
      when(window.shouldSlide).thenReturn(false)
      when(group.plus(zero, right)).thenReturn(right, right)
      when(group.plus(left, right)).thenReturn(plus, Nil: _*)
      when(taskContext.upstreamMinClock).thenReturn(0L)

      // Time >= checkpointTime
      // Update right in current window
      state.setNextCheckpointTime(checkpointTime)
      state.update(checkpointTime, right)

      verify(window, times(2)).update(0L)
      state.left shouldBe left
      state.right shouldBe right
      state.get shouldBe Some(plus)
      state.getIntervalStates(start, end) shouldBe
        TreeMap(Interval(start, start + step) -> left, Interval(start + step, end) -> right)

      // Slides window forward
      when(window.range).thenReturn((start, end), (start + step, end + step))
      when(window.shouldSlide).thenReturn(true)
      when(taskContext.upstreamMinClock).thenReturn(checkpointTime)
      when(group.minus(left, left)).thenReturn(zero, Nil: _*)
      when(group.plus(zero, right)).thenReturn(right, Nil: _*)
      when(group.plus(right, right)).thenReturn(plus, Nil: _*)
      when(group.plus(zero, plus)).thenReturn(plus, Nil: _*)

      state.setNextCheckpointTime(checkpointTime)
      state.update(end, right)

      // After sliding, left drops the expired interval (left - left = zero)
      // and right has absorbed the new data.
      verify(window).slideOneStep()
      verify(window).update(checkpointTime)
      state.left shouldBe zero
      state.right shouldBe plus
      state.get shouldBe Some(plus)
      state.getIntervalStates(start, end + step) shouldBe
        TreeMap(
          Interval(start, start + step) -> left,
          Interval(start + step, end) -> right,
          Interval(end, end + step) -> right)
    }
  }

  property("WindowState gets interval for timestamp") {
    forAll(longGen, longGen, longGen, longGen) {
      (timestamp: TimeStamp, checkpointTime: TimeStamp, windowSize: Long, windowStep: Long) =>
        // Real Window here (not a mock): this checks getInterval against the
        // actual sliding-window arithmetic.
        val windowManager = new Window(windowSize, windowStep)
        val taskContext = MockUtil.mockTaskContext
        val group = mock[Group[AnyRef]]
        val serializer = mock[Serializer[TreeMap[Interval, AnyRef]]]

        val zero = mock[AnyRef]
        when(group.zero).thenReturn(zero, zero)
        val state = new WindowState[AnyRef](group, serializer, taskContext, windowManager)

        val interval = state.getInterval(timestamp, checkpointTime)
        intervalSpec(interval, timestamp, checkpointTime, windowSize, windowStep)

        val nextTimeStamp = interval.endTime
        val nextInterval = state.getInterval(nextTimeStamp, checkpointTime)
        intervalSpec(nextInterval, nextTimeStamp, checkpointTime, windowSize, windowStep)

        // Consecutive intervals must tile the timeline without gaps.
        interval.endTime shouldBe nextInterval.startTime
    }

    // Invariants every returned interval must satisfy: it stays within the
    // window-step grid around the timestamp and never spans the checkpoint.
    def intervalSpec(interval: Interval, timestamp: TimeStamp,
        checkpointTime: TimeStamp, windowSize: Long, windowStep: Long): Unit = {
      interval.startTime should be <= interval.endTime
      timestamp / windowStep * windowStep should (be <= interval.startTime)
      (timestamp - windowSize) / windowStep * windowStep should (be <= interval.startTime)
      (timestamp / windowStep + 1) * windowStep should (be >= interval.endTime)
      ((timestamp - windowSize) / windowStep + 1) * windowStep + windowSize should
        (be >= interval.endTime)
      checkpointTime should (be <= interval.startTime or be >= interval.endTime)
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/storage/InMemoryAppStoreOnMasterSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/storage/InMemoryAppStoreOnMasterSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/storage/InMemoryAppStoreOnMasterSpec.scala
new file mode 100644
index 0000000..71901a4
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/storage/InMemoryAppStoreOnMasterSpec.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.storage
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec}
+
+import org.apache.gearpump.cluster.{MasterHarness, MiniCluster}
+import org.apache.gearpump.streaming.StreamingTestUtil
+import org.apache.gearpump.util.Constants
+
/**
 * Integration test: stores values of several types in an
 * InMemoryAppStoreOnMaster backed by a mini cluster and reads them back.
 *
 * Fix over the original: the mini cluster is now shut down in a `finally`
 * block, so a failed assertion or a timed-out `Await` no longer leaks the
 * cluster (and its actor system) into subsequent tests.
 */
class InMemoryAppStoreOnMasterSpec extends WordSpec with Matchers with BeforeAndAfterAll {
  implicit val timeout = Constants.FUTURE_TIMEOUT
  implicit val dispatcher = MasterHarness.cachedPool

  "InMemoryAppStoreOnMaster" should {
    "save and return the data properly" in {
      val appId = 0
      val miniCluster = new MiniCluster
      try {
        val master = miniCluster.mockMaster
        StreamingTestUtil.startAppMaster(miniCluster, appId)
        val store = new InMemoryAppStoreOnMaster(appId, master)

        // Give the AppMaster a moment to come up before using the store.
        // NOTE(review): a fixed sleep is fragile; polling for readiness
        // would be more robust.
        Thread.sleep(500)

        store.put("String_type", "this is a string")
        store.put("Int_type", 1024)
        store.put("Tuple2_type", ("element1", 1024))

        val future1 = store.get("String_type").map { value =>
          value.asInstanceOf[String] should be("this is a string")
        }
        val future2 = store.get("Int_type").map { value =>
          value.asInstanceOf[Int] should be(1024)
        }
        val future3 = store.get("Tuple2_type").map { value =>
          value.asInstanceOf[(String, Int)] should be(("element1", 1024))
        }
        // A missing key resolves to null rather than failing the future.
        val future4 = store.get("key").map { value =>
          value.asInstanceOf[Object] should be(null)
        }
        Await.result(future1, 15.seconds)
        Await.result(future2, 15.seconds)
        Await.result(future3, 15.seconds)
        Await.result(future4, 15.seconds)
      } finally {
        // Always release cluster resources, even on assertion failure.
        miniCluster.shutDown
      }
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriberSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriberSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriberSpec.scala
new file mode 100644
index 0000000..cfe47eb
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriberSpec.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.task
+
+import org.scalatest.{FlatSpec, Matchers}
+
+import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner}
+import org.apache.gearpump.streaming.task.SubscriberSpec.TestTask
+import org.apache.gearpump.streaming.{DAG, ProcessorDescription}
+import org.apache.gearpump.util.Graph
+import org.apache.gearpump.util.Graph._
+
/**
 * Verifies that Subscriber.of collects one Subscriber per downstream
 * processor of the given processor in the DAG.
 */
class SubscriberSpec extends FlatSpec with Matchers {
  "Subscriber.of" should "return all subscriber for a processor" in {

    val sourceId = 0
    // Single-task processor description; only the class name of TestTask is used.
    def descriptor(processorId: Int): ProcessorDescription =
      ProcessorDescription(id = processorId, taskClass = classOf[TestTask].getName, parallelism = 1)

    val source = descriptor(sourceId)
    val processor1 = descriptor(1)
    val processor2 = descriptor(2)
    val partitioner = Partitioner[HashPartitioner]
    // source feeds both downstream processors; processor1 also feeds processor2.
    val dag = DAG(Graph(
      source ~ partitioner ~> processor1,
      source ~ partitioner ~> processor2,
      processor1 ~ partitioner ~> processor2))

    val subscribers = Subscriber.of(sourceId, dag)
    assert(subscribers.size == 2)

    // Exactly the two direct downstream processors of the source, with their
    // parallelism and lifetime carried over.
    val expected = Set(
      Subscriber(1, partitioner, processor1.parallelism, processor1.life),
      Subscriber(2, partitioner, processor2.parallelism, processor2.life))
    assert(subscribers.toSet == expected)
  }
}
+
object SubscriberSpec {
  // Minimal placeholder; only its class name is used in ProcessorDescription.
  class TestTask
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
new file mode 100644
index 0000000..4afee8b
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.task
+
+import java.util.Random
+
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{FlatSpec, Matchers}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner}
+import org.apache.gearpump.streaming.task.SubscriptionSpec.NextTask
+import org.apache.gearpump.streaming.{LifeTime, ProcessorDescription}
+
/**
 * Unit tests for Subscription, which tracks messages sent from one task to
 * the tasks of a downstream processor and derives a min-clock from the acks
 * received. The ExpressTransport is a Mockito mock so the tests can verify
 * exactly what is sent to which downstream TaskId.
 */
class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar {
  val appId = 0
  val executorId = 0
  val taskId = TaskId(0, 0)
  val session = new Random().nextInt()

  val downstreamProcessorId = 1
  val partitioner = Partitioner[HashPartitioner]

  // Downstream processor with two task instances: TaskId(1, 0) and TaskId(1, 1).
  // (Note: "parallism" is a typo for "parallelism" but is local to this spec.)
  val parallism = 2
  val downstreamProcessor = ProcessorDescription(downstreamProcessorId, classOf[NextTask].getName,
    parallism)
  val subscriber = Subscriber(downstreamProcessorId, partitioner, downstreamProcessor.parallelism,
    downstreamProcessor.life)

  // Creates a started Subscription over a mocked transport and checks that
  // the initial ack-request handshake was sent to both downstream tasks.
  private def prepare: (Subscription, ExpressTransport) = {
    val transport = mock[ExpressTransport]
    val subscription = new Subscription(appId, executorId, taskId, subscriber, session, transport)
    subscription.start()

    val expectedAckRequest = InitialAckRequest(taskId, session)
    verify(transport, times(1)).transport(expectedAckRequest, TaskId(1, 0), TaskId(1, 1))

    (subscription, transport)
  }

  it should "not send any more message when its life ends" in {
    val (subscription, transport) = prepare
    subscription.changeLife(LifeTime(0, 0))
    // sendMessage returns the number of messages actually sent.
    val count = subscription.sendMessage(Message("some"))
    assert(count == 0)
  }

  it should "send message and handle ack correctly" in {
    val (subscription, transport) = prepare
    val msg1 = new Message("1", timestamp = 70)
    subscription.sendMessage(msg1)

    // Message "1" is partitioned to downstream task TaskId(1, 1).
    verify(transport, times(1)).transport(msg1, TaskId(1, 1))
    assert(subscription.minClock == 70)

    val msg2 = new Message("0", timestamp = 50)
    subscription.sendMessage(msg2)
    verify(transport, times(1)).transport(msg2, TaskId(1, 0))

    // minClock has been set to smaller one
    assert(subscription.minClock == 50)

    val initialMinClock = subscription.minClock

    // Acks initial AckRequest(0)
    subscription.receiveAck(Ack(TaskId(1, 1), 0, 0, session))
    subscription.receiveAck(Ack(TaskId(1, 0), 0, 0, session))

    // Sends 100 messages
    100 until 200 foreach { clock =>
      subscription.sendMessage(Message("1", clock))
      subscription.sendMessage(Message("2", clock))
    }

    // Ack not received, minClock no change
    assert(subscription.minClock == initialMinClock)

    subscription.receiveAck(Ack(TaskId(1, 1), 100, 100, session))
    subscription.receiveAck(Ack(TaskId(1, 0), 100, 100, session))

    // Ack received, minClock changed
    assert(subscription.minClock > initialMinClock)

    // Expects to receive two ackRequest for two downstream tasks
    val ackRequestForTask0 = AckRequest(taskId, 200, session)
    verify(transport, times(1)).transport(ackRequestForTask0, TaskId(1, 0))

    val ackRequestForTask1 = AckRequest(taskId, 200, session)
    verify(transport, times(1)).transport(ackRequestForTask1, TaskId(1, 1))
  }

  it should "disallow more message sending if there is no ack back" in {
    val (subscription, transport) = prepare
    // send 100 messages
    // Exceed twice the pending-message limit without acking anything back.
    0 until (Subscription.MAX_PENDING_MESSAGE_COUNT * 2 + 1) foreach { clock =>
      subscription.sendMessage(Message(randomMessage, clock))
    }

    assert(subscription.allowSendingMoreMessages() == false)
  }

  it should "report minClock as Long.MaxValue when there is no pending message" in {
    val (subscription, transport) = prepare
    val msg1 = new Message("1", timestamp = 70)
    subscription.sendMessage(msg1)
    assert(subscription.minClock == 70)
    // Ack covers the only pending message, so nothing bounds the clock.
    subscription.receiveAck(Ack(TaskId(1, 1), 1, 1, session))
    assert(subscription.minClock == Long.MaxValue)
  }

  private def randomMessage: String = new Random().nextInt.toString
}
+
object SubscriptionSpec {

  // Minimal no-op Task implementation; only its class name is used as the
  // taskClass of the downstream processor description.
  class NextTask(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {

    override def onStart(startTime: StartTime): Unit = {
    }

    override def onNext(msg: Message): Unit = {
    }
  }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala
new file mode 100644
index 0000000..48901d2
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.task
+
+import akka.actor.{ExtendedActorSystem, Props}
+import akka.testkit._
+import com.typesafe.config.{Config, ConfigFactory}
+import org.mockito.Mockito.{mock, times, verify, when}
+import org.scalatest.{BeforeAndAfterEach, Matchers, WordSpec}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.{MasterHarness, TestUtil, UserConfig}
+import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner}
+import org.apache.gearpump.serializer.{FastKryoSerializer, 
SerializationFramework}
+import org.apache.gearpump.streaming.AppMasterToExecutor.{ChangeTask, 
MsgLostException, StartTask, TaskChanged, TaskRegistered}
+import org.apache.gearpump.streaming.task.TaskActorSpec.TestTask
+import org.apache.gearpump.streaming.{DAG, LifeTime, ProcessorDescription}
+import org.apache.gearpump.util.Graph._
+import org.apache.gearpump.util.{Graph, Util}
+
/**
 * Tests for TaskActor's lifecycle and message handling, using a TestProbe as
 * the AppMaster and a mocked TaskWrapper as the user task.
 */
class TaskActorSpec extends WordSpec with Matchers with BeforeAndAfterEach with MasterHarness {
  // TestEventListener lets EventFilter intercept exceptions logged by actors.
  protected override def config: Config = {
    ConfigFactory.parseString(
      """ akka.loggers = ["akka.testkit.TestEventListener"]
        | akka.test.filter-leeway = 20000
      """.stripMargin).
      withFallback(TestUtil.DEFAULT_CONFIG)
  }

  val appId = 0
  // Two single-task processors: task1 (processor 0) feeds task2 (processor 1).
  val task1 = ProcessorDescription(id = 0, taskClass = classOf[TestTask].getName, parallelism = 1)
  val task2 = ProcessorDescription(id = 1, taskClass = classOf[TestTask].getName, parallelism = 1)
  val dag: DAG = DAG(Graph(task1 ~ Partitioner[HashPartitioner] ~> task2))
  val taskId1 = TaskId(0, 0)
  val taskId2 = TaskId(1, 0)
  val executorId1 = 1
  val executorId2 = 2

  // Re-created before each test by beforeEach.
  var mockMaster: TestProbe = null
  var taskContext1: TaskContextData = null

  var mockSerializerPool: SerializationFramework = null

  override def beforeEach(): Unit = {
    startActorSystem()
    mockMaster = TestProbe()(getActorSystem)

    // Real serializer behind a mocked pool so messages can round-trip.
    mockSerializerPool = mock(classOf[SerializationFramework])
    val serializer = new FastKryoSerializer(getActorSystem.asInstanceOf[ExtendedActorSystem])
    when(mockSerializerPool.get()).thenReturn(serializer)

    taskContext1 = TaskContextData(executorId1, appId,
      "appName", mockMaster.ref, 1,
      LifeTime.Immortal,
      Subscriber.of(processorId = 0, dag))
  }

  "TaskActor" should {
    "register itself to AppMaster when started" in {
      val mockTask = mock(classOf[TaskWrapper])
      val testActor = TestActorRef[TaskActor](Props(
        new TaskActor(taskId1, taskContext1, UserConfig.empty,
          mockTask, mockSerializerPool)))(getActorSystem)
      testActor ! TaskRegistered(taskId1, 0, Util.randInt())
      testActor ! StartTask(taskId1)

      implicit val system = getActorSystem
      // Ack claims 100 messages sent but 99 received, so the actor should
      // detect message loss and log a MsgLostException.
      // NOTE(review): the test name mentions registration but the body checks
      // message-loss detection — consider renaming.
      val ack = Ack(taskId2, 100, 99, testActor.underlyingActor.sessionId)
      EventFilter[MsgLostException](occurrences = 1) intercept {
        testActor ! ack
      }
    }

    "respond to ChangeTask" in {
      val mockTask = mock(classOf[TaskWrapper])
      val testActor = TestActorRef[TaskActor](Props(new TaskActor(taskId1, taskContext1,
      UserConfig.empty, mockTask, mockSerializerPool)))(getActorSystem)
      testActor ! TaskRegistered(taskId1, 0, Util.randInt())
      testActor ! StartTask(taskId1)
      // The started actor asks the (probed) AppMaster for the upstream clock.
      mockMaster.expectMsgType[GetUpstreamMinClock]

      // A ChangeTask from the AppMaster must be acknowledged with TaskChanged.
      mockMaster.send(testActor, ChangeTask(taskId1, 1, LifeTime.Immortal, List.empty[Subscriber]))
      mockMaster.expectMsgType[TaskChanged]
    }

    "handle received message correctly" in {
      val mockTask = mock(classOf[TaskWrapper])
      val msg = Message("test")

      val testActor = TestActorRef[TaskActor](Props(new TaskActor(taskId1, taskContext1,
      UserConfig.empty, mockTask, mockSerializerPool)))(getActorSystem)
      testActor.tell(TaskRegistered(taskId1, 0, Util.randInt()), mockMaster.ref)
      testActor.tell(StartTask(taskId1), mockMaster.ref)

      testActor.tell(msg, testActor)

      // The incoming message is forwarded once to the wrapped user task.
      verify(mockTask, times(1)).onNext(msg)
    }
  }

  override def afterEach(): Unit = {
    shutdownActorSystem()
  }
}
+
object TaskActorSpec {
  // Minimal placeholder; only its class name is used in ProcessorDescription.
  class TestTask
}
\ No newline at end of file

Reply via email to