Repository: incubator-gearpump
Updated Branches:
  refs/heads/master c1370d9bf -> 24e1a4546


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
index d007e09..ca0135d 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
@@ -25,13 +25,13 @@ import org.apache.gearpump.cluster.{TestUtil, UserConfig}
 import org.apache.gearpump.streaming.Processor
 import org.apache.gearpump.streaming.Processor.DefaultProcessor
 import org.apache.gearpump.streaming.dsl.plan.OpSpec.{AnySink, AnySource, AnyTask}
-import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, FunctionRunner}
+import org.apache.gearpump.streaming.dsl.plan.functions.{DummyRunner, FlatMapper, FunctionRunner}
 import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
-import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
+import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows
+import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, WindowRunner}
 import org.apache.gearpump.streaming.sink.DataSink
 import org.apache.gearpump.streaming.source.DataSource
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
-import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec}
 import org.scalatest.mock.MockitoSugar
@@ -61,16 +61,16 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
 
   "DataSourceOp" should {
 
-    "chain ChainableOp" in {
+    "chain TransformOp" in {
       val dataSource = new AnySource
       val dataSourceOp = DataSourceOp(dataSource)
-      val chainableOp = mock[ChainableOp[Any, Any]]
+      val transformOp = mock[TransformOp[Any, Any]]
       val fn = mock[FunctionRunner[Any, Any]]
+      when(transformOp.fn).thenReturn(fn)
 
-      val chainedOp = dataSourceOp.chain(chainableOp)
+      val chainedOp = dataSourceOp.chain(transformOp)
 
       chainedOp shouldBe a[DataSourceOp]
-      verify(chainableOp).fn
 
       unchainableOps.foreach { op =>
         intercept[OpChainException] {
@@ -79,13 +79,13 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
       }
     }
 
-    "get Processor of DataSource" in {
+    "be translated into processor" in {
       val dataSource = new AnySource
       val dataSourceOp = DataSourceOp(dataSource)
-      val processor = dataSourceOp.getProcessor
+      val processor = dataSourceOp.toProcessor
       processor shouldBe a[Processor[_]]
       processor.parallelism shouldBe dataSourceOp.parallelism
-      processor.description shouldBe dataSourceOp.description
+      processor.description shouldBe s"${dataSourceOp.description}.globalWindows"
     }
   }
 
@@ -94,7 +94,7 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
     "not chain any Op" in {
       val dataSink = new AnySink
       val dataSinkOp = DataSinkOp(dataSink)
-      val chainableOp = mock[ChainableOp[Any, Any]]
+      val chainableOp = mock[TransformOp[Any, Any]]
       val ops = chainableOp +: unchainableOps
       ops.foreach { op =>
         intercept[OpChainException] {
@@ -103,10 +103,10 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
       }
     }
 
-    "get Processor of DataSink" in {
+    "be translated to processor" in {
       val dataSink = new AnySink
       val dataSinkOp = DataSinkOp(dataSink)
-      val processor = dataSinkOp.getProcessor
+      val processor = dataSinkOp.toProcessor
       processor shouldBe a[Processor[_]]
       processor.parallelism shouldBe dataSinkOp.parallelism
       processor.description shouldBe dataSinkOp.description
@@ -117,7 +117,7 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
 
     "not chain any Op" in {
       val processorOp = new ProcessorOp[AnyTask]
-      val chainableOp = mock[ChainableOp[Any, Any]]
+      val chainableOp = mock[TransformOp[Any, Any]]
       val ops = chainableOp +: unchainableOps
       ops.foreach { op =>
         intercept[OpChainException] {
@@ -126,41 +126,41 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
       }
     }
 
-    "get Processor" in {
+    "be translated into processor" in {
       val processorOp = new ProcessorOp[AnyTask]
-      val processor = processorOp.getProcessor
+      val processor = processorOp.toProcessor
       processor shouldBe a [DefaultProcessor[_]]
       processor.parallelism shouldBe processorOp.parallelism
       processor.description shouldBe processorOp.description
     }
   }
 
-  "ChainableOp" should {
+  "TransformOp" should {
 
-    "chain ChainableOp" in {
+    "chain TransformOp" in {
       val fn1 = mock[FunctionRunner[Any, Any]]
-      val chainableOp1 = ChainableOp[Any, Any](fn1)
+      val transformOp1 = TransformOp[Any, Any](fn1)
 
       val fn2 = mock[FunctionRunner[Any, Any]]
-      val chainableOp2 = ChainableOp[Any, Any](fn2)
+      val transformOp2 = TransformOp[Any, Any](fn2)
 
-      val chainedOp = chainableOp1.chain(chainableOp2)
+      val chainedOp = transformOp1.chain(transformOp2)
 
-      chainedOp shouldBe a[ChainableOp[_, _]]
+      chainedOp shouldBe a[TransformOp[_, _]]
 
       unchainableOps.foreach { op =>
         intercept[OpChainException] {
-          chainableOp1.chain(op)
+          transformOp1.chain(op)
         }
       }
     }
 
-    "get Processor" in {
+    "be translated to processor" in {
       val fn = mock[FlatMapFunction[Any, Any]]
       val flatMapper = new FlatMapper(fn, "flatMap")
-      val chainableOp = ChainableOp[Any, Any](flatMapper)
+      val transformOp = TransformOp[Any, Any](flatMapper)
 
-      val processor = chainableOp.getProcessor
+      val processor = transformOp.toProcessor
       processor shouldBe a[Processor[_]]
       processor.parallelism shouldBe 1
     }
@@ -168,14 +168,16 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
 
   "GroupByOp" should {
 
-    "chain ChainableOp" in {
-      val groupBy = mock[GroupAlsoByWindow[Any, Any]]
-      val groupByOp = GroupByOp[Any, Any](groupBy)
-      val fn = mock[FunctionRunner[Any, Any]]
-      val chainableOp = mock[ChainableOp[Any, Any]]
-      when(chainableOp.fn).thenReturn(fn)
+    val groupBy = (any: Any) => any
+    val groupByOp = GroupByOp[Any, Any](groupBy)
+
+    "chain WindowTransformOp" in {
 
-      val chainedOp = groupByOp.chain(chainableOp)
+      val runner = new DefaultWindowRunner[Any, Any](GlobalWindows(), new DummyRunner())
+      val windowTransformOp = mock[WindowTransformOp[Any, Any]]
+      when(windowTransformOp.windowRunner).thenReturn(runner)
+
+      val chainedOp = groupByOp.chain(windowTransformOp)
       chainedOp shouldBe a[GroupByOp[_, _]]
 
       unchainableOps.foreach { op =>
@@ -185,25 +187,23 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
       }
     }
 
-    "delegate to groupByFn on getProcessor" in {
-      val groupBy = mock[GroupAlsoByWindow[Any, Any]]
-      val groupByOp = GroupByOp[Any, Any](groupBy)
-
-      groupByOp.getProcessor
-      verify(groupBy).getProcessor(anyInt, anyString, any[UserConfig])(any[ActorSystem])
+    "be translated to processor" in {
+      val processor = groupByOp.toProcessor
+      processor shouldBe a[Processor[_]]
+      processor.parallelism shouldBe 1
     }
   }
 
   "MergeOp" should {
 
-    val mergeOp = MergeOp("merge")
+    val mergeOp = MergeOp()
 
-    "chain ChainableOp" in {
-      val fn = mock[FunctionRunner[Any, Any]]
-      val chainableOp = mock[ChainableOp[Any, Any]]
-      when(chainableOp.fn).thenReturn(fn)
+    "chain WindowTransformOp" in {
+      val runner = mock[WindowRunner[Any, Any]]
+      val windowTransformOp = mock[WindowTransformOp[Any, Any]]
+      when(windowTransformOp.windowRunner).thenReturn(runner)
 
-      val chainedOp = mergeOp.chain(chainableOp)
+      val chainedOp = mergeOp.chain(windowTransformOp)
       chainedOp shouldBe a [MergeOp]
 
       unchainableOps.foreach { op =>
@@ -213,8 +213,8 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
       }
     }
 
-    "get Processor" in {
-      val processor = mergeOp.getProcessor
+    "be translated to processor" in {
+      val processor = mergeOp.toProcessor
       processor shouldBe a[Processor[_]]
       processor.parallelism shouldBe 1
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
index 70abde9..70d21b5 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
@@ -24,16 +24,14 @@ import akka.actor.ActorSystem
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.{TestUtil, UserConfig}
 import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
-import org.apache.gearpump.streaming.partitioner.CoLocationPartitioner
-import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
+import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, GroupByPartitioner}
 import org.apache.gearpump.streaming.dsl.plan.PlannerSpec._
 import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, FoldRunner}
 import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
-import org.apache.gearpump.streaming.dsl.window.api.CountWindows
-import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
 import org.apache.gearpump.streaming.sink.DataSink
 import org.apache.gearpump.streaming.source.DataSource
 import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
 import org.apache.gearpump.util.Graph
 import org.scalatest.mock.MockitoSugar
@@ -58,10 +56,11 @@ class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Moc
   "Planner" should "chain operations" in {
     val graph = Graph.empty[Op, OpEdge]
     val sourceOp = DataSourceOp(new AnySource)
-    val groupBy = GroupAlsoByWindow((any: Any) => any, CountWindows.apply[Any](1))
+    val groupBy = (any: Any) => any
     val groupByOp = GroupByOp(groupBy)
-    val flatMapOp = ChainableOp[Any, Any](anyFlatMapper)
-    val reduceOp = ChainableOp[Any, Option[Any]](anyReducer)
+    val windowOp = WindowOp(GlobalWindows())
+    val flatMapOp = TransformOp[Any, Any](anyFlatMapper)
+    val reduceOp = TransformOp[Any, Option[Any]](anyReducer)
     val processorOp = new ProcessorOp[AnyTask]
     val sinkOp = DataSinkOp(new AnySink)
     val directEdge = Direct
@@ -70,8 +69,10 @@ class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Moc
     graph.addVertex(sourceOp)
     graph.addVertex(groupByOp)
     graph.addEdge(sourceOp, shuffleEdge, groupByOp)
+    graph.addVertex(windowOp)
+    graph.addEdge(groupByOp, directEdge, windowOp)
     graph.addVertex(flatMapOp)
-    graph.addEdge(groupByOp, directEdge, flatMapOp)
+    graph.addEdge(windowOp, directEdge, flatMapOp)
     graph.addVertex(reduceOp)
     graph.addEdge(flatMapOp, directEdge, reduceOp)
     graph.addVertex(processorOp)
@@ -86,9 +87,11 @@ class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Moc
       .mapVertex(_.description)
 
     plan.vertices.toSet should contain theSameElementsAs
-      Set("source", "groupBy", "processor", "sink")
-    plan.outgoingEdgesOf("source").iterator.next()._2 shouldBe a[GroupByPartitioner[_, _]]
-    plan.outgoingEdgesOf("groupBy").iterator.next()._2 shouldBe a[CoLocationPartitioner]
+      Set("source.globalWindows", "groupBy.globalWindows.flatMap.reduce", "processor", "sink")
+    plan.outgoingEdgesOf("source.globalWindows").iterator.next()._2 shouldBe
+      a[GroupByPartitioner[_, _]]
+    plan.outgoingEdgesOf("groupBy.globalWindows.flatMap.reduce").iterator.next()._2 shouldBe
+      a[CoLocationPartitioner]
     plan.outgoingEdgesOf("processor").iterator.next()._2 shouldBe a[CoLocationPartitioner]
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
index f5d7c20..6244224 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
@@ -19,28 +19,22 @@ package org.apache.gearpump.streaming.dsl.plan.functions
 
 import java.time.Instant
 
-import akka.actor.ActorSystem
 import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.{TestUtil, UserConfig}
+import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
 import org.apache.gearpump.streaming.source.{DataSourceTask, Watermark}
 import org.apache.gearpump.streaming.Constants._
 import org.apache.gearpump.streaming.dsl.api.functions.{FoldFunction, ReduceFunction}
 import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource
 import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
-import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform
-import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask}
-import org.apache.gearpump.streaming.dsl.window.api.CountWindows
-import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
-import org.mockito.ArgumentCaptor
+import org.apache.gearpump.streaming.dsl.task.TransformTask
+import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows
+import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, WindowRunner}
 import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.scalatest.{Matchers, WordSpec}
 import org.scalatest.mock.MockitoSugar
 
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
 class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
   import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunnerSpec._
 
@@ -216,40 +210,6 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
     }
   }
 
-  "Emit" should {
-
-    val emitFunction = mock[T => Unit]
-    val emit = new Emit[T](emitFunction)
-
-    "emit input value when processing input value" in {
-      val input = mock[T]
-
-      emit.process(input) shouldBe List.empty[Unit]
-
-      verify(emitFunction).apply(input)
-    }
-
-    "return empty description" in {
-      emit.description shouldBe ""
-    }
-
-    "return None on finish" in {
-      emit.finish() shouldBe List.empty[Unit]
-    }
-
-    "do nothing on setup" in {
-      emit.setup()
-
-      verifyZeroInteractions(emitFunction)
-    }
-
-    "do nothing on teardown" in {
-      emit.teardown()
-
-      verifyZeroInteractions(emitFunction)
-    }
-  }
-
   "Source" should {
     "iterate over input source and apply attached operator" in {
 
@@ -258,7 +218,11 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
 
       val data = "one two three".split("\\s+")
       val dataSource = new CollectionDataSource[String](data)
-      val conf = UserConfig.empty.withValue(GEARPUMP_STREAMING_SOURCE, dataSource)
+      val runner1 = new DefaultWindowRunner[String, String](
+        GlobalWindows(), new DummyRunner[String])
+      val conf = UserConfig.empty
+        .withValue(GEARPUMP_STREAMING_SOURCE, dataSource)
+        .withValue[WindowRunner[String, String]](GEARPUMP_STREAMING_OPERATOR, runner1)
 
       // Source with no transformer
       val source = new DataSourceTask[String, String](
@@ -275,8 +239,10 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
       val anotherTaskContext = MockUtil.mockTaskContext
       val double = new FlatMapper[String, String](FlatMapFunction(
         word => List(word, word)), "double")
+      val runner2 = new DefaultWindowRunner[String, String](
+        GlobalWindows(), double)
       val another = new DataSourceTask(anotherTaskContext,
-        conf.withValue(GEARPUMP_STREAMING_OPERATOR, double))
+        conf.withValue(GEARPUMP_STREAMING_OPERATOR, runner2))
       another.onStart(Instant.EPOCH)
       another.onNext(Message("next"))
       another.onWatermarkProgress(Watermark.MAX)
@@ -287,44 +253,8 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
     }
   }
 
-  "CountTriggerTask" should {
-    "group input by groupBy Function and " +
-      "apply attached operator for each group" in {
-
-      val data = "1 2  2  3 3  3"
-
-      val concat = new FoldRunner[String, Option[String]](ReduceFunction({ (left, right) =>
-        left + right}), "concat")
-
-      implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
-      val config = UserConfig.empty.withValue[FunctionRunner[String, Option[String]]](
-        GEARPUMP_STREAMING_OPERATOR, concat)
-
-      val taskContext = MockUtil.mockTaskContext
-
-      val groupBy = GroupAlsoByWindow((input: String) => input,
-        CountWindows.apply[String](1).accumulating)
-      val task = new CountTriggerTask[String, String](groupBy, taskContext, config)
-      task.onStart(Instant.EPOCH)
-
-      val peopleCaptor = ArgumentCaptor.forClass(classOf[Message])
-
-      data.split("\\s+").foreach { word =>
-        task.onNext(Message(word))
-      }
-      verify(taskContext, times(6)).output(peopleCaptor.capture())
-
-      import scala.collection.JavaConverters._
-
-      val values = peopleCaptor.getAllValues.asScala.map(input =>
-        input.value.asInstanceOf[Option[String]].get)
-      assert(values.mkString(",") == "1,2,22,3,33,333")
-      system.terminate()
-      Await.result(system.whenTerminated, Duration.Inf)
-    }
-  }
 
-  "TransformTask" should {
+  "MergeTask" should {
     "accept two stream and apply the attached operator" in {
 
       // Source with transformer
@@ -332,7 +262,7 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
       val conf = UserConfig.empty
       val double = new FlatMapper[String, String](FlatMapFunction(
         word => List(word, word)), "double")
-      val transform = new Transform[String, String](taskContext, Some(double))
+      val transform = new DefaultWindowRunner[String, String](GlobalWindows(), double)
       val task = new TransformTask[String, String](transform, taskContext, conf)
       task.onStart(Instant.EPOCH)
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
index 5b90a3e..c8c8b9f 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
@@ -61,9 +61,9 @@ class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with M
     dag.vertices.size shouldBe 2
     dag.vertices.foreach { processor =>
       processor.taskClass shouldBe classOf[DataSourceTask[_, _]].getName
-      if (processor.description == "A") {
+      if (processor.description == "A.globalWindows") {
         processor.parallelism shouldBe 2
-      } else if (processor.description == "B") {
+      } else if (processor.description == "B.globalWindows") {
         processor.parallelism shouldBe 3
       } else {
         fail(s"undefined source ${processor.description}")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
index 4c7e209..ef8f932 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
@@ -22,10 +22,9 @@ import akka.actor._
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.{TestUtil, UserConfig}
-import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
 import org.apache.gearpump.streaming.dsl.scalaapi.StreamSpec.Join
-import org.apache.gearpump.streaming.dsl.task.{EventTimeTriggerTask, TransformTask}
-import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, HashPartitioner, PartitionerDescription}
+import org.apache.gearpump.streaming.dsl.task.{GroupByTask, TransformTask}
+import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, GroupByPartitioner, HashPartitioner, PartitionerDescription}
 import org.apache.gearpump.streaming.source.DataSourceTask
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
 import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication}
@@ -92,7 +91,7 @@ class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Mock
 
   private def getExpectedDagTopology: Graph[String, String] = {
     val source = classOf[DataSourceTask[_, _]].getName
-    val group = classOf[EventTimeTriggerTask[_, _]].getName
+    val group = classOf[GroupByTask[_, _, _]].getName
     val merge = classOf[TransformTask[_, _]].getName
     val join = classOf[Join].getName
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala
deleted file mode 100644
index 1a4958a..0000000
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.dsl.task
-
-import java.time.Instant
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.dsl.window.api.CountWindows
-import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowRunner}
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-class CountTriggerTaskSpec extends PropSpec with PropertyChecks
-  with Matchers with MockitoSugar {
-
-  property("CountTriggerTask should trigger output by number of messages in a window") {
-
-    implicit val system = MockUtil.system
-
-    val numGen = Gen.chooseNum[Int](1, 1000)
-
-    forAll(numGen, numGen) { (windowSize: Int, msgNum: Int) =>
-
-      val groupBy = mock[GroupAlsoByWindow[Any, Any]]
-      val window = CountWindows.apply[Any](windowSize)
-      when(groupBy.window).thenReturn(window)
-      val windowRunner = mock[WindowRunner]
-      val userConfig = UserConfig.empty
-
-      val task = new CountTriggerTask[Any, Any](groupBy, windowRunner,
-      val message = mock[Message]
-
-      for (i <- 1 to msgNum) {
-        task.onNext(message)
-      }
-      verify(windowRunner, times(msgNum)).process(message)
-      verify(windowRunner, times(msgNum / windowSize)).trigger(Instant.ofEpochMilli(windowSize))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
deleted file mode 100644
index 9414c76..0000000
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.dsl.task
-
-import java.time.{Duration, Instant}
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, SlidingWindows}
-import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowRunner}
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.{Matchers, PropSpec}
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-
-class EventTimeTriggerTaskSpec extends PropSpec with PropertyChecks
-  with Matchers with MockitoSugar {
-
-  property("EventTimeTriggerTask should trigger on watermark") {
-    val longGen = Gen.chooseNum[Long](1L, 1000L)
-    val windowSizeGen = longGen
-    val windowStepGen = longGen
-    val watermarkGen = longGen.map(Instant.ofEpochMilli)
-
-    forAll(windowSizeGen, windowStepGen, watermarkGen) {
-      (windowSize: Long, windowStep: Long, watermark: Instant) =>
-
-        val window = SlidingWindows.apply[Any](Duration.ofMillis(windowSize),
-          Duration.ofMillis(windowStep)).triggering(EventTimeTrigger)
-        val groupBy = mock[GroupAlsoByWindow[Any, Any]]
-        val windowRunner = mock[WindowRunner]
-        val context = MockUtil.mockTaskContext
-        val config = UserConfig.empty
-
-        when(groupBy.window).thenReturn(window)
-
-        val task = new EventTimeTriggerTask[Any, Any](groupBy, windowRunner, context, config)
-
-        val message = mock[Message]
-        task.onNext(message)
-        verify(windowRunner).process(message)
-
-        task.onWatermarkProgress(watermark)
-        verify(windowRunner).trigger(any[Instant])
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala
new file mode 100644
index 0000000..0f87a1c
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.task
+
+import java.time.Instant
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner
+import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows
+import org.apache.gearpump.streaming.{Constants, MockUtil}
+import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, WindowRunner}
+import org.apache.gearpump.streaming.source.Watermark
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.{Matchers, PropSpec}
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+
+class GroupByTaskSpec extends PropSpec with PropertyChecks
+  with Matchers with MockitoSugar {
+
+  property("GroupByTask should trigger on watermark") {
+    val longGen = Gen.chooseNum[Long](1L, 1000L).map(Instant.ofEpochMilli)
+
+    forAll(longGen) { (time: Instant) =>
+      val groupBy = mock[Any => Int]
+      val windowRunner = new DefaultWindowRunner[Any, Any](GlobalWindows(), new DummyRunner[Any])
+      val context = MockUtil.mockTaskContext
+      val config = UserConfig.empty
+        .withValue(
+          Constants.GEARPUMP_STREAMING_OPERATOR, windowRunner)(MockUtil.system)
+
+      val task = new GroupByTask[Any, Int, Any](groupBy, context, config)
+      val value = time
+      val message = Message(value, time)
+      when(groupBy(time)).thenReturn(0)
+      task.onNext(message)
+
+      task.onWatermarkProgress(Watermark.MAX)
+      verify(context).output(message)
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala
deleted file mode 100644
index cbc9e0c..0000000
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.dsl.task
-
-import java.time.{Duration, Instant}
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.MockUtil
-import 
org.apache.gearpump.streaming.dsl.task.ProcessingTimeTriggerTask.Triggering
-import org.apache.gearpump.streaming.dsl.window.api.{ProcessingTimeTrigger, 
SlidingWindows}
-import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, 
WindowRunner}
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.{Matchers, PropSpec}
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-
-class ProcessingTimeTriggerTaskSpec extends PropSpec with PropertyChecks
-  with Matchers with MockitoSugar {
-
-  property("ProcessingTimeTriggerTask should trigger on system time interval") 
{
-    val longGen = Gen.chooseNum[Long](1L, 1000L)
-    val windowSizeGen = longGen
-    val windowStepGen = longGen
-    val startTimeGen = longGen.map(Instant.ofEpochMilli)
-
-    forAll(windowSizeGen, windowStepGen, startTimeGen) {
-      (windowSize: Long, windowStep: Long, startTime: Instant) =>
-
-        val window = SlidingWindows.apply[Any](Duration.ofMillis(windowSize),
-          Duration.ofMillis(windowStep)).triggering(ProcessingTimeTrigger)
-        val groupBy = mock[GroupAlsoByWindow[Any, Any]]
-        val windowRunner = mock[WindowRunner]
-        val context = MockUtil.mockTaskContext
-        val config = UserConfig.empty
-
-        when(groupBy.window).thenReturn(window)
-
-        val task = new ProcessingTimeTriggerTask[Any, Any](groupBy, 
windowRunner, context, config)
-
-        task.onStart(startTime)
-
-        val message = mock[Message]
-        task.onNext(message)
-        verify(windowRunner).process(message)
-
-        task.receiveUnManagedMessage(Triggering)
-        verify(windowRunner).trigger(any[Instant])
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
index 481925a..6b66f01 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
@@ -22,11 +22,9 @@ import java.time.Instant
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
-import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform
-import org.apache.gearpump.streaming.source.Watermark
+import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner}
 import org.mockito.{Matchers => MockitoMatchers}
-import org.mockito.Mockito.{times, verify, when}
+import org.mockito.Mockito.{verify, when}
 import org.scalacheck.Gen
 import org.scalatest.{Matchers, PropSpec}
 import org.scalatest.mock.MockitoSugar
@@ -34,43 +32,25 @@ import org.scalatest.prop.PropertyChecks
 
 class TransformTaskSpec extends PropSpec with PropertyChecks with Matchers 
with MockitoSugar {
 
-  private val timeGen = Gen.chooseNum[Long](Watermark.MIN.toEpochMilli,
-    Watermark.MAX.toEpochMilli - 1).map(Instant.ofEpochMilli)
-  private val runnerGen = {
-    val runner = mock[FunctionRunner[Any, Any]]
-    Gen.oneOf(Some(runner), None)
-  }
-
-  property("TransformTask should emit on watermark") {
-    val msgGen = for {
-      str <- Gen.alphaStr.suchThat(!_.isEmpty)
-      t <- timeGen
-    } yield Message(s"$str:$t", t)
-    val msgsGen = Gen.listOfN(10, msgGen)
-
-    forAll(runnerGen, msgsGen) {
-      (runner: Option[FunctionRunner[Any, Any]], msgs: List[Message]) =>
-        val taskContext = MockUtil.mockTaskContext
-        implicit val system = MockUtil.system
-        val config = UserConfig.empty
-        val transform = new Transform[Any, Any](taskContext, runner)
-        val task = new TransformTask[Any, Any](transform, taskContext, config)
-
-        msgs.foreach(task.onNext)
-
-        runner.foreach(r => when(r.finish()).thenReturn(None))
-        task.onWatermarkProgress(Watermark.MIN)
-        verify(taskContext, times(0)).output(MockitoMatchers.any[Message])
-
-        msgs.foreach { msg =>
-          runner.foreach(r =>
-            when(r.process(msg.value)).thenReturn(Some(msg.value)))
-        }
-        task.onWatermarkProgress(Watermark.MAX)
-
-        msgs.foreach { msg =>
-          verify(taskContext).output(MockitoMatchers.eq(msg))
-        }
+  property("TransformTask should trigger on watermark") {
+    val longGen = Gen.chooseNum[Long](1L, 1000L)
+    val watermarkGen = longGen.map(Instant.ofEpochMilli)
+
+    forAll(watermarkGen) { (watermark: Instant) =>
+      val windowRunner = mock[WindowRunner[Any, Any]]
+      val context = MockUtil.mockTaskContext
+      val config = UserConfig.empty
+      val task = new TransformTask[Any, Any](windowRunner, context, config)
+      val time = watermark.minusMillis(1L)
+      val value: Any = time
+      val message = Message(value, time)
+      // onNext must hand the message to the window runner as a TimestampedValue.
+      task.onNext(message)
+      verify(windowRunner).process(TimestampedValue(value, time))
+      // The value the stubbed trigger returns is expected to be output as a message.
+      when(windowRunner.trigger(watermark)).thenReturn(Some(TimestampedValue(value, time)))
+      task.onWatermarkProgress(watermark)
+      verify(context).output(message)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala
index fbbee3e..98e9919 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala
@@ -21,18 +21,15 @@ package org.apache.gearpump.streaming.dsl.window.impl
 import java.time.{Duration, Instant}
 
 import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
-import org.apache.gearpump.streaming.{Constants, MockUtil}
+import org.apache.gearpump.streaming.MockUtil
 import org.apache.gearpump.streaming.dsl.plan.functions.FoldRunner
 import org.apache.gearpump.streaming.dsl.window.api.SessionWindows
 import org.apache.gearpump.streaming.source.Watermark
-import org.mockito.Mockito.{times, verify}
 import org.scalatest.{Matchers, PropSpec}
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.prop.PropertyChecks
 
-
 class DefaultWindowRunnerSpec extends PropSpec with PropertyChecks
   with Matchers with MockitoSugar {
 
@@ -40,34 +37,25 @@ class DefaultWindowRunnerSpec extends PropSpec with 
PropertyChecks
 
     val data = List(
       Message(("foo", 1L), Instant.ofEpochMilli(1L)),
-      Message(("bar", 1L), Instant.ofEpochMilli(8L)),
       Message(("foo", 1L), Instant.ofEpochMilli(15L)),
-      Message(("bar", 1L), Instant.ofEpochMilli(17L)),
-      Message(("bar", 1L), Instant.ofEpochMilli(18L)),
       Message(("foo", 1L), Instant.ofEpochMilli(25L)),
-      Message(("foo", 1L), Instant.ofEpochMilli(26L)),
-      Message(("bar", 1L), Instant.ofEpochMilli(30L)),
-      Message(("bar", 1L), Instant.ofEpochMilli(31L))
+      Message(("foo", 1L), Instant.ofEpochMilli(26L))
     )
 
     type KV = (String, Long)
-    val taskContext = MockUtil.mockTaskContext
     implicit val system = MockUtil.system
     val reduce = ReduceFunction[KV]((kv1, kv2) => (kv1._1, kv1._2 + kv2._2))
-    val operator = new FoldRunner(reduce, "reduce")
-    val userConfig = UserConfig.empty.withValue(
-      Constants.GEARPUMP_STREAMING_OPERATOR, operator)
-    val windows = SessionWindows.apply[KV](Duration.ofMillis(4L))
-    val groupBy = GroupAlsoByWindow[KV, String](_._1, windows)
-    val windowRunner = new DefaultWindowRunner(taskContext, userConfig, 
groupBy)
-
-    data.foreach(windowRunner.process)
-    windowRunner.trigger(Watermark.MAX)
-
-    verify(taskContext, times(2)).output(Message(Some(("foo", 1)), 
Watermark.MAX))
-    verify(taskContext).output(Message(Some(("foo", 2)), Watermark.MAX))
-    verify(taskContext, times(2)).output(Message(Some(("bar", 2)), 
Watermark.MAX))
-    verify(taskContext).output(Message(Some(("bar", 1)), Watermark.MAX))
+    val windows = SessionWindows.apply(Duration.ofMillis(4L))
+    val windowRunner = new DefaultWindowRunner[KV, Option[KV]](windows,
+      new FoldRunner[KV, Option[KV]](reduce, "reduce"))
+
+    data.foreach(m => windowRunner.process(TimestampedValue(m.value.asInstanceOf[KV], m.timestamp)))
+    windowRunner.trigger(Watermark.MAX).toList shouldBe
+      List(
+        TimestampedValue(Some(("foo", 1)), Instant.ofEpochMilli(4)),
+        TimestampedValue(Some(("foo", 1)), Instant.ofEpochMilli(18)),
+        TimestampedValue(Some(("foo", 2)), Instant.ofEpochMilli(29))
+      )
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/partitioner/GroupByPartitionerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/partitioner/GroupByPartitionerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/partitioner/GroupByPartitionerSpec.scala
new file mode 100644
index 0000000..038f91d
--- /dev/null
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/partitioner/GroupByPartitionerSpec.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.partitioner
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.streaming.partitioner.GroupByPartitionerSpec.People
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+class GroupByPartitionerSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+
+  // Same key must map to the same partition despite different timestamps; different keys must not.
+  "GroupByPartitioner" should "group by message payload" in {
+    val mark = People("Mark", "male")
+    val tom = People("Tom", "male")
+    val michelle = People("Michelle", "female")
+
+    val partitionNum = 10
+    val groupBy = new GroupByPartitioner[People, String](_.gender)
+    groupBy.getPartition(Message(mark, 1L), partitionNum) shouldBe
+      groupBy.getPartition(Message(tom, 2L), partitionNum)
+
+    groupBy.getPartition(Message(mark, 2L), partitionNum) should not be
+      groupBy.getPartition(Message(michelle, 3L), partitionNum)
+  }
+}
+
+object GroupByPartitionerSpec {
+  case class People(name: String, gender: String)
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
index 7651251..f7a3a63 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
@@ -23,8 +23,7 @@ import java.time.Instant
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
-import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform
+import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner}
 import org.mockito.Mockito._
 import org.scalacheck.Gen
 import org.scalatest.mock.MockitoSugar
@@ -33,21 +32,16 @@ import org.scalatest.prop.PropertyChecks
 
 class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers 
with MockitoSugar {
 
-  private val runnerGen = {
-    val runner = mock[FunctionRunner[Any, Any]]
-    Gen.oneOf(Some(runner), None)
-  }
-
   property("DataSourceTask should setup data source") {
-    forAll(runnerGen, Gen.chooseNum[Long](0L, 
1000L).map(Instant.ofEpochMilli)) {
-      (runner: Option[FunctionRunner[Any, Any]], startTime: Instant) =>
+    forAll(Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) {
+      (startTime: Instant) =>
       val taskContext = MockUtil.mockTaskContext
       implicit val system = MockUtil.system
       val dataSource = mock[DataSource]
       val config = UserConfig.empty
         .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
-      val transform = new Transform[Any, Any](taskContext, runner)
-      val sourceTask = new DataSourceTask[Any, Any](taskContext, config, 
dataSource, transform)
+        val runner = mock[WindowRunner[Any, Any]]
+      val sourceTask = new DataSourceTask[Any, Any](dataSource, runner, taskContext, config)
 
       sourceTask.onStart(startTime)
 
@@ -56,21 +50,20 @@ class DataSourceTaskSpec extends PropSpec with 
PropertyChecks with Matchers with
   }
 
   property("DataSourceTask should read from DataSource and transform inputs") {
-    forAll(runnerGen, Gen.alphaStr, Gen.chooseNum[Long](0L, 
1000L).map(Instant.ofEpochMilli)) {
-      (runner: Option[FunctionRunner[Any, Any]], str: String, timestamp: 
Instant) =>
+    forAll(Gen.alphaStr, Gen.chooseNum[Long](0L, 
1000L).map(Instant.ofEpochMilli)) {
+      (str: String, timestamp: Instant) =>
         val taskContext = MockUtil.mockTaskContext
         implicit val system = MockUtil.system
         val dataSource = mock[DataSource]
         val config = UserConfig.empty
           .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
-        val transform = new Transform[Any, Any](taskContext, runner)
-        val sourceTask = new DataSourceTask[Any, Any](taskContext, config, 
dataSource, transform)
+        val runner = mock[WindowRunner[Any, Any]]
+        val sourceTask = new DataSourceTask[Any, Any](dataSource, runner, taskContext, config)
         val msg = Message(str, timestamp)
         when(dataSource.read()).thenReturn(msg)
-        runner.foreach(r => {
-          when(r.process(str)).thenReturn(Some(str))
-          when(r.finish()).thenReturn(None)
-        })
+
+        when(runner.trigger(Watermark.MAX)).thenReturn(
+          Some(TimestampedValue(str.asInstanceOf[Any], timestamp)))
 
         sourceTask.onNext(Message("next"))
         sourceTask.onWatermarkProgress(Watermark.MAX)
@@ -80,18 +73,16 @@ class DataSourceTaskSpec extends PropSpec with 
PropertyChecks with Matchers with
   }
 
   property("DataSourceTask should teardown DataSource") {
-    forAll(runnerGen) { (runner: Option[FunctionRunner[Any, Any]]) =>
-      val taskContext = MockUtil.mockTaskContext
-      implicit val system = MockUtil.system
-      val dataSource = mock[DataSource]
-      val config = UserConfig.empty
-        .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
-      val transform = new Transform[Any, Any](taskContext, runner)
-      val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, transform)
+    // Stopping the task is expected to close the data source it was built with.
+    val taskContext = MockUtil.mockTaskContext
+    implicit val system = MockUtil.system
+    val source = mock[DataSource]
+    val batchOfOne = UserConfig.empty.withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
+    val windowRunner = mock[WindowRunner[Any, Any]]
+    val task = new DataSourceTask[Any, Any](source, windowRunner, taskContext, batchOfOne)
 
-      sourceTask.onStop()
+    task.onStop()
 
-      verify(dataSource).close()
-    }
+    verify(source).close()
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
index fb0beaa..65cb17a 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
@@ -24,9 +24,10 @@ import java.util.Random
 import org.mockito.Mockito._
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.{FlatSpec, Matchers}
-import org.apache.gearpump.{MAX_TIME_MILLIS, Message}
+import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner}
+import org.apache.gearpump.streaming.source.Watermark
 import org.apache.gearpump.streaming.task.SubscriptionSpec.NextTask
 import org.apache.gearpump.streaming.{LifeTime, ProcessorDescription}
 
@@ -115,7 +116,7 @@ class SubscriptionSpec extends FlatSpec with Matchers with 
MockitoSugar {
       subscription.sendMessage(Message(randomMessage, clock))
     }
 
-    assert(subscription.allowSendingMoreMessages() == false)
+    assert(!subscription.allowSendingMoreMessages())
   }
 
   it should "report minClock as Long.MaxValue when there is no pending 
message" in {
@@ -124,7 +125,7 @@ class SubscriptionSpec extends FlatSpec with Matchers with 
MockitoSugar {
     subscription.sendMessage(msg1)
     assert(subscription.minClock == 70)
     subscription.receiveAck(Ack(TaskId(1, 1), 1, 1, session))
-    assert(subscription.minClock == MAX_TIME_MILLIS)
+    assert(subscription.minClock == Watermark.MAX.toEpochMilli)
   }
 
   private def randomMessage: String = new Random().nextInt.toString

Reply via email to