Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 198366360 -> 2d13b9cf8


[GEARPUMP-367] Don't use windowing unnecessarily

Author: manuzhang <owenzhang1...@gmail.com>

Closes #226 from manuzhang/simplify_dsl.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/2d13b9cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/2d13b9cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/2d13b9cf

Branch: refs/heads/master
Commit: 2d13b9cf883ee59b97751d992c2b52dc4068b16c
Parents: 1983663
Author: manuzhang <owenzhang1...@gmail.com>
Authored: Tue Apr 10 22:32:12 2018 +0800
Committer: manuzhang <owenzhang1...@gmail.com>
Committed: Tue Apr 10 22:32:23 2018 +0800

----------------------------------------------------------------------
 .../apache/gearpump/streaming/dsl/package.scala |   2 +-
 .../apache/gearpump/streaming/dsl/plan/OP.scala |  83 +++++--
 .../dsl/plan/functions/FunctionRunner.scala     |   8 +-
 .../streaming/dsl/task/GroupByTask.scala        |  29 ++-
 .../streaming/dsl/task/TransformTask.scala      |  18 +-
 .../dsl/window/impl/StreamingOperator.scala     | 246 +++++++++++++++++++
 .../dsl/window/impl/WindowRunner.scala          | 179 --------------
 .../streaming/source/DataSourceProcessor.scala  |  22 +-
 .../streaming/source/DataSourceTask.scala       |  19 +-
 .../gearpump/streaming/task/TaskUtil.scala      |   4 +-
 .../gearpump/streaming/dsl/plan/OpSpec.scala    |  14 +-
 .../streaming/dsl/plan/PlannerSpec.scala        |   4 +-
 .../dsl/plan/functions/FunctionRunnerSpec.scala |  10 +-
 .../streaming/dsl/scalaapi/StreamAppSpec.scala  |   4 +-
 .../streaming/dsl/task/GroupByTaskSpec.scala    |   4 +-
 .../streaming/dsl/task/TransformTaskSpec.scala  |   6 +-
 .../window/impl/DefaultWindowRunnerSpec.scala   |   4 +-
 .../streaming/source/DataSourceTaskSpec.scala   |  18 +-
 18 files changed, 391 insertions(+), 283 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2d13b9cf/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala
index 6d43f16..2d3d94b 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala
@@ -36,7 +36,7 @@ package org.apache.gearpump.streaming
  *     * [[org.apache.gearpump.streaming.dsl.task.GroupByTask]] to execute Ops 
followed by [[org.apache.gearpump.streaming.dsl.plan.GroupByOp]]
  *     * [[org.apache.gearpump.streaming.dsl.task.TransformTask]] to execute 
all other Ops.
  *
- *     All but [[org.apache.gearpump.streaming.sink.DataSinkTask]] delegates 
execution to [[org.apache.gearpump.streaming.dsl.window.impl.WindowRunner]], 
which internally
+ *     All but [[org.apache.gearpump.streaming.sink.DataSinkTask]] delegates 
execution to 
[[org.apache.gearpump.streaming.dsl.window.impl.StreamingOperator]], which 
internally
  *     runs a chain of 
[[org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner]] grouped by 
windows. Window assignments are either explicitly defined with
  *     [[org.apache.gearpump.streaming.dsl.window.api.Windows]] API or 
implicitly in [[org.apache.gearpump.streaming.dsl.window.api.GlobalWindows]]. 
UDFs are eventually
  *     executed by 
[[org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner]].

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2d13b9cf/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
index c37ced6..4a71b08 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
@@ -21,12 +21,11 @@ package org.apache.gearpump.streaming.dsl.plan
 import akka.actor.ActorSystem
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.Processor.DefaultProcessor
-import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, DummyRunner, 
FunctionRunner}
-import org.apache.gearpump.streaming.dsl.window.impl.{AndThen => 
WindowRunnerAT}
+import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, DummyRunner, 
FlatMapper, FunctionRunner}
+import org.apache.gearpump.streaming.dsl.window.impl.{AndThenOperator, 
FlatMapOperator, StreamingOperator, WindowOperator}
 import org.apache.gearpump.streaming.{Constants, Processor}
 import org.apache.gearpump.streaming.dsl.task.{GroupByTask, TransformTask}
 import org.apache.gearpump.streaming.dsl.window.api.{GlobalWindows, Windows}
-import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, 
WindowRunner}
 import org.apache.gearpump.streaming.sink.{DataSink, DataSinkProcessor}
 import org.apache.gearpump.streaming.source.{DataSource, DataSourceTask}
 import org.apache.gearpump.streaming.task.Task
@@ -69,6 +68,16 @@ object Op {
     }
   }
 
+  def isFlatMapper(runner: FunctionRunner[Any, Any]): Boolean = {
+    runner match {
+      case fm: FlatMapper[Any, Any] =>
+        true
+      case at: AndThen[Any, Any, Any] =>
+        isFlatMapper(at.first) && isFlatMapper(at.second)
+      case _ =>
+        false
+    }
+  }
 }
 
 /**
@@ -134,39 +143,59 @@ case class DataSourceOp(
     dataSource: DataSource,
     parallelism: Int = 1,
     description: String = "source",
-    userConfig: UserConfig = UserConfig.empty)
+    userConfig: UserConfig = UserConfig.empty,
+    operator: Option[StreamingOperator[Any, Any]] = None)
   extends Op {
 
   override def chain(other: Op)(implicit system: ActorSystem): Op = {
     other match {
-      case op: WindowTransformOp[_, _] =>
+      case op: WindowTransformOp[Any, Any] =>
+        val chainedRunner =
+          operator.map(AndThenOperator(_, op.operator)).getOrElse(op.operator)
         DataSourceOp(
           dataSource,
           parallelism,
           Op.concatenate(description, op.description),
           
Op.concatenate(userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR,
-            op.windowRunner),
-            op.userConfig))
-      case op: TransformOp[_, _] =>
-        chain(
-          WindowOp(GlobalWindows()).chain(op))
+            chainedRunner),
+            op.userConfig),
+        Some(chainedRunner))
+      case op: TransformOp[Any, Any] =>
+        val runner = op.runner
+        if (Op.isFlatMapper(runner)) {
+          val fm = new FlatMapOperator[Any, Any](runner)
+          val chainedRunner =
+            operator.map(AndThenOperator(_, fm)).getOrElse(fm)
+          DataSourceOp(
+            dataSource,
+            parallelism,
+            Op.concatenate(description, op.description),
+            
Op.concatenate(userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR,
+              chainedRunner),
+              op.userConfig),
+            Some(chainedRunner)
+          )
+        } else {
+          chain(
+            WindowOp(GlobalWindows()).chain(op))
+        }
       case op: WindowOp =>
         chain(
           op.chain(TransformOp(new DummyRunner[Any]())))
       case op: TransformWindowTransformOp[_, _, _] =>
-        chain(
-          WindowOp(GlobalWindows()).chain(op.transformOp)
-            .chain(op.windowTransformOp))
+        chain(op.transformOp).chain(op.windowTransformOp)
       case _ =>
         throw new OpChainException(this, other)
     }
   }
 
   override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] 
= {
-    Op.withGlobalWindowsDummyRunner(this, userConfig,
+    if (userConfig.getValue(Constants.GEARPUMP_STREAMING_OPERATOR).isEmpty) {
+      chain(TransformOp(new DummyRunner[Any])).toProcessor
+    } else {
       Processor[DataSourceTask[Any, Any]](parallelism, description,
         userConfig.withValue(Constants.GEARPUMP_STREAMING_SOURCE, dataSource))
-    )
+    }
   }
 }
 
@@ -195,10 +224,10 @@ case class DataSinkOp(
  * to another Op to be executed
  */
 case class TransformOp[IN, OUT](
-    fn: FunctionRunner[IN, OUT],
+    runner: FunctionRunner[IN, OUT],
     userConfig: UserConfig = UserConfig.empty) extends Op {
 
-  override def description: String = fn.description
+  override def description: String = runner.description
 
   override def chain(other: Op)(implicit system: ActorSystem): Op = {
     other match {
@@ -208,16 +237,16 @@ case class TransformOp[IN, OUT](
         // => ChainableOp(f1).chain(ChainableOp(f2)).chain(ChainableOp(f3))
         // => AndThen(AndThen(f1, f2), f3)
         TransformOp(
-          AndThen(fn, op.fn),
+          AndThen(runner, op.runner),
           Op.concatenate(userConfig, op.userConfig))
       case op: WindowOp =>
         TransformWindowTransformOp(this,
-          WindowTransformOp(new DefaultWindowRunner[OUT, OUT](
+          WindowTransformOp(new WindowOperator[OUT, OUT](
             op.windows, new DummyRunner[OUT]
           ), op.description, op.userConfig))
       case op: TransformWindowTransformOp[OUT, _, _] =>
         TransformWindowTransformOp(TransformOp(
-          AndThen(fn, op.transformOp.fn),
+          AndThen(runner, op.transformOp.runner),
           Op.concatenate(userConfig, op.transformOp.userConfig)
         ), op.windowTransformOp)
       case _ =>
@@ -244,13 +273,13 @@ case class WindowOp(
   override def chain(other: Op)(implicit system: ActorSystem): Op = {
     other match {
       case op: TransformOp[_, _] =>
-        WindowTransformOp(new DefaultWindowRunner(windows, op.fn),
+        WindowTransformOp(new WindowOperator(windows, op.runner),
           Op.concatenate(description, op.description),
           Op.concatenate(userConfig, op.userConfig))
       case op: WindowOp =>
         chain(TransformOp(new 
DummyRunner[Any])).chain(op.chain(TransformOp(new DummyRunner[Any])))
       case op: TransformWindowTransformOp[_, _, _] =>
-        WindowTransformOp(new DefaultWindowRunner(windows, op.transformOp.fn),
+        WindowTransformOp(new WindowOperator(windows, op.transformOp.runner),
           Op.concatenate(description, op.transformOp.description),
           Op.concatenate(userConfig, 
op.transformOp.userConfig)).chain(op.windowTransformOp)
       case _ =>
@@ -290,7 +319,7 @@ case class GroupByOp[IN, GROUP] private(
           Op.concatenate(description, op.description),
           Op.concatenate(
             userConfig
-              .withValue(Constants.GEARPUMP_STREAMING_OPERATOR, 
op.windowRunner),
+              .withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.operator),
             userConfig))
       case op: WindowOp =>
         chain(op.chain(TransformOp(new DummyRunner[Any]())))
@@ -329,7 +358,7 @@ case class MergeOp(
           parallelism,
           description,
           
Op.concatenate(userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR,
-            op.windowRunner),
+            op.operator),
             op.userConfig))
       case op: WindowOp =>
         chain(op.chain(TransformOp(new DummyRunner[Any]())))
@@ -352,7 +381,7 @@ case class MergeOp(
  * it will be translated to a 
[[org.apache.gearpump.streaming.dsl.task.TransformTask]].
  */
 private case class WindowTransformOp[IN, OUT](
-    windowRunner: WindowRunner[IN, OUT],
+    operator: StreamingOperator[IN, OUT],
     description: String,
     userConfig: UserConfig) extends Op {
 
@@ -360,7 +389,7 @@ private case class WindowTransformOp[IN, OUT](
     other match {
       case op: WindowTransformOp[OUT, _] =>
         WindowTransformOp(
-          WindowRunnerAT(windowRunner, op.windowRunner),
+          AndThenOperator(operator, op.operator),
           Op.concatenate(description, op.description),
           Op.concatenate(userConfig, op.userConfig)
         )
@@ -372,7 +401,7 @@ private case class WindowTransformOp[IN, OUT](
   override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] 
= {
     // TODO: this should be chained to DataSourceOp / GroupByOp / MergeOp
     Processor[TransformTask[Any, Any]](1, description, userConfig.withValue(
-      Constants.GEARPUMP_STREAMING_OPERATOR, windowRunner))
+      Constants.GEARPUMP_STREAMING_OPERATOR, operator))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2d13b9cf/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
index 2c11238..c638257 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
@@ -39,6 +39,7 @@ sealed trait FunctionRunner[IN, OUT] extends 
java.io.Serializable {
   def description: String
 }
 
+
 case class AndThen[IN, MIDDLE, OUT](first: FunctionRunner[IN, MIDDLE],
     second: FunctionRunner[MIDDLE, OUT])
   extends FunctionRunner[IN, OUT] {
@@ -114,10 +115,5 @@ class FoldRunner[T, A](fn: FoldFunction[T, A], val 
description: String)
   }
 }
 
-class DummyRunner[T] extends FunctionRunner[T, T] {
-
-  override def process(value: T): TraversableOnce[T] = Option(value)
-
-  override def description: String = ""
-}
+class DummyRunner[T] extends FlatMapper[T, T](FlatMapFunction(Option(_)), "")
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2d13b9cf/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala
index b3f3ad2..b615354 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala
@@ -25,7 +25,7 @@ import com.gs.collections.impl.map.mutable.UnifiedMap
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import 
org.apache.gearpump.streaming.Constants.{GEARPUMP_STREAMING_GROUPBY_FUNCTION, 
GEARPUMP_STREAMING_OPERATOR}
-import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, 
WindowRunner}
+import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, 
StreamingOperator}
 import org.apache.gearpump.streaming.source.Watermark
 import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil}
 
@@ -44,20 +44,21 @@ class GroupByTask[IN, GROUP, OUT](
     )
   }
 
-  private val groups: UnifiedMap[GROUP, WindowRunner[IN, OUT]] =
-    new UnifiedMap[GROUP, WindowRunner[IN, OUT]]
+  private val groups: UnifiedMap[GROUP, StreamingOperator[IN, OUT]] =
+    new UnifiedMap[GROUP, StreamingOperator[IN, OUT]]
 
   override def onNext(message: Message): Unit = {
     val input = message.value.asInstanceOf[IN]
     val group = groupBy(input)
 
     if (!groups.containsKey(group)) {
-      groups.put(group,
-        userConfig.getValue[WindowRunner[IN, OUT]](
-          GEARPUMP_STREAMING_OPERATOR)(taskContext.system).get)
+      val operator = userConfig.getValue[StreamingOperator[IN, OUT]](
+        GEARPUMP_STREAMING_OPERATOR)(taskContext.system).get
+      operator.setup()
+      groups.put(group, operator)
     }
 
-    groups.get(group).process(TimestampedValue(message.value.asInstanceOf[IN],
+    groups.get(group).foreach(TimestampedValue(message.value.asInstanceOf[IN],
       message.timestamp))
   }
 
@@ -65,11 +66,19 @@ class GroupByTask[IN, GROUP, OUT](
     if (groups.isEmpty && watermark == Watermark.MAX) {
       taskContext.updateWatermark(Watermark.MAX)
     } else {
-      groups.values.forEach(new Consumer[WindowRunner[IN, OUT]] {
-        override def accept(runner: WindowRunner[IN, OUT]): Unit = {
-          TaskUtil.trigger(watermark, runner, taskContext)
+      groups.values.forEach(new Consumer[StreamingOperator[IN, OUT]] {
+        override def accept(operator: StreamingOperator[IN, OUT]): Unit = {
+          TaskUtil.trigger(watermark, operator, taskContext)
         }
       })
     }
   }
+
+  override def onStop(): Unit = {
+    groups.values.forEach(new Consumer[StreamingOperator[IN, OUT]] {
+      override def accept(operator: StreamingOperator[IN, OUT]): Unit = {
+        operator.teardown()
+      }
+    })
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2d13b9cf/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
index 5ad64fa..6c78e0b 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
@@ -22,25 +22,33 @@ import java.time.Instant
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, 
WindowRunner}
+import org.apache.gearpump.streaming.dsl.window.impl.{StreamingOperator, 
TimestampedValue}
 import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil}
 
 class TransformTask[IN, OUT](
-    runner: WindowRunner[IN, OUT],
+    operator: StreamingOperator[IN, OUT],
     taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, 
userConf) {
 
   def this(context: TaskContext, conf: UserConfig) = {
     this(
-      conf.getValue[WindowRunner[IN, 
OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system).get,
+      conf.getValue[StreamingOperator[IN, 
OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system).get,
       context, conf
     )
   }
 
+  override def onStart(startTime: Instant): Unit = {
+    operator.setup()
+  }
+
   override def onNext(msg: Message): Unit = {
-    runner.process(TimestampedValue(msg.value.asInstanceOf[IN], msg.timestamp))
+    operator.foreach(TimestampedValue(msg.value.asInstanceOf[IN], 
msg.timestamp))
   }
 
   override def onWatermarkProgress(watermark: Instant): Unit = {
-    TaskUtil.trigger(watermark, runner, taskContext)
+    TaskUtil.trigger(watermark, operator, taskContext)
+  }
+
+  override def onStop(): Unit = {
+    operator.teardown()
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2d13b9cf/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/StreamingOperator.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/StreamingOperator.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/StreamingOperator.scala
new file mode 100644
index 0000000..4f29c9e
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/StreamingOperator.scala
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.window.impl
+
+import java.time.Instant
+
+import com.gs.collections.api.block.predicate.Predicate
+import com.gs.collections.api.block.procedure.Procedure
+import com.gs.collections.impl.list.mutable.FastList
+import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap
+import org.apache.gearpump.Message
+import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
+import org.apache.gearpump.streaming.dsl.window.api.WindowFunction.Context
+import org.apache.gearpump.streaming.dsl.window.api.{Discarding, Windows}
+import org.apache.gearpump.streaming.source.Watermark
+import org.apache.gearpump.streaming.task.TaskUtil
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Inputs for [[StreamingOperator]].
+ */
+case class TimestampedValue[T](value: T, timestamp: Instant) {
+
+  def this(msg: Message) = {
+    this(msg.value.asInstanceOf[T], msg.timestamp)
+  }
+
+  def toMessage: Message = Message(value, timestamp)
+}
+
+/**
+ * Outputs triggered by [[StreamingOperator]]
+ */
+case class TriggeredOutputs[T](outputs: TraversableOnce[TimestampedValue[T]],
+    watermark: Instant)
+
+
+trait StreamingOperator[IN, OUT] extends java.io.Serializable {
+
+  def setup(): Unit = {}
+
+  def foreach(tv: TimestampedValue[IN]): Unit
+
+  def flatMap(
+      tv: TimestampedValue[IN]): TraversableOnce[TimestampedValue[OUT]] = {
+    foreach(tv)
+    None
+  }
+
+  def trigger(time: Instant): TriggeredOutputs[OUT]
+
+  def teardown(): Unit = {}
+}
+
+/**
+ * A composite WindowRunner that first executes its left child and feeds 
results
+ * into result child.
+ */
+case class AndThenOperator[IN, MIDDLE, OUT](left: StreamingOperator[IN, 
MIDDLE],
+    right: StreamingOperator[MIDDLE, OUT]) extends StreamingOperator[IN, OUT] {
+
+  override def setup(): Unit = {
+    left.setup()
+    right.setup()
+  }
+
+  override def foreach(
+      tv: TimestampedValue[IN]): Unit = {
+    left.flatMap(tv).foreach(right.flatMap)
+  }
+
+  override def flatMap(
+      tv: TimestampedValue[IN]): TraversableOnce[TimestampedValue[OUT]] = {
+    left.flatMap(tv).flatMap(right.flatMap)
+  }
+
+  override def trigger(time: Instant): TriggeredOutputs[OUT] = {
+    val lOutputs = left.trigger(time)
+    lOutputs.outputs.foreach(right.foreach)
+    right.trigger(lOutputs.watermark)
+  }
+
+  override def teardown(): Unit = {
+    left.teardown()
+    right.teardown()
+  }
+}
+
+/**
+ * @param runner FlatMapper or chained FlatMappers
+ */
+class FlatMapOperator[IN, OUT](runner: FunctionRunner[IN, OUT])
+  extends StreamingOperator[IN, OUT] {
+
+  override def setup(): Unit = {
+    runner.setup()
+  }
+
+  override def foreach(tv: TimestampedValue[IN]): Unit = {
+    throw new UnsupportedOperationException("foreach should not be invoked on 
FlatMapOperator; " +
+      "please use flatMap instead")
+  }
+
+  override def flatMap(
+      tv: TimestampedValue[IN]): TraversableOnce[TimestampedValue[OUT]] = {
+    runner.process(tv.value)
+      .map(TimestampedValue(_, tv.timestamp))
+  }
+
+  override def trigger(time: Instant): TriggeredOutputs[OUT] = {
+    TriggeredOutputs(None, time)
+  }
+
+  override def teardown(): Unit = {
+    runner.teardown()
+  }
+}
+
+/**
+ * This is responsible for executing window calculation.
+ *   1. Groups elements into windows as defined by window function
+ *   2. Applies window calculation to each group
+ *   3. Emits results on triggering
+ */
+class WindowOperator[IN, OUT](
+    windows: Windows,
+    runner: FunctionRunner[IN, OUT])
+  extends StreamingOperator[IN, OUT] {
+
+  private val windowFn = windows.windowFn
+  private val windowInputs = new TreeSortedMap[Window, 
FastList[TimestampedValue[IN]]]
+  private var isSetup = false
+  private var watermark = Watermark.MIN
+
+  override def foreach(
+      tv: TimestampedValue[IN]): Unit = {
+    val wins = windowFn(new Context[IN] {
+      override def element: IN = tv.value
+
+      override def timestamp: Instant = tv.timestamp
+    })
+    wins.foreach { win =>
+      if (windowFn.isNonMerging) {
+        if (!windowInputs.containsKey(win)) {
+          val inputs = new FastList[TimestampedValue[IN]]
+          windowInputs.put(win, inputs)
+        }
+        windowInputs.get(win).add(tv)
+      } else {
+        merge(windowInputs, win, tv)
+      }
+    }
+
+    def merge(
+        winIns: TreeSortedMap[Window, FastList[TimestampedValue[IN]]],
+        win: Window, tv: TimestampedValue[IN]): Unit = {
+      val intersected = winIns.keySet.select(new Predicate[Window] {
+        override def accept(each: Window): Boolean = {
+          win.intersects(each)
+        }
+      })
+      var mergedWin = win
+      val mergedInputs = FastList.newListWith(tv)
+      intersected.forEach(new Procedure[Window] {
+        override def value(each: Window): Unit = {
+          mergedWin = mergedWin.span(each)
+          mergedInputs.addAll(winIns.remove(each))
+        }
+      })
+      winIns.put(mergedWin, mergedInputs)
+    }
+  }
+
+  override def trigger(time: Instant): TriggeredOutputs[OUT] = {
+    @annotation.tailrec
+    def onTrigger(
+        outputs: ArrayBuffer[TimestampedValue[OUT]],
+        wmk: Instant): TriggeredOutputs[OUT] = {
+      if (windowInputs.notEmpty()) {
+        val firstWin = windowInputs.firstKey
+        if (!time.isBefore(firstWin.endTime)) {
+          val inputs = windowInputs.remove(firstWin)
+          if (!isSetup) {
+            runner.setup()
+            isSetup = true
+          }
+          inputs.forEach(new Procedure[TimestampedValue[IN]] {
+            override def value(tv: TimestampedValue[IN]): Unit = {
+              runner.process(tv.value).foreach {
+                out: OUT => outputs += TimestampedValue(out, tv.timestamp)
+              }
+            }
+          })
+          runner.finish().foreach {
+            out: OUT =>
+              outputs += TimestampedValue(out, firstWin.endTime.minusMillis(1))
+          }
+          val newWmk = TaskUtil.max(wmk, firstWin.endTime)
+          if (windows.accumulationMode == Discarding) {
+            runner.teardown()
+            // discarding, setup need to be called for each window
+            isSetup = false
+          }
+          onTrigger(outputs, newWmk)
+        } else {
+          // The output watermark is the minimum of end of last triggered 
window
+          // and start of first un-triggered window
+          TriggeredOutputs(outputs, TaskUtil.min(wmk, firstWin.startTime))
+        }
+      } else {
+        // All windows have been triggered.
+        if (time == Watermark.MAX) {
+          // This means there will be no more inputs
+          // so it's safe to advance to the maximum watermark.
+          TriggeredOutputs(outputs, Watermark.MAX)
+        } else {
+          TriggeredOutputs(outputs, wmk)
+        }
+      }
+    }
+
+    val triggeredOutputs = onTrigger(ArrayBuffer.empty[TimestampedValue[OUT]], 
watermark)
+    watermark = TaskUtil.max(watermark, triggeredOutputs.watermark)
+    TriggeredOutputs(triggeredOutputs.outputs, watermark)
+  }
+
+  override def teardown(): Unit = {
+    runner.teardown()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2d13b9cf/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
deleted file mode 100644
index ee3c067..0000000
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.dsl.window.impl
-
-import java.time.Instant
-
-import com.gs.collections.api.block.predicate.Predicate
-import com.gs.collections.api.block.procedure.Procedure
-import com.gs.collections.impl.list.mutable.FastList
-import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap
-import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
-import org.apache.gearpump.streaming.dsl.window.api.WindowFunction.Context
-import org.apache.gearpump.streaming.dsl.window.api.{Discarding, Windows}
-import org.apache.gearpump.streaming.source.Watermark
-import org.apache.gearpump.streaming.task.TaskUtil
-
-import scala.collection.mutable.ArrayBuffer
-
-/**
- * Inputs for [[WindowRunner]].
- */
-case class TimestampedValue[T](value: T, timestamp: Instant)
-
-/**
- * Outputs triggered by [[WindowRunner]]
- */
-case class TriggeredOutputs[T](outputs: TraversableOnce[TimestampedValue[T]],
-    watermark: Instant)
-
-/**
- * This is responsible for executing window calculation.
- *   1. Groups elements into windows as defined by window function
- *   2. Applies window calculation to each group
- *   3. Emits results on triggering
- */
-trait WindowRunner[IN, OUT] extends java.io.Serializable {
-
-  def process(timestampedValue: TimestampedValue[IN]): Unit
-
-  def trigger(time: Instant): TriggeredOutputs[OUT]
-}
-
-/**
- * A composite WindowRunner that first executes its left child and feeds 
results
- * into result child.
- */
-case class AndThen[IN, MIDDLE, OUT](left: WindowRunner[IN, MIDDLE],
-    right: WindowRunner[MIDDLE, OUT]) extends WindowRunner[IN, OUT] {
-
-  override def process(timestampedValue: TimestampedValue[IN]): Unit = {
-    left.process(timestampedValue)
-  }
-
-  override def trigger(time: Instant): TriggeredOutputs[OUT] = {
-    val lOutputs = left.trigger(time)
-    lOutputs.outputs.foreach(right.process)
-    right.trigger(lOutputs.watermark)
-  }
-}
-
-/**
- * Default implementation for [[WindowRunner]].
- */
-class DefaultWindowRunner[IN, OUT](
-    windows: Windows,
-    fnRunner: FunctionRunner[IN, OUT])
-  extends WindowRunner[IN, OUT] {
-
-  private val windowFn = windows.windowFn
-  private val windowInputs = new TreeSortedMap[Window, 
FastList[TimestampedValue[IN]]]
-  private var setup = false
-  private var watermark = Watermark.MIN
-
-  override def process(timestampedValue: TimestampedValue[IN]): Unit = {
-    val wins = windowFn(new Context[IN] {
-      override def element: IN = timestampedValue.value
-
-      override def timestamp: Instant = timestampedValue.timestamp
-    })
-    wins.foreach { win =>
-      if (windowFn.isNonMerging) {
-        if (!windowInputs.containsKey(win)) {
-          val inputs = new FastList[TimestampedValue[IN]]
-          windowInputs.put(win, inputs)
-        }
-        windowInputs.get(win).add(timestampedValue)
-      } else {
-        merge(windowInputs, win, timestampedValue)
-      }
-    }
-
-    def merge(
-        winIns: TreeSortedMap[Window, FastList[TimestampedValue[IN]]],
-        win: Window, tv: TimestampedValue[IN]): Unit = {
-      val intersected = winIns.keySet.select(new Predicate[Window] {
-        override def accept(each: Window): Boolean = {
-          win.intersects(each)
-        }
-      })
-      var mergedWin = win
-      val mergedInputs = FastList.newListWith(tv)
-      intersected.forEach(new Procedure[Window] {
-        override def value(each: Window): Unit = {
-          mergedWin = mergedWin.span(each)
-          mergedInputs.addAll(winIns.remove(each))
-        }
-      })
-      winIns.put(mergedWin, mergedInputs)
-    }
-  }
-
-  override def trigger(time: Instant): TriggeredOutputs[OUT] = {
-    @annotation.tailrec
-    def onTrigger(
-        outputs: ArrayBuffer[TimestampedValue[OUT]],
-        wmk: Instant): TriggeredOutputs[OUT] = {
-      if (windowInputs.notEmpty()) {
-        val firstWin = windowInputs.firstKey
-        if (!time.isBefore(firstWin.endTime)) {
-          val inputs = windowInputs.remove(firstWin)
-          if (!setup) {
-            fnRunner.setup()
-            setup = true
-          }
-          inputs.forEach(new Procedure[TimestampedValue[IN]] {
-            override def value(tv: TimestampedValue[IN]): Unit = {
-              fnRunner.process(tv.value).foreach {
-                out: OUT => outputs += TimestampedValue(out, tv.timestamp)
-              }
-            }
-          })
-          fnRunner.finish().foreach {
-            out: OUT =>
-              outputs += TimestampedValue(out, firstWin.endTime.minusMillis(1))
-          }
-          val newWmk = TaskUtil.max(wmk, firstWin.endTime)
-          if (windows.accumulationMode == Discarding) {
-            fnRunner.teardown()
-            // discarding, setup need to be called for each window
-            setup = false
-          }
-          onTrigger(outputs, newWmk)
-        } else {
-          // The output watermark is the minimum of end of last triggered 
window
-          // and start of first un-triggered window
-          TriggeredOutputs(outputs, TaskUtil.min(wmk, firstWin.startTime))
-        }
-      } else {
-        // All windows have been triggered.
-        if (time == Watermark.MAX) {
-          // This means there will be no more inputs
-          // so it's safe to advance to the maximum watermark.
-          TriggeredOutputs(outputs, Watermark.MAX)
-        } else {
-          TriggeredOutputs(outputs, wmk)
-        }
-      }
-    }
-
-    val triggeredOutputs = onTrigger(ArrayBuffer.empty[TimestampedValue[OUT]], 
watermark)
-    watermark = TaskUtil.max(watermark, triggeredOutputs.watermark)
-    TriggeredOutputs(triggeredOutputs.outputs, watermark)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2d13b9cf/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala
index dd4c0d3..c471a00 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala
@@ -18,11 +18,11 @@
 
 package org.apache.gearpump.streaming.source
 
+
 import akka.actor.ActorSystem
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner
-import org.apache.gearpump.streaming.dsl.window.api.{WindowFunction, Windows}
-import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, 
Window, WindowRunner}
+import org.apache.gearpump.streaming.dsl.window.impl.{FlatMapOperator, 
StreamingOperator}
 import org.apache.gearpump.streaming.{Constants, Processor}
 
 /**
@@ -48,19 +48,9 @@ object DataSourceProcessor {
     Processor[DataSourceTask[Any, Any]](parallelism, description,
       taskConf
         .withValue[DataSource](Constants.GEARPUMP_STREAMING_SOURCE, dataSource)
-        .withValue[WindowRunner[Any, 
Any]](Constants.GEARPUMP_STREAMING_OPERATOR,
-        new DefaultWindowRunner[Any, Any](
-          Windows(PerElementWindowFunction, description = "perElementWindows"),
-          new DummyRunner[Any])))
-  }
-
-
-  case object PerElementWindowFunction extends WindowFunction {
-    override def apply[T](
-        context: WindowFunction.Context[T]): Array[Window] = {
-      Array(Window(context.timestamp, context.timestamp.plusMillis(1)))
-    }
-
-    override def isNonMerging: Boolean = true
+        .withValue[StreamingOperator[Any, 
Any]](Constants.GEARPUMP_STREAMING_OPERATOR,
+        new FlatMapOperator(new DummyRunner[Any])
+      )
+    )
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2d13b9cf/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
index f93c496..b09ad66 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
@@ -23,7 +23,7 @@ import java.time.Instant
 import org.apache.gearpump._
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, 
WindowRunner}
+import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, 
StreamingOperator}
 import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil}
 
 /**
@@ -40,7 +40,7 @@ import org.apache.gearpump.streaming.task.{Task, TaskContext, 
TaskUtil}
  */
 class DataSourceTask[IN, OUT] private[source](
     source: DataSource,
-    windowRunner: WindowRunner[IN, OUT],
+    operator: StreamingOperator[IN, OUT],
     context: TaskContext,
     conf: UserConfig)
   extends Task(context, conf) {
@@ -48,7 +48,7 @@ class DataSourceTask[IN, OUT] private[source](
   def this(context: TaskContext, conf: UserConfig) = {
     this(
       conf.getValue[DataSource](GEARPUMP_STREAMING_SOURCE)(context.system).get,
-      conf.getValue[WindowRunner[IN, 
OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system).get,
+      conf.getValue[StreamingOperator[IN, 
OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system).get,
       context, conf
     )
   }
@@ -58,27 +58,32 @@ class DataSourceTask[IN, OUT] private[source](
   override def onStart(startTime: Instant): Unit = {
     LOG.info(s"opening data source at ${startTime.toEpochMilli}")
     source.open(context, startTime)
+    operator.setup()
 
     self ! Watermark(source.getWatermark)
   }
 
   override def onNext(m: Message): Unit = {
     0.until(batchSize).foreach { _ =>
-      Option(source.read()).foreach(
-        msg => windowRunner.process(
-          TimestampedValue(msg.value.asInstanceOf[IN], msg.timestamp)))
+      Option(source.read()).foreach(process)
     }
 
     self ! Watermark(source.getWatermark)
   }
 
   override def onWatermarkProgress(watermark: Instant): Unit = {
-    TaskUtil.trigger(watermark, windowRunner, context)
+    TaskUtil.trigger(watermark, operator, context)
   }
 
   override def onStop(): Unit = {
     LOG.info("closing data source...")
     source.close()
+    operator.teardown()
+  }
+
+  private def process(msg: Message): Unit = {
+    operator.flatMap(new TimestampedValue(msg))
+      .foreach { tv => context.output(tv.toMessage) }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2d13b9cf/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala
index bd889c4..ed304ce 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala
@@ -21,7 +21,7 @@ package org.apache.gearpump.streaming.task
 import java.time.Instant
 
 import org.apache.gearpump.Message
-import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, 
WindowRunner}
+import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, 
StreamingOperator}
 
 object TaskUtil {
 
@@ -36,7 +36,7 @@ object TaskUtil {
     loader.loadClass(className).asSubclass(classOf[Task])
   }
 
-  def trigger[IN, OUT](watermark: Instant, runner: WindowRunner[IN, OUT],
+  def trigger[IN, OUT](watermark: Instant, runner: StreamingOperator[IN, OUT],
       context: TaskContext): Unit = {
     val triggeredOutputs = runner.trigger(watermark)
     context.updateWatermark(triggeredOutputs.watermark)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2d13b9cf/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
index ca0135d..79ef135 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
@@ -28,7 +28,7 @@ import 
org.apache.gearpump.streaming.dsl.plan.OpSpec.{AnySink, AnySource, AnyTas
 import org.apache.gearpump.streaming.dsl.plan.functions.{DummyRunner, 
FlatMapper, FunctionRunner}
 import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
 import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows
-import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, 
WindowRunner}
+import org.apache.gearpump.streaming.dsl.window.impl.{WindowOperator, 
StreamingOperator}
 import org.apache.gearpump.streaming.sink.DataSink
 import org.apache.gearpump.streaming.source.DataSource
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
@@ -66,7 +66,7 @@ class OpSpec extends WordSpec with Matchers with 
BeforeAndAfterAll with MockitoS
       val dataSourceOp = DataSourceOp(dataSource)
       val transformOp = mock[TransformOp[Any, Any]]
       val fn = mock[FunctionRunner[Any, Any]]
-      when(transformOp.fn).thenReturn(fn)
+      when(transformOp.runner).thenReturn(fn)
 
       val chainedOp = dataSourceOp.chain(transformOp)
 
@@ -85,7 +85,7 @@ class OpSpec extends WordSpec with Matchers with 
BeforeAndAfterAll with MockitoS
       val processor = dataSourceOp.toProcessor
       processor shouldBe a[Processor[_]]
       processor.parallelism shouldBe dataSourceOp.parallelism
-      processor.description shouldBe 
s"${dataSourceOp.description}.globalWindows"
+      processor.description shouldBe s"${dataSourceOp.description}"
     }
   }
 
@@ -173,9 +173,9 @@ class OpSpec extends WordSpec with Matchers with 
BeforeAndAfterAll with MockitoS
 
     "chain WindowTransformOp" in {
 
-      val runner = new DefaultWindowRunner[Any, Any](GlobalWindows(), new 
DummyRunner())
+      val runner = new WindowOperator[Any, Any](GlobalWindows(), new 
DummyRunner())
       val windowTransformOp = mock[WindowTransformOp[Any, Any]]
-      when(windowTransformOp.windowRunner).thenReturn(runner)
+      when(windowTransformOp.operator).thenReturn(runner)
 
       val chainedOp = groupByOp.chain(windowTransformOp)
       chainedOp shouldBe a[GroupByOp[_, _]]
@@ -199,9 +199,9 @@ class OpSpec extends WordSpec with Matchers with 
BeforeAndAfterAll with MockitoS
     val mergeOp = MergeOp()
 
     "chain WindowTransformOp" in {
-      val runner = mock[WindowRunner[Any, Any]]
+      val runner = mock[StreamingOperator[Any, Any]]
       val windowTransformOp = mock[WindowTransformOp[Any, Any]]
-      when(windowTransformOp.windowRunner).thenReturn(runner)
+      when(windowTransformOp.operator).thenReturn(runner)
 
       val chainedOp = mergeOp.chain(windowTransformOp)
       chainedOp shouldBe a [MergeOp]

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2d13b9cf/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
index be4cc63..f2c9d0e 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
@@ -87,8 +87,8 @@ class PlannerSpec extends FlatSpec with Matchers with 
BeforeAndAfterAll with Moc
       .mapVertex(_.description)
 
     plan.getVertices.toSet should contain theSameElementsAs
-      Set("source.globalWindows", "groupBy.globalWindows.flatMap.reduce", 
"processor", "sink")
-    plan.outgoingEdgesOf("source.globalWindows").iterator.next()._2 shouldBe
+      Set("source", "groupBy.globalWindows.flatMap.reduce", "processor", 
"sink")
+    plan.outgoingEdgesOf("source").iterator.next()._2 shouldBe
       a[GroupByPartitioner[_, _]]
     
plan.outgoingEdgesOf("groupBy.globalWindows.flatMap.reduce").iterator.next()._2 
shouldBe
       a[CoLocationPartitioner]

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2d13b9cf/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
index 6244224..c92f9c8 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
@@ -29,7 +29,7 @@ import 
org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource
 import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
 import org.apache.gearpump.streaming.dsl.task.TransformTask
 import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows
-import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, 
WindowRunner}
+import org.apache.gearpump.streaming.dsl.window.impl.{WindowOperator, 
StreamingOperator}
 import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.scalatest.{Matchers, WordSpec}
@@ -218,11 +218,11 @@ class FunctionRunnerSpec extends WordSpec with Matchers 
with MockitoSugar {
 
       val data = "one two three".split("\\s+")
       val dataSource = new CollectionDataSource[String](data)
-      val runner1 = new DefaultWindowRunner[String, String](
+      val runner1 = new WindowOperator[String, String](
         GlobalWindows(), new DummyRunner[String])
       val conf = UserConfig.empty
         .withValue(GEARPUMP_STREAMING_SOURCE, dataSource)
-        .withValue[WindowRunner[String, String]](GEARPUMP_STREAMING_OPERATOR, 
runner1)
+        .withValue[StreamingOperator[String, 
String]](GEARPUMP_STREAMING_OPERATOR, runner1)
 
       // Source with no transformer
       val source = new DataSourceTask[String, String](
@@ -239,7 +239,7 @@ class FunctionRunnerSpec extends WordSpec with Matchers 
with MockitoSugar {
       val anotherTaskContext = MockUtil.mockTaskContext
       val double = new FlatMapper[String, String](FlatMapFunction(
         word => List(word, word)), "double")
-      val runner2 = new DefaultWindowRunner[String, String](
+      val runner2 = new WindowOperator[String, String](
         GlobalWindows(), double)
       val another = new DataSourceTask(anotherTaskContext,
         conf.withValue(GEARPUMP_STREAMING_OPERATOR, runner2))
@@ -262,7 +262,7 @@ class FunctionRunnerSpec extends WordSpec with Matchers 
with MockitoSugar {
       val conf = UserConfig.empty
       val double = new FlatMapper[String, String](FlatMapFunction(
         word => List(word, word)), "double")
-      val transform = new DefaultWindowRunner[String, String](GlobalWindows(), 
double)
+      val transform = new WindowOperator[String, String](GlobalWindows(), 
double)
       val task = new TransformTask[String, String](transform, taskContext, 
conf)
       task.onStart(Instant.EPOCH)
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2d13b9cf/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
index d43bca0..f701c5e 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
@@ -61,9 +61,9 @@ class StreamAppSpec extends FlatSpec with Matchers with 
BeforeAndAfterAll with M
     dag.getVertices.size shouldBe 2
     dag.getVertices.foreach { processor =>
       processor.taskClass shouldBe classOf[DataSourceTask[_, _]].getName
-      if (processor.description == "A.globalWindows") {
+      if (processor.description == "A") {
         processor.parallelism shouldBe 2
-      } else if (processor.description == "B.globalWindows") {
+      } else if (processor.description == "B") {
         processor.parallelism shouldBe 3
       } else {
         fail(s"undefined source ${processor.description}")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2d13b9cf/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala
index 9e6bf59..62e14f4 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala
@@ -24,7 +24,7 @@ import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner
 import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows
 import org.apache.gearpump.streaming.{Constants, MockUtil}
-import org.apache.gearpump.streaming.dsl.window.impl.DefaultWindowRunner
+import org.apache.gearpump.streaming.dsl.window.impl.WindowOperator
 import org.apache.gearpump.streaming.source.Watermark
 import org.mockito.Mockito._
 import org.scalacheck.Gen
@@ -40,7 +40,7 @@ class GroupByTaskSpec extends PropSpec with PropertyChecks
 
     forAll(longGen) { (time: Instant) =>
       val groupBy = mock[Any => Int]
-      val windowRunner = new DefaultWindowRunner[Any, Any](GlobalWindows(), 
new DummyRunner[Any])
+      val windowRunner = new WindowOperator[Any, Any](GlobalWindows(), new 
DummyRunner[Any])
       val context = MockUtil.mockTaskContext
       val config = UserConfig.empty
         .withValue(

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2d13b9cf/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
index e38c5a3..0009ad5 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
@@ -22,7 +22,7 @@ import java.time.Instant
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, 
TriggeredOutputs, WindowRunner}
+import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, 
TriggeredOutputs, StreamingOperator}
 import org.mockito.Mockito.{verify, when}
 import org.scalacheck.Gen
 import org.scalatest.{Matchers, PropSpec}
@@ -36,7 +36,7 @@ class TransformTaskSpec extends PropSpec with PropertyChecks 
with Matchers with
     val watermarkGen = longGen.map(Instant.ofEpochMilli)
 
     forAll(watermarkGen) { (watermark: Instant) =>
-      val windowRunner = mock[WindowRunner[Any, Any]]
+      val windowRunner = mock[StreamingOperator[Any, Any]]
       val context = MockUtil.mockTaskContext
       val config = UserConfig.empty
       val task = new TransformTask[Any, Any](windowRunner, context, config)
@@ -45,7 +45,7 @@ class TransformTaskSpec extends PropSpec with PropertyChecks 
with Matchers with
       val message = Message(value, time)
 
       task.onNext(message)
-      verify(windowRunner).process(TimestampedValue(value, time))
+      verify(windowRunner).foreach(TimestampedValue(value, time))
 
       when(windowRunner.trigger(watermark)).thenReturn(
         TriggeredOutputs(Some(TimestampedValue(value, time)), watermark))

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2d13b9cf/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala
index b23d0ee..1ac7213 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala
@@ -46,10 +46,10 @@ class DefaultWindowRunnerSpec extends PropSpec with 
PropertyChecks
     implicit val system = MockUtil.system
     val reduce = ReduceFunction[KV]((kv1, kv2) => (kv1._1, kv1._2 + kv2._2))
     val windows = SessionWindows.apply(Duration.ofMillis(4L))
-    val windowRunner = new DefaultWindowRunner[KV, Option[KV]](windows,
+    val windowRunner = new WindowOperator[KV, Option[KV]](windows,
       new FoldRunner[KV, Option[KV]](reduce, "reduce"))
 
-    data.foreach(m => 
windowRunner.process(TimestampedValue(m.value.asInstanceOf[KV], m.timestamp)))
+    data.foreach(m => 
windowRunner.foreach(TimestampedValue(m.value.asInstanceOf[KV], m.timestamp)))
     windowRunner.trigger(Watermark.MAX).outputs.toList shouldBe
       List(
         TimestampedValue(Some(("foo", 1)), Instant.ofEpochMilli(4)),

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2d13b9cf/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
index d62739a..cd2cfa7 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
@@ -23,7 +23,7 @@ import java.time.Instant
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, 
TriggeredOutputs, WindowRunner}
+import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, 
TriggeredOutputs, StreamingOperator}
 import org.mockito.Mockito._
 import org.scalacheck.Gen
 import org.scalatest.mock.MockitoSugar
@@ -40,7 +40,7 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks 
with Matchers with
       val dataSource = mock[DataSource]
       val config = UserConfig.empty
         .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
-        val runner = mock[WindowRunner[Any, Any]]
+        val runner = mock[StreamingOperator[Any, Any]]
       val sourceTask = new DataSourceTask[Any, Any](dataSource, runner, 
taskContext, config)
 
       sourceTask.onStart(startTime)
@@ -57,13 +57,17 @@ class DataSourceTaskSpec extends PropSpec with 
PropertyChecks with Matchers with
         val dataSource = mock[DataSource]
         val config = UserConfig.empty
           .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
-        val runner = mock[WindowRunner[Any, Any]]
-        val sourceTask = new DataSourceTask[Any, Any](dataSource, runner, 
taskContext, config)
+        val processor = mock[StreamingOperator[String, String]]
+        val sourceTask = new DataSourceTask[String, String](dataSource, 
processor,
+          taskContext, config)
         val msg = Message(str, timestamp)
         when(dataSource.read()).thenReturn(msg)
 
-        when(runner.trigger(Watermark.MAX)).thenReturn(
-          TriggeredOutputs(Some(TimestampedValue(str.asInstanceOf[Any], 
timestamp)), Watermark.MAX))
+        when(processor.flatMap(new TimestampedValue[String](msg))).thenReturn(
+          Some(new TimestampedValue[String](msg))
+        )
+        when(processor.trigger(Watermark.MAX)).thenReturn(
+          TriggeredOutputs[String](None, Watermark.MAX))
 
         sourceTask.onNext(Message("next"))
         sourceTask.onWatermarkProgress(Watermark.MAX)
@@ -79,7 +83,7 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks 
with Matchers with
     val dataSource = mock[DataSource]
     val config = UserConfig.empty
       .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
-    val runner = mock[WindowRunner[Any, Any]]
+    val runner = mock[StreamingOperator[Any, Any]]
     val sourceTask = new DataSourceTask[Any, Any](dataSource, runner, 
taskContext, config)
 
     sourceTask.onStop()

Reply via email to