http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
deleted file mode 100644
index 2c64133..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.appmaster
-
-import scala.collection.mutable.ArrayBuffer
-
-import com.typesafe.config.ConfigFactory
-import org.scalatest.{Matchers, WordSpec}
-
-import io.gearpump.Message
-import io.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest}
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.cluster.{TestUtil, UserConfig}
-import io.gearpump.partitioner.{HashPartitioner, Partitioner}
-import io.gearpump.streaming.appmaster.TaskLocator.Localities
-import io.gearpump.streaming.appmaster.TaskSchedulerSpec.{TestTask1, TestTask2}
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
-import io.gearpump.streaming.{Constants, DAG, ProcessorDescription}
-import io.gearpump.util.Graph
-import io.gearpump.util.Graph._
-
-class TaskSchedulerSpec extends WordSpec with Matchers {
-  val task1 = ProcessorDescription(id = 0, taskClass = classOf[TestTask1].getName, parallelism = 4)
-  val task2 = ProcessorDescription(id = 1, taskClass = classOf[TestTask2].getName, parallelism = 2)
-
-  val dag = DAG(Graph(task1 ~ Partitioner[HashPartitioner] ~> task2))
-
-  val config = TestUtil.DEFAULT_CONFIG
-
-  "TaskScheduler" should {
-    "schedule tasks on different workers properly according user's configuration" in {
-
-      val localities = Localities(
-        Map(WorkerId(1, 0L) -> Array(TaskId(0, 0), TaskId(0, 1), TaskId(1, 0), TaskId(1, 1)),
-          WorkerId(2, 0L) -> Array(TaskId(0, 2), TaskId(0, 3))
-        ))
-
-      val localityConfig = ConfigFactory.parseString(Localities.toJson(localities))
-
-      import io.gearpump.streaming.Constants.GEARPUMP_STREAMING_LOCALITIES
-      val appName = "app"
-      val taskScheduler = new TaskSchedulerImpl(appId = 0, appName,
-        config.withValue(s"$GEARPUMP_STREAMING_LOCALITIES.$appName", localityConfig.root))
-
-      val expectedRequests =
-        Array(ResourceRequest(Resource(4), WorkerId(1, 0L), relaxation = Relaxation.SPECIFICWORKER),
-          ResourceRequest(Resource(2), WorkerId(2, 0L), relaxation = Relaxation.SPECIFICWORKER))
-
-      taskScheduler.setDAG(dag)
-      val resourceRequests = taskScheduler.getResourceRequests()
-
-      val acturalRequests = resourceRequests.sortBy(_.resource.slots)
-      assert(acturalRequests.sameElements(expectedRequests.sortBy(_.resource.slots)))
-
-      val tasksOnWorker1 = ArrayBuffer[Int]()
-      val tasksOnWorker2 = ArrayBuffer[Int]()
-      for (i <- 0 until 4) {
-        tasksOnWorker1.append(taskScheduler.schedule(WorkerId(1, 0L),
-          executorId = 0, Resource(1)).head.processorId)
-      }
-      for (i <- 0 until 2) {
-        tasksOnWorker2.append(taskScheduler.schedule(WorkerId(2, 0L), executorId = 1,
-          Resource(1)).head.processorId)
-      }
-
-      // Allocates more resource, and no tasks to launch
-      assert(taskScheduler.schedule(WorkerId(3, 0L), executorId = 3,
-        Resource(1)) == List.empty[TaskId])
-
-      // On worker1, executor 0
-      assert(tasksOnWorker1.sorted.sameElements(Array(0, 0, 1, 1)))
-
-      // On worker2, executor 1, Task(0, 0), Task(0, 1)
-      assert(tasksOnWorker2.sorted.sameElements(Array(0, 0)))
-
-      val rescheduledResources = taskScheduler.executorFailed(executorId = 1)
-
-      assert(rescheduledResources.sameElements(Array(ResourceRequest(Resource(2),
-        WorkerId.unspecified, relaxation = Relaxation.ONEWORKER))))
-
-      val launchedTask = taskScheduler.schedule(WorkerId(3, 0L), executorId = 3, Resource(2))
-
-      // Starts the failed 2 tasks Task(0, 0) and Task(0, 1)
-      assert(launchedTask.length == 2)
-    }
-
-    "schedule task fairly" in {
-      val appName = "app"
-      val taskScheduler = new TaskSchedulerImpl(appId = 0, appName, config)
-
-      val expectedRequests =
-        Array(ResourceRequest(Resource(4), WorkerId(1, 0L), relaxation = Relaxation.SPECIFICWORKER),
-          ResourceRequest(Resource(2), WorkerId(2, 0L), relaxation = Relaxation.SPECIFICWORKER))
-
-      taskScheduler.setDAG(dag)
-      val tasks = taskScheduler.schedule(WorkerId(1, 0L), executorId = 0, Resource(4))
-      assert(tasks.filter(_.processorId == 0).length == 2)
-      assert(tasks.filter(_.processorId == 1).length == 2)
-    }
-  }
-}
-
-object TaskSchedulerSpec {
-  class TestTask1(taskContext: TaskContext, userConf: UserConfig)
-    extends Task(taskContext, userConf) {
-    override def onStart(startTime: StartTime): Unit = Unit
-    override def onNext(msg: Message): Unit = Unit
-  }
-
-  class TestTask2(taskContext: TaskContext, userConf: UserConfig)
-    extends Task(taskContext, userConf) {
-    override def onStart(startTime: StartTime): Unit = Unit
-    override def onNext(msg: Message): Unit = Unit
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala
deleted file mode 100644
index 132d46c..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamAppSpec.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.dsl
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.ActorSystem
-import org.mockito.Mockito.when
-import org.scalatest._
-import org.scalatest.mock.MockitoSugar
-
-import io.gearpump.cluster.TestUtil
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.streaming.dsl.plan.OpTranslator.SourceTask
-class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar {
-
-  implicit var system: ActorSystem = null
-
-  override def beforeAll(): Unit = {
-    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
-  }
-
-  override def afterAll(): Unit = {
-    system.terminate()
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  it should "be able to generate multiple new streams" in {
-    val context: ClientContext = mock[ClientContext]
-    when(context.system).thenReturn(system)
-
-    val app = StreamApp("dsl", context)
-    app.source(List("A"), 1, "")
-    app.source(List("B"), 1, "")
-
-    assert(app.graph.vertices.size == 2)
-  }
-
-  it should "plan the dsl to Processsor(TaskDescription) DAG" in {
-    val context: ClientContext = mock[ClientContext]
-    when(context.system).thenReturn(system)
-
-    val app = StreamApp("dsl", context)
-    val parallism = 3
-    app.source(List("A", "B", "C"), parallism, "").flatMap(Array(_)).reduce(_ + _)
-    val task = app.plan.dag.vertices.iterator.next()
-    assert(task.taskClass == classOf[SourceTask[_, _]].getName)
-    assert(task.parallelism == parallism)
-  }
-
-  it should "produce 3 messages" in {
-    val context: ClientContext = mock[ClientContext]
-    when(context.system).thenReturn(system)
-    val app = StreamApp("dsl", context)
-    val list = List[String](
-      "0",
-      "1",
-      "2"
-    )
-    val producer = app.source(list, 1, "producer").flatMap(Array(_)).reduce(_ + _)
-    val task = app.plan.dag.vertices.iterator.next()
-      /*
-      val task = app.plan.dag.vertices.iterator.map(desc => {
-        LOG.info(s"${desc.taskClass}")
-      })
-      val sum = producer.flatMap(msg => {
-        LOG.info("in flatMap")
-        assert(msg.msg.isInstanceOf[String])
-        val num = msg.msg.asInstanceOf[String].toInt
-        Array(num)
-      }).reduce(_+_)
-      val task = app.plan.dag.vertices.iterator.map(desc => {
-        LOG.info(s"${desc.taskClass}")
-      })
-     */
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamSpec.scala
deleted file mode 100644
index c8a44f6..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamSpec.scala
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.dsl
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-import scala.util.{Either, Left, Right}
-
-import akka.actor._
-import org.mockito.Mockito.when
-import org.scalatest._
-import org.scalatest.mock.MockitoSugar
-
-import io.gearpump.Message
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.{TestUtil, UserConfig}
-import io.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner}
-import io.gearpump.streaming.dsl.StreamSpec.Join
-import io.gearpump.streaming.dsl.partitioner.GroupByPartitioner
-import io.gearpump.streaming.dsl.plan.OpTranslator._
-import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
-import io.gearpump.util.Graph
-import io.gearpump.util.Graph._
-
-class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar {
-
-  implicit var system: ActorSystem = null
-
-  override def beforeAll(): Unit = {
-    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
-  }
-
-  override def afterAll(): Unit = {
-    system.terminate()
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  it should "translate the DSL to a DAG" in {
-    val context: ClientContext = mock[ClientContext]
-    when(context.system).thenReturn(system)
-
-    val app = StreamApp("dsl", context)
-
-    val data =
-      """
-        five  four three  two    one
-        five  four three  two
-        five  four three
-        five  four
-        five
-      """
-    val stream = app.source(data.lines.toList, 1, "").
-      flatMap(line => line.split("[\\s]+")).filter(_.nonEmpty).
-      map(word => (word, 1)).
-      groupBy(_._1, parallelism = 2).
-      reduce((left, right) => (left._1, left._2 + right._2)).
-      map[Either[(String, Int), String]](Left(_))
-
-    val query = app.source(List("two"), 1, "").map[Either[(String, Int), String]](Right(_))
-    stream.merge(query).process[(String, Int)](classOf[Join], 1)
-
-    val appDescription = app.plan
-
-    val dagTopology = appDescription.dag.mapVertex(_.taskClass).mapEdge { (node1, edge, node2) =>
-      edge.partitionerFactory.partitioner.getClass.getName
-    }
-    val expectedDagTopology = getExpectedDagTopology
-
-    assert(dagTopology.vertices.toSet.equals(expectedDagTopology.vertices.toSet))
-    assert(dagTopology.edges.toSet.equals(expectedDagTopology.edges.toSet))
-  }
-
-  private def getExpectedDagTopology: Graph[String, String] = {
-    val source = classOf[SourceTask[_, _]].getName
-    val group = classOf[GroupByTask[_, _, _]].getName
-    val merge = classOf[TransformTask[_, _]].getName
-    val join = classOf[Join].getName
-
-    val hash = classOf[HashPartitioner].getName
-    val groupBy = classOf[GroupByPartitioner[_, _]].getName
-    val colocation = classOf[CoLocationPartitioner].getName
-
-    val expectedDagTopology = Graph(
-      source ~ groupBy ~> group ~ colocation ~> merge ~ hash ~> join,
-      source ~ hash ~> merge
-    )
-    expectedDagTopology
-  }
-}
-
-object StreamSpec {
-
-  class Join(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
-
-    var query: String = null
-    override def onStart(startTime: StartTime): Unit = {}
-
-    override def onNext(msg: Message): Unit = {
-      msg.msg match {
-        case Left(wordCount: (String @unchecked, Int @unchecked)) =>
-          if (query != null && wordCount._1 == query) {
-            taskContext.output(new Message(wordCount))
-          }
-
-        case Right(query: String) =>
-          this.query = query
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
deleted file mode 100644
index 03dd242..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.dsl.partitioner
-
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import io.gearpump.Message
-import io.gearpump.streaming.dsl.partitioner.GroupByPartitionerSpec.People
-
-class GroupByPartitionerSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
-  it should "use the outpout of groupBy function to do partition" in {
-    val mark = People("Mark", "male")
-    val tom = People("Tom", "male")
-    val michelle = People("Michelle", "female")
-
-    val partitionNum = 10
-    val groupBy = new GroupByPartitioner[People, String](_.gender)
-    assert(groupBy.getPartition(Message(mark), partitionNum)
-      == groupBy.getPartition(Message(tom), partitionNum))
-
-    assert(groupBy.getPartition(Message(mark), partitionNum)
-      != groupBy.getPartition(Message(michelle), partitionNum))
-  }
-}
-
-object GroupByPartitionerSpec {
-  case class People(name: String, gender: String)
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
deleted file mode 100644
index 62a0f95..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.dsl.plan
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.ActorSystem
-import org.mockito.ArgumentCaptor
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalatest._
-
-import io.gearpump.Message
-import io.gearpump.cluster.{TestUtil, UserConfig}
-import io.gearpump.streaming.Constants._
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.dsl.CollectionDataSource
-import io.gearpump.streaming.dsl.plan.OpTranslator._
-import io.gearpump.streaming.task.StartTime
-
-class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
-
-  "andThen" should "chain multiple single input function" in {
-    val dummy = new DummyInputFunction[String]
-    val split = new FlatMapFunction[String, String](line => line.split("\\s"), "split")
-
-    val filter = new FlatMapFunction[String, String](word =>
-      if (word.isEmpty) None else Some(word), "filter")
-
-    val map = new FlatMapFunction[String, Int](word => Some(1), "map")
-
-    val sum = new ReduceFunction[Int]({ (left, right) => left + right }, "sum")
-
-    val all = dummy.andThen(split).andThen(filter).andThen(map).andThen(sum)
-
-    assert(all.description == "split.filter.map.sum")
-
-    val data =
-      """
-      five  four three  two    one
-      five  four three  two
-      five  four three
-      five  four
-      five
-      """
-    val count = all.process(data).toList.last
-    assert(count == 15)
-  }
-
-  "Source" should "iterate over input source and apply attached operator" in {
-
-    val taskContext = MockUtil.mockTaskContext
-
-    val conf = UserConfig.empty
-    val data = "one two three".split("\\s")
-
-    // Source with no transformer
-    val source = new SourceTask[String, String](new CollectionDataSource[String](data), None,
-      taskContext, conf)
-    source.onStart(StartTime(0))
-    source.onNext(Message("next"))
-    verify(taskContext, times(1)).output(anyObject())
-
-    // Source with transformer
-    val anotherTaskContext = MockUtil.mockTaskContext
-    val double = new FlatMapFunction[String, String](word => List(word, word), "double")
-    val another = new SourceTask(new CollectionDataSource[String](data), Some(double),
-      anotherTaskContext, conf)
-    another.onStart(StartTime(0))
-    another.onNext(Message("next"))
-    verify(anotherTaskContext, times(2)).output(anyObject())
-  }
-
-  "GroupByTask" should "group input by groupBy Function and " +
-    "apply attached operator for each group" in {
-
-    val data = "1 2  2  3 3  3"
-
-    var map = Map.empty[String, Int]
-
-    val concat = new ReduceFunction[String]({ (left, right) =>
-      left + right
-    }, "concat")
-
-    implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
-    val config = UserConfig.empty.withValue[SingleInputFunction[String, String]](
-    GEARPUMP_STREAMING_OPERATOR, concat)
-
-    val taskContext = MockUtil.mockTaskContext
-
-    val task = new GroupByTask[String, String, String](input => input, taskContext, config)
-    task.onStart(StartTime(0))
-
-    val peopleCaptor = ArgumentCaptor.forClass(classOf[Message])
-
-    data.split("\\s+").foreach { word =>
-      task.onNext(Message(word))
-    }
-    verify(taskContext, times(6)).output(peopleCaptor.capture())
-
-    import scala.collection.JavaConverters._
-
-    val values = peopleCaptor.getAllValues().asScala.map(input => input.msg.asInstanceOf[String])
-    assert(values.mkString(",") == "1,2,22,3,33,333")
-    system.terminate()
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  "MergeTask" should "accept two stream and apply the attached operator" in {
-
-    // Source with transformer
-    val taskContext = MockUtil.mockTaskContext
-    val conf = UserConfig.empty
-    val double = new FlatMapFunction[String, String](word => List(word, word), "double")
-    val task = new TransformTask[String, String](Some(double), taskContext, conf)
-    task.onStart(StartTime(0))
-
-    val data = "1 2  2  3 3  3".split("\\s+")
-
-    data.foreach { input =>
-      task.onNext(Message(input))
-    }
-
-    verify(taskContext, times(data.length * 2)).output(anyObject())
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/executor/ExecutorSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/executor/ExecutorSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/executor/ExecutorSpec.scala
deleted file mode 100644
index 1d94776..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/executor/ExecutorSpec.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.executor
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{ActorSystem, Props}
-import akka.testkit.TestProbe
-import org.mockito.Matchers._
-import org.mockito.Mockito.{times, _}
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import io.gearpump.cluster.appmaster.WorkerInfo
-import io.gearpump.cluster.scheduler.Resource
-import io.gearpump.cluster.worker.WorkerId
-import io.gearpump.cluster.{ExecutorContext, TestUtil, UserConfig}
-import io.gearpump.streaming.AppMasterToExecutor._
-import io.gearpump.streaming.ExecutorToAppMaster.RegisterTask
-import io.gearpump.streaming.appmaster.TaskRegistry.TaskLocations
-import io.gearpump.streaming.executor.TaskLauncherSpec.MockTask
-import io.gearpump.streaming.task.{Subscriber, TaskId}
-import io.gearpump.streaming.{LifeTime, ProcessorDescription}
-import io.gearpump.transport.HostPort
-
-class ExecutorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
-  val appId = 0
-  val executorId = 0
-  val workerId = WorkerId(0, 0L)
-  var appMaster: TestProbe = null
-  implicit var system: ActorSystem = null
-  val userConf = UserConfig.empty
-
-  override def beforeAll(): Unit = {
-    system = ActorSystem("TaskLauncherSpec", TestUtil.DEFAULT_CONFIG)
-    appMaster = TestProbe()
-  }
-
-  override def afterAll(): Unit = {
-    system.terminate()
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  it should "call launcher to launch task" in {
-    val worker = TestProbe()
-    val workerInfo = WorkerInfo(workerId, worker.ref)
-    val executorContext = ExecutorContext(executorId, workerInfo, appId, "app",
-      appMaster.ref, Resource(2))
-    val taskLauncher = mock(classOf[ITaskLauncher])
-    val executor = system.actorOf(Props(new Executor(executorContext, userConf, taskLauncher)))
-    val processor = ProcessorDescription(id = 0, taskClass = classOf[MockTask].getName,
-      parallelism = 2)
-    val taskIds = List(TaskId(0, 0), TaskId(0, 1))
-    val launchTasks = LaunchTasks(taskIds, dagVersion = 0, processor, List.empty[Subscriber])
-
-    val task = TestProbe()
-    when(taskLauncher.launch(any(), any(), any(), any(), any()))
-      .thenReturn(taskIds.map((_, task.ref)).toMap)
-
-    val client = TestProbe()
-    client.send(executor, launchTasks)
-    client.expectMsg(TasksLaunched)
-
-    verify(taskLauncher, times(1)).launch(any(), any(), any(), any(), any())
-
-    executor ! RegisterTask(TaskId(0, 0), executorId, HostPort("localhost:80"))
-    executor ! RegisterTask(TaskId(0, 1), executorId, HostPort("localhost:80"))
-
-    executor ! TaskRegistered(TaskId(0, 0), 0, 0)
-
-    task.expectMsgType[TaskRegistered]
-
-    executor ! TaskRegistered(TaskId(0, 1), 0, 0)
-
-    task.expectMsgType[TaskRegistered]
-
-    executor ! TaskLocationsReady(TaskLocations(Map.empty), dagVersion = 0)
-    executor ! StartAllTasks(dagVersion = 0)
-
-    task.expectMsgType[StartTask]
-    task.expectMsgType[StartTask]
-
-    val changeTasks = ChangeTasks(taskIds, dagVersion = 1, life = LifeTime(0, Long.MaxValue),
-      List.empty[Subscriber])
-
-    client.send(executor, changeTasks)
-    client.expectMsgType[TasksChanged]
-
-    executor ! TaskLocationsReady(TaskLocations(Map.empty), 1)
-    executor ! StartAllTasks(dagVersion = 1)
-
-    task.expectMsgType[ChangeTask]
-    task.expectMsgType[ChangeTask]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/executor/TaskArgumentStoreSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/executor/TaskArgumentStoreSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/executor/TaskArgumentStoreSpec.scala
deleted file mode 100644
index c97ae4c..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/executor/TaskArgumentStoreSpec.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.executor
-
-import org.scalatest._
-
-import io.gearpump.streaming.executor.Executor.TaskArgumentStore
-import io.gearpump.streaming.executor.TaskLauncher.TaskArgument
-import io.gearpump.streaming.task.TaskId
-
-class TaskArgumentStoreSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
-  it should "retain all history of taskArgument" in {
-    val version0 = TaskArgument(0, null, null)
-    val version2 = version0.copy(dagVersion = 2)
-    val store = new TaskArgumentStore
-    val task = TaskId(0, 0)
-    store.add(task, version0)
-    store.add(task, version2)
-
-    // Should return a version which is same or older than expected version
-    assert(store.get(dagVersion = 1, task) == Some(version0))
-    assert(store.get(dagVersion = 0, task) == Some(version0))
-    assert(store.get(dagVersion = 2, task) == Some(version2))
-
-    store.removeObsoleteVersion()
-    assert(store.get(dagVersion = 1, task) == None)
-    assert(store.get(dagVersion = 0, task) == None)
-    assert(store.get(dagVersion = 2, task) == Some(version2))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/executor/TaskLauncherSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/executor/TaskLauncherSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/executor/TaskLauncherSpec.scala
deleted file mode 100644
index c135c5b..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/executor/TaskLauncherSpec.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.executor
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{Actor, ActorSystem}
-import akka.testkit.TestProbe
-import org.scalatest._
-
-import io.gearpump.cluster.{TestUtil, UserConfig}
-import io.gearpump.serializer.SerializationFramework
-import io.gearpump.streaming.ProcessorDescription
-import io.gearpump.streaming.executor.TaskLauncher.TaskArgument
-import io.gearpump.streaming.executor.TaskLauncherSpec.{MockTask, MockTaskActor}
-import io.gearpump.streaming.task.{Task, TaskContext, TaskContextData, TaskId, TaskWrapper}
-
-class TaskLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
-  val appId = 0
-  val executorId = 0
-  var appMaster: TestProbe = null
-  implicit var system: ActorSystem = null
-  val userConf = UserConfig.empty
-
-  override def beforeAll(): Unit = {
-    system = ActorSystem("TaskLauncherSpec", TestUtil.DEFAULT_CONFIG)
-    appMaster = TestProbe()
-  }
-
-  override def afterAll(): Unit = {
-    system.terminate()
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  it should "able to launch tasks" in {
-    val launcher = new TaskLauncher(appId, "app", executorId, appMaster.ref,
-      userConf, classOf[MockTaskActor])
-    val taskIds = List(TaskId(0, 0), TaskId(0, 1))
-    val processor = ProcessorDescription(id = 0, taskClass = classOf[MockTask].getName,
-      parallelism = 2)
-    val argument = TaskArgument(0, processor, null)
-
-    val tasks = launcher.launch(taskIds, argument, system, null,
-      "gearpump.shared-thread-pool-dispatcher")
-    tasks.keys.toSet shouldBe taskIds.toSet
-  }
-}
-
-object TaskLauncherSpec {
-  class MockTaskActor(
-      val taskId: TaskId,
-      val taskContextData : TaskContextData,
-      userConf : UserConfig,
-      val task: TaskWrapper,
-      serializer: SerializationFramework) extends Actor {
-    def receive: Receive = null
-  }
-
-  class MockTask(taskContext: TaskContext, userConf: UserConfig)
-    extends Task(taskContext, userConf) {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/metrics/ProcessorAggregatorSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/metrics/ProcessorAggregatorSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/metrics/ProcessorAggregatorSpec.scala
deleted file mode 100644
index da1ca9f..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/metrics/ProcessorAggregatorSpec.scala
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.metrics
-
-import scala.collection.JavaConverters._
-import scala.util.Random
-
-import org.scalatest.{FlatSpec, Matchers}
-
-import io.gearpump.cluster.ClientToMaster.ReadOption
-import io.gearpump.cluster.MasterToClient.HistoryMetricsItem
-import io.gearpump.metrics.Metrics.{Gauge, Histogram, Meter}
-import io.gearpump.streaming.metrics.ProcessorAggregator.{AggregatorFactory, HistogramAggregator, MeterAggregator, MultiLayerMap}
-import io.gearpump.streaming.task.TaskId
-import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
-
-class ProcessorAggregatorSpec extends FlatSpec with Matchers {
-
-  "MultiLayerMap" should "maintain multiple layers HashMap" in {
-    val layers = 3
-    val map = new MultiLayerMap[String](layers)
-
-    assert(map.get(layer = 0, "key") == null)
-
-    // Illegal, handle safely
-    assert(map.get(layer = 10, "key") == null)
-
-    map.put(layer = 0, "key", "value")
-    assert(map.get(layer = 0, "key") == "value")
-
-    map.put(layer = 1, "key2", "value2")
-    map.put(layer = 2, "key3", "value3")
-    map.put(layer = 2, "key4", "value4")
-
-    // Illegal, should be ignored
-    map.put(layer = 4, "key5", "value5")
-
-    assert(map.size == 4)
-    assert(map.valueIterator.asScala.toSet == Set("value", "value2", "value3", "value4"))
-  }
-
-  "HistogramAggregator" should "aggregate by calculating average" in {
-    val aggregator = new HistogramAggregator("processor")
-
-    val a = Histogram("processor.task1", 1, 2, 3, 4, 5, 6)
-    val b = Histogram("processor.task2", 5, 6, 7, 8, 9, 10)
-    val expect = Histogram("processor.task2", 3, 4, 5, 6, 7, 8)
-
-    val olderTime = 100
-    val newerTime = 200
-
-    aggregator.aggregate(HistoryMetricsItem(time = newerTime, a))
-    aggregator.aggregate(HistoryMetricsItem(time = olderTime, b))
-
-    val result = aggregator.result
-
-    // Picks old time as aggregated time
-    assert(result.time == olderTime)
-
-    // Does average
-    val check = result.value.asInstanceOf[Histogram]
-
-    assert(check.mean - expect.mean < 0.01)
-    assert(check.stddev - expect.stddev < 0.01)
-    assert(check.median - expect.median < 0.01)
-    assert(check.p95 - expect.p95 < 0.01)
-    assert(check.p99 - expect.p99 < 0.01)
-    assert(check.p999 - expect.p999 < 0.01)
-  }
-
-  "MeterAggregator" should "aggregate by summing" in {
-    val aggregator = new MeterAggregator("processor")
-
-    val a = Meter("processor.task1", count = 1, 1, 3, "s")
-    val b = Meter("processor.task2", count = 2, 5, 7, "s")
-    val expect = Meter("processor.task2", count = 3, 6, 10, "s")
-
-    val olderTime = 100
-    val newerTime = 200
-
-    aggregator.aggregate(HistoryMetricsItem(time = newerTime, a))
-    aggregator.aggregate(HistoryMetricsItem(time = olderTime, b))
-
-    val result = aggregator.result
-
-    // Picks old time
-    assert(result.time == olderTime)
-
-    // Does summing
-    val check = result.value.asInstanceOf[Meter]
-
-    assert(check.count == expect.count)
-    assert(check.m1 - expect.m1 < 0.01)
-    assert(check.meanRate - expect.meanRate < 0.01)
-    assert(check.rateUnit == expect.rateUnit)
-  }
-
-  "AggregatorFactory" should "create aggregator" in {
-    val factory = new AggregatorFactory()
-
-    val a = Meter("processor.task1", count = 1, 1, 3, "s")
-    val b = Histogram("processor.task1", 1, 2, 3, 4, 5, 6)
-
-    val aggegator1 = factory.create(HistoryMetricsItem(time = 0, a), "name1")
-    assert(aggegator1.isInstanceOf[MeterAggregator])
-
-    val aggegator2 = factory.create(HistoryMetricsItem(time = 0, b), "name2")
-    assert(aggegator2.isInstanceOf[HistogramAggregator])
-  }
-
-  "ProcessorAggregator" should "aggregate on different read options" in {
-    val hours = 2 // Maintains 2 hours history
-    val seconds = 2 // Maintains 2 seconds recent data
-    val taskCount = 5 // For each processor
-    val metricCount = 100 // For each task, have metricCount metrics
-    val range = new HistoryMetricsConfig(hours, hours / 2 * 3600 * 1000,
-      seconds, seconds / 2 * 1000)
-
-    val aggregator = new ProcessorAggregator(range)
-
-    def count(value: Int): Int = value
-
-    def inputs(timeRange: Long): List[HistoryMetricsItem] = {
-      (0 until taskCount).map(TaskId(processorId = 0, _))
-        .flatMap(histogram(_, "receiveLatency", timeRange, metricCount)).toList ++
-        (0 until taskCount).map(TaskId(processorId = 0, _))
-          .flatMap(histogram(_, "processTime", timeRange, metricCount)).toList ++
-        (0 until taskCount).map(TaskId(processorId = 1, _))
-          .flatMap(histogram(_, "receiveLatency", timeRange, metricCount)).toList ++
-        (0 until taskCount).map(TaskId(processorId = 1, _))
-          .flatMap(histogram(_, "processTime", timeRange, metricCount)).toList ++
-        (0 until taskCount).map(TaskId(processorId = 0, _))
-          .flatMap(meter(_, "sendThroughput", timeRange, metricCount)).toList ++
-        (0 until taskCount).map(TaskId(processorId = 0, _))
-          .flatMap(meter(_, "receiveThroughput", timeRange, metricCount)).toList ++
-        (0 until taskCount).map(TaskId(processorId = 1, _))
-          .flatMap(meter(_, "sendThroughput", timeRange, metricCount)).toList ++
-        (0 until taskCount).map(TaskId(processorId = 1, _))
-          .flatMap(meter(_, "receiveThroughput", timeRange, metricCount)).toList
-    }
-
-    def check(list: List[HistoryMetricsItem], countMap: Map[String, Int]): Boolean = {
-      val nameCount = list.map(_.value.name).groupBy(key => key).mapValues(_.size).toList.toMap
-      nameCount sameElements countMap
-    }
-
-    // Aggregates on processor and meterNames,
-    val input = inputs(Long.MaxValue)
-    val readLatest = aggregator.aggregate(ReadOption.ReadLatest,
-      input.iterator, now = Long.MaxValue)
-    assert(readLatest.size == 8) // 2 processor * 4 metrics type
-    assert(check(readLatest, Map(
-      "app0.processor0:receiveLatency" -> count(1),
-      "app0.processor0:processTime" -> count(1),
-      "app0.processor0:sendThroughput" -> count(1),
-      "app0.processor0:receiveThroughput" -> count(1),
-      "app0.processor1:receiveLatency" -> count(1),
-      "app0.processor1:processTime" -> count(1),
-      "app0.processor1:sendThroughput" -> count(1),
-      "app0.processor1:receiveThroughput" -> count(1)
-    )))
-
-    // Aggregates on processor and meterNames and time range,
-    val readRecent = aggregator.aggregate(ReadOption.ReadRecent,
-      inputs(seconds * 1000).iterator, now = seconds * 1000)
-    assert(readRecent.size == 16) // 2 processor * 4 metrics type * 2 time range
-    assert(check(readRecent, Map(
-      "app0.processor0:receiveLatency" -> count(2),
-      "app0.processor0:processTime" -> count(2),
-      "app0.processor0:sendThroughput" -> count(2),
-      "app0.processor0:receiveThroughput" -> count(2),
-      "app0.processor1:receiveLatency" -> count(2),
-      "app0.processor1:processTime" -> count(2),
-      "app0.processor1:sendThroughput" -> count(2),
-      "app0.processor1:receiveThroughput" -> count(2)
-    )))
-
-    // Aggregates on processor and meterNames and time range,
-    val readHistory = aggregator.aggregate(ReadOption.ReadHistory,
-      inputs(hours * 3600 * 1000).iterator, now = hours * 3600 * 1000)
-    assert(readHistory.size == 16) // 2 processor * 4 metrics type * 2 time ranges
-    assert(check(readHistory, Map(
-      "app0.processor0:receiveLatency" -> count(2),
-      "app0.processor0:processTime" -> count(2),
-      "app0.processor0:sendThroughput" -> count(2),
-      "app0.processor0:receiveThroughput" -> count(2),
-      "app0.processor1:receiveLatency" -> count(2),
-      "app0.processor1:processTime" -> count(2),
-      "app0.processor1:sendThroughput" -> count(2),
-      "app0.processor1:receiveThroughput" -> count(2)
-    )))
-  }
-
-  private def histogram(
-      taskId: TaskId, metricName: String = "latency", timeRange: Long = Long.MaxValue,
-      repeat: Int = 1): List[HistoryMetricsItem] = {
-    val random = new Random()
-    (0 until repeat).map { _ =>
-      new HistoryMetricsItem(Math.abs(random.nextLong() % timeRange),
-        new Histogram(s"app0.processor${taskId.processorId}.task${taskId.index}:$metricName",
-          Math.abs(random.nextDouble()),
-          Math.abs(random.nextDouble()),
-          Math.abs(random.nextDouble()),
-          Math.abs(random.nextDouble()),
-          Math.abs(random.nextDouble()),
-          Math.abs(random.nextDouble())
-        ))
-    }.toList
-  }
-
-  private def meter(taskId: TaskId, metricName: String, timeRange: Long, repeat: Int)
-    : List[HistoryMetricsItem] = {
-    val random = new Random()
-    (0 until repeat).map { _ =>
-      new HistoryMetricsItem(Math.abs(random.nextLong() % timeRange),
-        new Meter(s"app0.processor${taskId.processorId}.task${taskId.index}:$metricName",
-          Math.abs(random.nextInt()),
-          Math.abs(random.nextDouble()),
-          Math.abs(random.nextDouble()),
-          "event/s")
-      )
-    }.toList
-  }
-
-  "ProcessorAggregator" should "handle smoothly for unsupported metric type and " +
-    "error formatted metric name" in {
-    val invalid = List(
-      // Unsupported metric type
-      HistoryMetricsItem(0, new Gauge("app0.processor0.task0:gauge", 100)),
-
-      // Wrong format: should be app0.processor0.task0:throughput
-      HistoryMetricsItem(0, new Meter("app0.processor0.task0/throughput", 100, 0, 0, ""))
-    )
-
-    val valid = histogram(TaskId(0, 0), repeat = 10)
-
-    val aggregator = new ProcessorAggregator(new HistoryMetricsConfig(0, 0, 0, 0))
-    val result = aggregator.aggregate(ReadOption.ReadLatest, (valid ++ invalid).toIterator,
-      now = Long.MaxValue)
-
-    // For one taskId, will only use one data point.
-    assert(result.size == 1)
-    assert(result.head.value.name == "app0.processor0:latency")
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/metrics/TaskFilterAggregatorSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/metrics/TaskFilterAggregatorSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/metrics/TaskFilterAggregatorSpec.scala
deleted file mode 100644
index f654eb9..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/metrics/TaskFilterAggregatorSpec.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.metrics
-
-import scala.util.Random
-
-import org.scalatest.{FlatSpec, Matchers}
-
-import io.gearpump.cluster.MasterToClient.HistoryMetricsItem
-import io.gearpump.metrics.Metrics.{Histogram, Meter}
-import io.gearpump.streaming.metrics.TaskFilterAggregator.Options
-import io.gearpump.streaming.task.TaskId
-
-class TaskFilterAggregatorSpec extends FlatSpec with Matchers {
-
-  def metric(taskId: TaskId): HistoryMetricsItem = {
-    val random = new Random()
-    new HistoryMetricsItem(Math.abs(random.nextLong()),
-      new Histogram(s"app0.processor${taskId.processorId}.task${taskId.index}:latency",
-        0, 0, 0, 0, 0, 0))
-  }
-
-  it should "filter data on processor range, task range combination" in {
-    val inputs = (0 until 10).flatMap { processor =>
-      (0 until 10).map { task =>
-        metric(TaskId(processor, task))
-      }
-    }.toList
-
-    val globalLimit = 10
-    val aggregator = new TaskFilterAggregator(globalLimit)
-
-    // Limit not met, return all matches in this matrix
-    var options = new Options(limit = 20, startTask = 3, endTask = 6,
-      startProcessor = 3, endProcessor = 6)
-    assert(aggregator.aggregate(options, inputs.iterator).size == 9)
-
-    // User limit reached
-    options = new Options(limit = 3, startTask = 3, endTask = 5,
-      startProcessor = 3, endProcessor = 5)
-    assert(aggregator.aggregate(options, inputs.iterator).size == 3)
-
-    // Global limit reached
-    options = new Options(limit = 20, startTask = 3, endTask = 8,
-      startProcessor = 3, endProcessor = 8)
-    assert(aggregator.aggregate(options, inputs.iterator).size == globalLimit)
-  }
-
-  it should "reject wrong format options" in {
-    val options = Map(TaskFilterAggregator.StartTask -> "not a number")
-    assert(Options.parse(options) == null)
-  }
-
-  it should "skip wrong format metrics" in {
-    val invalid = List {
-      // Wrong format: should be app0.processor0.task0:throughput
-      HistoryMetricsItem(0, new Meter("app0.processor0.task0/throughput", 100, 0, 0, ""))
-    }
-    val options = Options.acceptAll
-
-    val aggregator = new TaskFilterAggregator(Int.MaxValue)
-    assert(aggregator.aggregate(options, invalid.iterator).size == 0)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/source/DefaultTimeStampFilterSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/source/DefaultTimeStampFilterSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/source/DefaultTimeStampFilterSpec.scala
deleted file mode 100644
index 752a8f4..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/source/DefaultTimeStampFilterSpec.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.source
-
-import org.scalacheck.Gen
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-import io.gearpump.{Message, TimeStamp}
-
-class DefaultTimeStampFilterSpec extends PropSpec with PropertyChecks with Matchers {
-  property("DefaultTimeStampFilter should filter message against give timestamp") {
-    val timestampGen = Gen.chooseNum[Long](0L, 1000L)
-    val messageGen = for {
-      msg <- Gen.alphaStr
-      time <- timestampGen
-    } yield Message(msg, time)
-
-    val filter = new DefaultTimeStampFilter()
-
-    forAll(timestampGen, messageGen) {
-      (predicate: TimeStamp, message: Message) =>
-        if (message.timestamp >= predicate) {
-          filter.filter(message, predicate) shouldBe Some(message)
-        } else {
-          filter.filter(message, predicate) shouldBe None
-        }
-
-        filter.filter(null, predicate) shouldBe None
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/state/impl/CheckpointManagerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/state/impl/CheckpointManagerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/state/impl/CheckpointManagerSpec.scala
deleted file mode 100644
index ea670f6..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/state/impl/CheckpointManagerSpec.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.state.impl
-
-import org.mockito.Mockito._
-import org.mockito.{Matchers => MockitoMatchers}
-import org.scalacheck.Gen
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-import io.gearpump.TimeStamp
-import io.gearpump.streaming.transaction.api.CheckpointStore
-
-class CheckpointManagerSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
-
-  val timestampGen = Gen.chooseNum[Long](0L, 1000L)
-  val checkpointIntervalGen = Gen.chooseNum[Long](100L, 10000L)
-  property("CheckpointManager should recover from CheckpointStore") {
-    forAll(timestampGen, checkpointIntervalGen) {
-      (timestamp: TimeStamp, checkpointInterval: Long) =>
-        val checkpointStore = mock[CheckpointStore]
-        val checkpointManager =
-          new CheckpointManager(checkpointInterval, checkpointStore)
-        checkpointManager.recover(timestamp)
-
-        verify(checkpointStore).recover(timestamp)
-    }
-  }
-
-  property("CheckpointManager should write checkpoint to CheckpointStore") {
-    val checkpointGen = Gen.alphaStr.map(_.getBytes("UTF-8"))
-    forAll(timestampGen, checkpointIntervalGen, checkpointGen) {
-      (timestamp: TimeStamp, checkpointInterval: Long, checkpoint: Array[Byte]) =>
-        val checkpointStore = mock[CheckpointStore]
-        val checkpointManager =
-          new CheckpointManager(checkpointInterval, checkpointStore)
-        checkpointManager.checkpoint(timestamp, checkpoint)
-
-        verify(checkpointStore).persist(timestamp, checkpoint)
-    }
-  }
-
-  property("CheckpointManager should close CheckpointStore") {
-    forAll(checkpointIntervalGen) {
-      (checkpointInterval: Long) =>
-        val checkpointStore = mock[CheckpointStore]
-        val checkpointManager =
-          new CheckpointManager(checkpointInterval, checkpointStore)
-        checkpointManager.close()
-        verify(checkpointStore).close()
-    }
-  }
-
-  property("CheckpointManager should update checkpoint time according to max message timestamp") {
-    forAll(timestampGen, checkpointIntervalGen) {
-      (timestamp: TimeStamp, checkpointInterval: Long) =>
-        val checkpointStore = mock[CheckpointStore]
-        val checkpointManager =
-          new CheckpointManager(checkpointInterval, checkpointStore)
-        checkpointManager.update(timestamp)
-        checkpointManager.getMaxMessageTime shouldBe timestamp
-
-        val checkpointTime = checkpointManager.getCheckpointTime.get
-        timestamp should (be < checkpointTime and be >= (checkpointTime - checkpointInterval))
-
-        checkpointManager.checkpoint(checkpointTime, Array.empty[Byte])
-        verify(checkpointStore).persist(MockitoMatchers.eq(checkpointTime),
-          MockitoMatchers.anyObject[Array[Byte]]())
-        checkpointManager.getCheckpointTime shouldBe empty
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/state/impl/InMemoryCheckpointStoreSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/state/impl/InMemoryCheckpointStoreSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/state/impl/InMemoryCheckpointStoreSpec.scala
deleted file mode 100644
index 4bbad3b..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/state/impl/InMemoryCheckpointStoreSpec.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.state.impl
-
-import org.scalacheck.Gen
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-class InMemoryCheckpointStoreSpec extends PropSpec with PropertyChecks with Matchers {
-
-  property("InMemoryCheckpointStore should provide read / write checkpoint") {
-    val timestampGen = Gen.chooseNum[Long](1, 1000)
-    val checkpointGen = Gen.alphaStr.map(_.getBytes("UTF-8"))
-    forAll(timestampGen, checkpointGen) { (timestamp: Long, checkpoint: Array[Byte]) =>
-      val store = new InMemoryCheckpointStore
-      store.recover(timestamp) shouldBe None
-      store.persist(timestamp, checkpoint)
-      store.recover(timestamp) shouldBe Some(checkpoint)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/state/impl/NonWindowStateSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/state/impl/NonWindowStateSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/state/impl/NonWindowStateSpec.scala
deleted file mode 100644
index b20b666..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/state/impl/NonWindowStateSpec.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.state.impl
-
-import scala.util.Success
-
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-import io.gearpump.TimeStamp
-import io.gearpump.streaming.state.api.{Monoid, Serializer}
-
-class NonWindowStateSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
-
-  val longGen = Gen.chooseNum[Long](100L, System.currentTimeMillis())
-
-  property("NonWindowState should recover checkpointed state at given timestamp") {
-    forAll(longGen) {
-      (timestamp: TimeStamp) =>
-        val monoid = mock[Monoid[AnyRef]]
-        val serializer = mock[Serializer[AnyRef]]
-        val bytes = Array.empty[Byte]
-        val checkpoint = mock[AnyRef]
-        val zero = mock[AnyRef]
-        when(monoid.zero).thenReturn(zero, zero)
-        when(monoid.plus(zero, zero)).thenReturn(zero, Nil: _*)
-        when(monoid.plus(checkpoint, zero)).thenReturn(checkpoint, Nil: _*)
-
-        val state = new NonWindowState[AnyRef](monoid, serializer)
-        state.left shouldBe zero
-        state.right shouldBe zero
-        state.get shouldBe Some(zero)
-
-        when(serializer.deserialize(bytes)).thenReturn(Success(checkpoint))
-        state.recover(timestamp, bytes)
-
-        state.left shouldBe checkpoint
-        state.right shouldBe zero
-        state.get shouldBe Some(checkpoint)
-    }
-  }
-
-  property("NonWindowState checkpoints state") {
-    forAll(longGen) {
-      (checkpointTime: TimeStamp) =>
-        val monoid = mock[Monoid[AnyRef]]
-        val serializer = mock[Serializer[AnyRef]]
-
-        val left = mock[AnyRef]
-        val right = mock[AnyRef]
-        val zero = mock[AnyRef]
-        val plus = mock[AnyRef]
-
-        when(monoid.zero).thenReturn(zero, zero)
-        when(monoid.plus(zero, zero)).thenReturn(zero, Nil: _*)
-
-        val state = new NonWindowState[AnyRef](monoid, serializer)
-        state.left shouldBe zero
-        state.right shouldBe zero
-        state.get shouldBe Some(zero)
-
-        state.left = left
-        state.right = right
-
-        when(monoid.zero).thenReturn(zero, Nil: _*)
-        when(monoid.plus(left, right)).thenReturn(plus, Nil: _*)
-        when(monoid.plus(plus, zero)).thenReturn(plus, Nil: _*)
-        state.checkpoint()
-
-        verify(serializer).serialize(left)
-        state.left shouldBe plus
-        state.right shouldBe zero
-        state.get shouldBe Some(plus)
-    }
-  }
-
-  property("NonWindowState updates state") {
-    forAll(longGen) {
-      (checkpointTime: TimeStamp) =>
-        val monoid = mock[Monoid[AnyRef]]
-        val serializer = mock[Serializer[AnyRef]]
-
-        val left = mock[AnyRef]
-        val right = mock[AnyRef]
-        val zero = mock[AnyRef]
-        val plus = mock[AnyRef]
-
-        when(monoid.zero).thenReturn(zero, zero)
-        when(monoid.plus(zero, zero)).thenReturn(zero, Nil: _*)
-
-        val state = new NonWindowState[AnyRef](monoid, serializer)
-        state.left shouldBe zero
-        state.right shouldBe zero
-        state.get shouldBe Some(zero)
-
-        when(monoid.plus(zero, left)).thenReturn(left, Nil: _*)
-        when(monoid.plus(left, zero)).thenReturn(left, Nil: _*)
-        state.setNextCheckpointTime(checkpointTime)
-        state.update(checkpointTime - 1, left)
-        state.left shouldBe left
-        state.right shouldBe zero
-        state.get shouldBe Some(left)
-
-        when(monoid.plus(zero, right)).thenReturn(right, Nil: _*)
-        when(monoid.plus(left, right)).thenReturn(plus, Nil: _*)
-        state.setNextCheckpointTime(checkpointTime)
-        state.update(checkpointTime + 1, right)
-        state.left shouldBe left
-        state.right shouldBe right
-        state.get shouldBe Some(plus)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/state/impl/WindowSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/io/gearpump/streaming/state/impl/WindowSpec.scala 
b/streaming/src/test/scala/io/gearpump/streaming/state/impl/WindowSpec.scala
deleted file mode 100644
index e12ec9b..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/state/impl/WindowSpec.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.state.impl
-
-import org.scalacheck.Gen
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-import io.gearpump.TimeStamp
-
-class WindowSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
-
-  val windowSizeGen = Gen.chooseNum[Long](1L, 1000L)
-  val windowStepGen = Gen.chooseNum[Long](1L, 1000L)
-  val timestampGen = Gen.chooseNum[Long](0L, 1000L)
-  property("Window should only slide when time passes window end") {
-    forAll(timestampGen, windowSizeGen, windowStepGen) {
-      (timestamp: TimeStamp, windowSize: Long, windowStep: Long) =>
-        val window = new Window(windowSize, windowStep)
-        window.shouldSlide shouldBe false
-        window.update(timestamp)
-        window.shouldSlide shouldBe timestamp >= windowSize
-    }
-  }
-
-  property("Window should slide by one or to given timestamp") {
-    forAll(timestampGen, windowSizeGen, windowStepGen) {
-      (timestamp: TimeStamp, windowSize: Long, windowStep: Long) =>
-        val window = new Window(windowSize, windowStep)
-        window.range shouldBe(0L, windowSize)
-
-        window.slideOneStep()
-        window.range shouldBe(windowStep, windowSize + windowStep)
-
-        window.slideTo(timestamp)
-        val (startTime, endTime) = window.range
-        if (windowStep > windowSize) {
-          timestamp should (be >= startTime and be < (startTime + windowStep))
-        } else {
-          timestamp should (be >= startTime and be < endTime)
-        }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/state/impl/WindowStateSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/state/impl/WindowStateSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/state/impl/WindowStateSpec.scala
deleted file mode 100644
index bc79ff6..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/state/impl/WindowStateSpec.scala
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.state.impl
-
-import scala.collection.immutable.TreeMap
-import scala.util.Success
-
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-import io.gearpump._
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.state.api.{Group, Serializer}
-
-class WindowStateSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
-
-  val longGen = Gen.chooseNum[Long](100L, 10000L)
-
-  val intervalGen = for {
-    st <- longGen
-    et <- Gen.chooseNum[Long](st + 1, 100000L)
-  } yield Interval(st, et)
-
-  property("WindowState init should recover checkpointed state") {
-    forAll(intervalGen) {
-      (interval: Interval) =>
-        val window = mock[Window]
-        val taskContext = MockUtil.mockTaskContext
-        val group = mock[Group[AnyRef]]
-        val serializer = mock[Serializer[TreeMap[Interval, AnyRef]]]
-
-        val timestamp = interval.startTime
-        val zero = mock[AnyRef]
-        val bytes = Array.empty[Byte]
-        val data = mock[AnyRef]
-        val checkpoint = TreeMap(interval -> data)
-        when(group.zero).thenReturn(zero, zero)
-        when(group.plus(zero, data)).thenReturn(data, Nil: _*)
-        when(group.plus(data, zero)).thenReturn(data, Nil: _*)
-        when(group.plus(zero, zero)).thenReturn(zero, Nil: _*)
-        when(serializer.deserialize(bytes)).thenReturn(Success(checkpoint))
-
-        val state = new WindowState[AnyRef](group, serializer, taskContext, window)
-        state.left shouldBe zero
-        state.right shouldBe zero
-        state.get shouldBe Some(zero)
-
-        state.recover(timestamp, bytes)
-
-        state.left shouldBe data
-        state.right shouldBe zero
-        state.get shouldBe Some(data)
-        state.getIntervalStates(interval.startTime, interval.endTime) shouldBe checkpoint
-    }
-  }
-
-  property("WindowState checkpoints") {
-    forAll(longGen) { (checkpointTime: TimeStamp) =>
-      val window = mock[Window]
-      val taskContext = MockUtil.mockTaskContext
-      val group = mock[Group[AnyRef]]
-      val serializer = mock[Serializer[TreeMap[Interval, AnyRef]]]
-
-      val zero = mock[AnyRef]
-      val left = mock[AnyRef]
-      val right = mock[AnyRef]
-      val plus = mock[AnyRef]
-
-      when(group.zero).thenReturn(zero, zero)
-      when(group.plus(zero, zero)).thenReturn(zero, Nil: _*)
-      val state = new WindowState[AnyRef](group, serializer, taskContext, window)
-      state.left shouldBe zero
-      state.right shouldBe zero
-      state.get shouldBe Some(zero)
-
-      val start = checkpointTime - 1
-      val end = checkpointTime + 1
-      val size = end - start
-      val step = 1L
-
-      when(window.range).thenReturn((start, end))
-      when(window.windowSize).thenReturn(size)
-      when(window.windowStep).thenReturn(step)
-      when(group.zero).thenReturn(zero, zero)
-      when(group.plus(zero, left)).thenReturn(left, Nil: _*)
-      when(group.plus(zero, right)).thenReturn(right, Nil: _*)
-      when(group.plus(left, right)).thenReturn(plus, Nil: _*)
-
-      state.left = left
-      state.updateIntervalStates(start, left, checkpointTime)
-      state.right = right
-      state.updateIntervalStates(checkpointTime, right, checkpointTime)
-
-      state.setNextCheckpointTime(checkpointTime)
-      state.checkpoint()
-
-      state.left shouldBe plus
-      state.right shouldBe zero
-      verify(serializer).serialize(TreeMap(Interval(start, checkpointTime) -> left))
-    }
-  }
-
-  property("WindowState updates state") {
-    forAll(longGen) { (checkpointTime: TimeStamp) =>
-      val window = mock[Window]
-      val taskContext = MockUtil.mockTaskContext
-      val group = mock[Group[AnyRef]]
-      val serializer = mock[Serializer[TreeMap[Interval, AnyRef]]]
-
-      val zero = mock[AnyRef]
-      val left = mock[AnyRef]
-      val right = mock[AnyRef]
-      val plus = mock[AnyRef]
-
-      when(group.zero).thenReturn(zero, zero)
-      val state = new WindowState[AnyRef](group, serializer, taskContext, window)
-
-      val start = checkpointTime - 1
-      val end = checkpointTime + 1
-      val size = end - start
-      val step = 1L
-
-      when(window.range).thenReturn((start, end))
-      when(window.windowSize).thenReturn(size)
-      when(window.windowStep).thenReturn(step)
-      when(window.shouldSlide).thenReturn(false)
-      when(group.plus(zero, left)).thenReturn(left, left)
-      when(group.plus(left, zero)).thenReturn(left, Nil: _*)
-      when(taskContext.upstreamMinClock).thenReturn(0L)
-
-      // Time < checkpointTime
-      // Update left in current window
-      state.setNextCheckpointTime(checkpointTime)
-      state.update(start, left)
-
-      verify(window).update(0L)
-      state.left shouldBe left
-      state.right shouldBe zero
-      state.get shouldBe Some(left)
-      state.getIntervalStates(start, end) shouldBe TreeMap(Interval(start, checkpointTime) -> left)
-
-      when(window.range).thenReturn((start, end))
-      when(window.windowSize).thenReturn(size)
-      when(window.windowStep).thenReturn(step)
-      when(window.shouldSlide).thenReturn(false)
-      when(group.plus(zero, right)).thenReturn(right, right)
-      when(group.plus(left, right)).thenReturn(plus, Nil: _*)
-      when(taskContext.upstreamMinClock).thenReturn(0L)
-
-      // Time >= checkpointTime
-      // Update right in current window
-      state.setNextCheckpointTime(checkpointTime)
-      state.update(checkpointTime, right)
-
-      verify(window, times(2)).update(0L)
-      state.left shouldBe left
-      state.right shouldBe right
-      state.get shouldBe Some(plus)
-      state.getIntervalStates(start, end) shouldBe
-        TreeMap(Interval(start, start + step) -> left, Interval(start + step, end) -> right)
-
-      // Slides window forward
-      when(window.range).thenReturn((start, end), (start + step, end + step))
-      when(window.shouldSlide).thenReturn(true)
-      when(taskContext.upstreamMinClock).thenReturn(checkpointTime)
-      when(group.minus(left, left)).thenReturn(zero, Nil: _*)
-      when(group.plus(zero, right)).thenReturn(right, Nil: _*)
-      when(group.plus(right, right)).thenReturn(plus, Nil: _*)
-      when(group.plus(zero, plus)).thenReturn(plus, Nil: _*)
-
-      state.setNextCheckpointTime(checkpointTime)
-      state.update(end, right)
-
-      verify(window).slideOneStep()
-      verify(window).update(checkpointTime)
-      state.left shouldBe zero
-      state.right shouldBe plus
-      state.get shouldBe Some(plus)
-      state.getIntervalStates(start, end + step) shouldBe
-        TreeMap(
-          Interval(start, start + step) -> left,
-          Interval(start + step, end) -> right,
-          Interval(end, end + step) -> right)
-    }
-  }
-
-  property("WindowState gets interval for timestamp") {
-    forAll(longGen, longGen, longGen, longGen) {
-      (timestamp: TimeStamp, checkpointTime: TimeStamp, windowSize: Long, windowStep: Long) =>
-        val windowManager = new Window(windowSize, windowStep)
-        val taskContext = MockUtil.mockTaskContext
-        val group = mock[Group[AnyRef]]
-        val serializer = mock[Serializer[TreeMap[Interval, AnyRef]]]
-
-        val zero = mock[AnyRef]
-        when(group.zero).thenReturn(zero, zero)
-        val state = new WindowState[AnyRef](group, serializer, taskContext, windowManager)
-
-        val interval = state.getInterval(timestamp, checkpointTime)
-        intervalSpec(interval, timestamp, checkpointTime, windowSize, windowStep)
-
-        val nextTimeStamp = interval.endTime
-        val nextInterval = state.getInterval(nextTimeStamp, checkpointTime)
-        intervalSpec(nextInterval, nextTimeStamp, checkpointTime, windowSize, windowStep)
-
-        interval.endTime shouldBe nextInterval.startTime
-    }
-
-    def intervalSpec(interval: Interval, timestamp: TimeStamp,
-        checkpointTime: TimeStamp, windowSize: Long, windowStep: Long): Unit = {
-      interval.startTime should be <= interval.endTime
-      timestamp / windowStep * windowStep should (be <= interval.startTime)
-      (timestamp - windowSize) / windowStep * windowStep should (be <= interval.startTime)
-      (timestamp / windowStep + 1) * windowStep should (be >= interval.endTime)
-      ((timestamp - windowSize) / windowStep + 1) * windowStep + windowSize should
-        (be >= interval.endTime)
-      checkpointTime should (be <= interval.startTime or be >= interval.endTime)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMasterSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMasterSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMasterSpec.scala
deleted file mode 100644
index 60baa74..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMasterSpec.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.storage
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec}
-
-import io.gearpump.cluster.{MasterHarness, MiniCluster}
-import io.gearpump.streaming.StreamingTestUtil
-import io.gearpump.util.Constants
-
-class InMemoryAppStoreOnMasterSpec extends WordSpec with Matchers with BeforeAndAfterAll {
-  implicit val timeout = Constants.FUTURE_TIMEOUT
-  implicit val dispatcher = MasterHarness.cachedPool
-
-  "InMemoryAppStoreOnMaster" should {
-    "save and return the data properly" in {
-      val appId = 0
-      val miniCluster = new MiniCluster
-      val master = miniCluster.mockMaster
-      StreamingTestUtil.startAppMaster(miniCluster, appId)
-      val store = new InMemoryAppStoreOnMaster(appId, master)
-
-      Thread.sleep(500)
-
-      store.put("String_type", "this is a string")
-      store.put("Int_type", 1024)
-      store.put("Tuple2_type", ("element1", 1024))
-
-      val future1 = store.get("String_type").map { value =>
-        value.asInstanceOf[String] should be("this is a string")
-      }
-      val future2 = store.get("Int_type").map { value => value.asInstanceOf[Int] should be(1024) }
-      val future3 = store.get("Tuple2_type").map { value =>
-        value.asInstanceOf[(String, Int)] should be(("element1", 1024))
-      }
-      val future4 = store.get("key").map { value => value.asInstanceOf[Object] should be(null) }
-      Await.result(future1, 15.seconds)
-      Await.result(future2, 15.seconds)
-      Await.result(future3, 15.seconds)
-      Await.result(future4, 15.seconds)
-      miniCluster.shutDown
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/task/SubscriberSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/task/SubscriberSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/task/SubscriberSpec.scala
deleted file mode 100644
index ffb343e..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/task/SubscriberSpec.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.gearpump.streaming.task
-
-import org.scalatest.{FlatSpec, Matchers}
-
-import io.gearpump.partitioner.{HashPartitioner, Partitioner}
-import io.gearpump.streaming.task.SubscriberSpec.TestTask
-import io.gearpump.streaming.{DAG, ProcessorDescription}
-import io.gearpump.util.Graph
-import io.gearpump.util.Graph._
-
-class SubscriberSpec extends FlatSpec with Matchers {
-  "Subscriber.of" should "return all subscriber for a processor" in {
-
-    val sourceProcessorId = 0
-    val task1 = ProcessorDescription(id = sourceProcessorId, taskClass =
-      classOf[TestTask].getName, parallelism = 1)
-    val task2 = ProcessorDescription(id = 1, taskClass = classOf[TestTask].getName, parallelism = 1)
-    val task3 = ProcessorDescription(id = 2, taskClass = classOf[TestTask].getName, parallelism = 1)
-    val partitioner = Partitioner[HashPartitioner]
-    val dag = DAG(Graph(task1 ~ partitioner ~> task2, task1 ~ partitioner ~> task3,
-      task2 ~ partitioner ~> task3))
-
-    val subscribers = Subscriber.of(sourceProcessorId, dag)
-    assert(subscribers.size == 2)
-
-    assert(subscribers.toSet ==
-      Set(Subscriber(1, partitioner, task2.parallelism, task2.life), Subscriber(2, partitioner,
-        task3.parallelism, task3.life)))
-  }
-}
-
-object SubscriberSpec {
-  class TestTask
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/streaming/src/test/scala/io/gearpump/streaming/task/SubscriptionSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/task/SubscriptionSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/task/SubscriptionSpec.scala
deleted file mode 100644
index 57f4b8c..0000000
--- a/streaming/src/test/scala/io/gearpump/streaming/task/SubscriptionSpec.scala
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.streaming.task
-
-import java.util.Random
-
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.{FlatSpec, Matchers}
-
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.partitioner.{HashPartitioner, Partitioner}
-import io.gearpump.streaming.task.SubscriptionSpec.NextTask
-import io.gearpump.streaming.{LifeTime, ProcessorDescription}
-
-class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar {
-  val appId = 0
-  val executorId = 0
-  val taskId = TaskId(0, 0)
-  val session = new Random().nextInt()
-
-  val downstreamProcessorId = 1
-  val partitioner = Partitioner[HashPartitioner]
-
-  val parallism = 2
-  val downstreamProcessor = ProcessorDescription(downstreamProcessorId, classOf[NextTask].getName,
-    parallism)
-  val subscriber = Subscriber(downstreamProcessorId, partitioner, downstreamProcessor.parallelism,
-    downstreamProcessor.life)
-
-  private def prepare: (Subscription, ExpressTransport) = {
-    val transport = mock[ExpressTransport]
-    val subscription = new Subscription(appId, executorId, taskId, subscriber, session, transport)
-    subscription.start()
-
-    val expectedAckRequest = InitialAckRequest(taskId, session)
-    verify(transport, times(1)).transport(expectedAckRequest, TaskId(1, 0), TaskId(1, 1))
-
-    (subscription, transport)
-  }
-
-  it should "not send any more message when its life ends" in {
-    val (subscription, transport) = prepare
-    subscription.changeLife(LifeTime(0, 0))
-    val count = subscription.sendMessage(Message("some"))
-    assert(count == 0)
-  }
-
-  it should "send message and handle ack correctly" in {
-    val (subscription, transport) = prepare
-    val msg1 = new Message("1", timestamp = 70)
-    subscription.sendMessage(msg1)
-
-    verify(transport, times(1)).transport(msg1, TaskId(1, 1))
-    assert(subscription.minClock == 70)
-
-    val msg2 = new Message("0", timestamp = 50)
-    subscription.sendMessage(msg2)
-    verify(transport, times(1)).transport(msg2, TaskId(1, 0))
-
-    // minClock has been set to smaller one
-    assert(subscription.minClock == 50)
-
-    val initialMinClock = subscription.minClock
-
-    // Acks initial AckRequest(0)
-    subscription.receiveAck(Ack(TaskId(1, 1), 0, 0, session))
-    subscription.receiveAck(Ack(TaskId(1, 0), 0, 0, session))
-
-    // Sends 100 messages
-    100 until 200 foreach { clock =>
-      subscription.sendMessage(Message("1", clock))
-      subscription.sendMessage(Message("2", clock))
-    }
-
-    // Ack not received, minClock no change
-    assert(subscription.minClock == initialMinClock)
-
-    subscription.receiveAck(Ack(TaskId(1, 1), 100, 100, session))
-    subscription.receiveAck(Ack(TaskId(1, 0), 100, 100, session))
-
-    // Ack received, minClock changed
-    assert(subscription.minClock > initialMinClock)
-
-    // Expects to receive two ackRequest for two downstream tasks
-    val ackRequestForTask0 = AckRequest(taskId, 200, session)
-    verify(transport, times(1)).transport(ackRequestForTask0, TaskId(1, 0))
-
-    val ackRequestForTask1 = AckRequest(taskId, 200, session)
-    verify(transport, times(1)).transport(ackRequestForTask1, TaskId(1, 1))
-  }
-
-  it should "disallow more message sending if there is no ack back" in {
-    val (subscription, transport) = prepare
-    // send 100 messages
-    0 until (Subscription.MAX_PENDING_MESSAGE_COUNT * 2 + 1) foreach { clock =>
-      subscription.sendMessage(Message(randomMessage, clock))
-    }
-
-    assert(subscription.allowSendingMoreMessages() == false)
-  }
-
-  it should "report minClock as Long.MaxValue when there is no pending message" in {
-    val (subscription, transport) = prepare
-    val msg1 = new Message("1", timestamp = 70)
-    subscription.sendMessage(msg1)
-    assert(subscription.minClock == 70)
-    subscription.receiveAck(Ack(TaskId(1, 1), 1, 1, session))
-    assert(subscription.minClock == Long.MaxValue)
-  }
-
-  private def randomMessage: String = new Random().nextInt.toString
-}
-
-object SubscriptionSpec {
-
-  class NextTask(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
-
-    override def onStart(startTime: StartTime): Unit = {
-    }
-
-    override def onNext(msg: Message): Unit = {
-    }
-  }
-}
\ No newline at end of file

Reply via email to