Merge branch 'master' into akka-streams

Author: manuzhang <[email protected]>
Author: darionyaphet <[email protected]>

Closes #95 from manuzhang/akka-streams.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/bc394035
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/bc394035
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/bc394035

Branch: refs/heads/akka-streams
Commit: bc39403525b4065360acf82386fac21a588b59e6
Parents: 4fe5458
Author: manuzhang <[email protected]>
Authored: Tue Oct 11 11:57:48 2016 +0800
Committer: manuzhang <[email protected]>
Committed: Tue Oct 11 11:57:48 2016 +0800

----------------------------------------------------------------------
 .../wordcount/dsl/WindowedWordCount.scala       |  87 ++++
 .../apache/gearpump/redis/RedisMessage.scala    | 456 +++++++++++++++++++
 .../org/apache/gearpump/redis/RedisSink.scala   | 119 +++++
 project/Build.scala                             |  62 ++-
 project/BuildShaded.scala                       | 127 +++---
 .../apache/gearpump/streaming/Constants.scala   |   1 +
 .../gearpump/streaming/StreamApplication.scala  |   2 +-
 .../apache/gearpump/streaming/dsl/Stream.scala  | 106 +++--
 .../gearpump/streaming/dsl/StreamApp.scala      |  34 +-
 .../streaming/dsl/javaapi/JavaStream.scala      |  22 +-
 .../apache/gearpump/streaming/dsl/op/OP.scala   | 109 -----
 .../dsl/partitioner/GroupByPartitioner.scala    |  49 ++
 .../dsl/partitioner/GroupbyPartitioner.scala    |  46 --
 .../apache/gearpump/streaming/dsl/plan/OP.scala | 214 +++++++++
 .../streaming/dsl/plan/OpTranslator.scala       | 222 ---------
 .../gearpump/streaming/dsl/plan/Planner.scala   |  65 ++-
 .../plan/functions/SingleInputFunction.scala    | 107 +++++
 .../streaming/dsl/task/CountTriggerTask.scala   |  63 +++
 .../dsl/task/EventTimeTriggerTask.scala         |  59 +++
 .../dsl/task/ProcessingTimeTriggerTask.scala    |  82 ++++
 .../streaming/dsl/task/TransformTask.scala      |  47 ++
 .../dsl/window/api/AccumulationMode.scala       |  24 +
 .../streaming/dsl/window/api/GroupByFn.scala    |  47 ++
 .../streaming/dsl/window/api/Trigger.scala      |  27 ++
 .../streaming/dsl/window/api/Window.scala       |  77 ++++
 .../streaming/dsl/window/api/WindowFn.scala     |  63 +++
 .../dsl/window/impl/ReduceFnRunner.scala        |  29 ++
 .../streaming/dsl/window/impl/Window.scala      |  75 +++
 .../dsl/window/impl/WindowRunner.scala          | 114 +++++
 .../streaming/source/DataSourceTask.scala       |  15 +-
 .../gearpump/streaming/task/TaskActor.scala     |   4 +-
 .../gearpump/streaming/dsl/StreamAppSpec.scala  |  67 +--
 .../gearpump/streaming/dsl/StreamSpec.scala     |  24 +-
 .../partitioner/GroupByPartitionerSpec.scala    |  23 +-
 .../gearpump/streaming/dsl/plan/OpSpec.scala    | 244 ++++++++++
 .../streaming/dsl/plan/OpTranslatorSpec.scala   | 148 ------
 .../streaming/dsl/plan/PlannerSpec.scala        | 132 ++++++
 .../functions/SingleInputFunctionSpec.scala     | 333 ++++++++++++++
 .../dsl/task/CountTriggerTaskSpec.scala         |  61 +++
 .../dsl/task/EventTimeTriggerTaskSpec.scala     |  66 +++
 .../task/ProcessingTimeTriggerTaskSpec.scala    |  69 +++
 41 files changed, 2937 insertions(+), 784 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
new file mode 100644
index 0000000..4f43fd4
--- /dev/null
+++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.wordcount.dsl
+
+import java.time.{Duration, Instant}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.streaming.dsl.{LoggerSink, StreamApp}
+import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, FixedWindow}
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.util.AkkaApp
+
+object WindowedWordCount extends AkkaApp with ArgumentsParser {
+
+  override val options: Array[(String, CLIOption[Any])] = Array.empty
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val context = ClientContext(akkaConf)
+    val app = StreamApp("dsl", context)
+    app.source[String](new TimedDataSource).
+      // word => (word, count)
+      flatMap(line => line.split("[\\s]+")).map((_, 1)).
+      // fix window
+      window(FixedWindow.apply(Duration.ofMillis(5L))
+        .triggering(EventTimeTrigger)).
+      // (word, count1), (word, count2) => (word, count1 + count2)
+      groupBy(_._1).
+      sum.sink(new LoggerSink)
+
+    context.submit(app)
+    context.close()
+  }
+
+  private class TimedDataSource extends DataSource {
+
+    private var data = List(
+      Message("foo", 1L),
+      Message("bar", 2L),
+      Message("foo", 3L),
+      Message("foo", 5L),
+      Message("bar", 7L),
+      Message("bar", 8L)
+    )
+
+    private var watermark: Instant = Instant.ofEpochMilli(0)
+
+    override def read(): Message = {
+      if (data.nonEmpty) {
+        val msg = data.head
+        data = data.tail
+        watermark = Instant.ofEpochMilli(msg.timestamp)
+        msg
+      } else {
+        null
+      }
+    }
+
+    override def open(context: TaskContext, startTime: Instant): Unit = {}
+
+    override def close(): Unit = {}
+
+    override def getWatermark: Instant = {
+      if (data.isEmpty) {
+        watermark = watermark.plusMillis(1)
+      }
+      watermark
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala
----------------------------------------------------------------------
diff --git a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala
new file mode 100644
index 0000000..84dec70
--- /dev/null
+++ b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.redis
+
+import java.nio.charset.Charset
+
+object RedisMessage {
+
+  private def toBytes(strings: List[String]): List[Array[Byte]] =
+    strings.map(string => string.getBytes(Charset.forName("UTF8")))
+
+  private def toBytes(string: String): Array[Byte] =
+    string.getBytes(Charset.forName("UTF8"))
+
+  object Connection {
+
+    /**
+     * Change the selected database for the current connection
+     *
+     * @param index
+     */
+    case class SELECT(index: Int)
+
+  }
+
+  object Geo {
+
+    /**
+     * Add one geospatial item in the geospatial index represented using a sorted set
+     *
+     * @param key
+     * @param longitude
+     * @param latitude
+     * @param member
+     */
+    case class GEOADD(key: Array[Byte], longitude: Double,
+                      latitude: Double, member: Array[Byte]) {
+      def this(key: String, longitude: Double,
+               latitude: Double, member: String) =
+        this(toBytes(key), longitude, latitude, toBytes(member))
+    }
+
+  }
+
+  object Hashes {
+
+    /**
+     * Delete a hash field
+     *
+     * @param key
+     * @param field
+     */
+    case class HDEL(key: Array[Byte], field: Array[Byte]) {
+      def this(key: String, field: String) = this(toBytes(key), toBytes(field))
+    }
+
+    /**
+     * Increment the integer value of a hash field by the given number
+     *
+     * @param key
+     * @param field
+     * @param increment
+     */
+    case class HINCRBY(key: Array[Byte], field: Array[Byte], increment: Long) {
+      def this(key: String, field: String, increment: Long) =
+        this(toBytes(key), toBytes(field), increment)
+    }
+
+    /**
+     * Increment the float value of a hash field by the given amount
+     *
+     * @param key
+     * @param field
+     * @param increment
+     */
+    case class HINCRBYFLOAT(key: Array[Byte], field: Array[Byte], increment: Float) {
+      def this(key: String, field: String, increment: Float) =
+        this(toBytes(key), toBytes(field), increment)
+    }
+
+
+    /**
+     * Set the string value of a hash field
+     *
+     * @param key
+     * @param field
+     * @param value
+     */
+    case class HSET(key: Array[Byte], field: Array[Byte], value: Array[Byte]) {
+      def this(key: String, field: String, value: String) =
+        this(toBytes(key), toBytes(field), toBytes(value))
+    }
+
+    /**
+     * Set the value of a hash field, only if the field does not exist
+     *
+     * @param key
+     * @param field
+     * @param value
+     */
+    case class HSETNX(key: Array[Byte], field: Array[Byte], value: Array[Byte]) {
+      def this(key: String, field: String, value: String) =
+        this(toBytes(key), toBytes(field), toBytes(value))
+    }
+
+  }
+
+  object HyperLogLog {
+
+    /**
+     * Adds the specified elements to the specified HyperLogLog
+     *
+     * @param key
+     * @param element
+     */
+    case class PFADD(key: String, element: String)
+
+  }
+
+  object Lists {
+
+
+    /**
+     * Prepend one or multiple values to a list
+     *
+     * @param key
+     * @param value
+     */
+    case class LPUSH(key: Array[Byte], value: Array[Byte]) {
+
+      def this(key: String, value: String) = this(key, toBytes(value))
+    }
+
+    /**
+     * Prepend a value to a list, only if the list exists
+     *
+     * @param key
+     * @param value
+     */
+    case class LPUSHX(key: Array[Byte], value: Array[Byte]) {
+      def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+    }
+
+    /**
+     * Set the value of an element in a list by its index
+     *
+     * @param key
+     * @param index
+     * @param value
+     */
+    case class LSET(key: Array[Byte], index: Long, value: Array[Byte]) {
+      def this(key: String, index: Long, value: String) = this(toBytes(key), index, toBytes(value))
+    }
+
+    /**
+     * Append one or multiple values to a list
+     *
+     * @param key
+     * @param value
+     */
+    case class RPUSH(key: Array[Byte], value: Array[Byte]) {
+
+      def this(key: String, value: String) = this(key, toBytes(value))
+    }
+
+    /**
+     * Append a value to a list, only if the list exists
+     *
+     * @param key
+     * @param value
+     */
+    case class RPUSHX(key: Array[Byte], value: Array[Byte]) {
+      def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+    }
+
+  }
+
+  object Keys {
+
+    /**
+     * Delete a key
+     *
+     * @param message
+     */
+    case class DEL(message: Array[Byte]) {
+
+      def this(message: String) = this(toBytes(message))
+    }
+
+    /**
+     * Set a key's time to live in seconds
+     *
+     * @param key
+     */
+    case class EXPIRE(key: Array[Byte], seconds: Int) {
+      def this(key: String, seconds: Int) = this(toBytes(key), seconds)
+    }
+
+    /**
+     * Set the expiration for a key as a UNIX timestamp
+     *
+     * @param key
+     * @param timestamp
+     */
+    case class EXPIREAT(key: Array[Byte], timestamp: Long) {
+      def this(key: String, timestamp: Long) = this(toBytes(key), timestamp)
+    }
+
+    /**
+     * Atomically transfer a key from a Redis instance to another one.
+     *
+     * @param host
+     * @param port
+     * @param key
+     * @param database
+     * @param timeout
+     */
+    case class MIGRATE(host: Array[Byte], port: Int, key: Array[Byte], database: Int, timeout: Int) {
+      def this(host: String, port: Int, key: String, database: Int, timeout: Int) =
+        this(toBytes(host), port, toBytes(key), database, timeout)
+    }
+
+    /**
+     * Move a key to another database
+     *
+     * @param key
+     * @param db
+     */
+    case class MOVE(key: Array[Byte], db: Int) {
+      def this(key: String, db: Int) = this(toBytes(key), db)
+    }
+
+    /**
+     * Remove the expiration from a key
+     *
+     * @param key
+     */
+    case class PERSIST(key: Array[Byte]) {
+      def this(key: String) = this(toBytes(key))
+    }
+
+    /**
+     * Set a key's time to live in milliseconds
+     *
+     * @param key
+     * @param milliseconds
+     */
+    case class PEXPIRE(key: Array[Byte], milliseconds: Long) {
+      def this(key: String, milliseconds: Long) = this(toBytes(key), milliseconds)
+    }
+
+    /**
+     * Set the expiration for a key as a UNIX timestamp specified in milliseconds
+     *
+     * @param key
+     * @param timestamp
+     */
+    case class PEXPIREAT(key: Array[Byte], timestamp: Long) {
+      def this(key: String, milliseconds: Long) = this(toBytes(key), milliseconds)
+    }
+
+    /**
+     * Rename a key
+     *
+     * @param key
+     * @param newKey
+     */
+    case class RENAME(key: Array[Byte], newKey: Array[Byte]) {
+      def this(key: String, newKey: String) = this(toBytes(key), toBytes(newKey))
+    }
+
+    /**
+     * Rename a key, only if the new key does not exist
+     *
+     * @param key
+     * @param newKey
+     */
+    case class RENAMENX(key: Array[Byte], newKey: Array[Byte]) {
+      def this(key: String, newKey: String) = this(toBytes(key), toBytes(newKey))
+    }
+
+  }
+
+
+  object Sets {
+
+    /**
+     * Add one or more members to a set
+     *
+     * @param key
+     * @param members
+     */
+    case class SADD(key: Array[Byte], members: Array[Byte]) {
+
+      def this(key: String, members: String) = this(key, toBytes(members))
+    }
+
+
+    /**
+     * Move a member from one set to another
+     *
+     * @param source
+     * @param destination
+     * @param member
+     */
+    case class SMOVE(source: Array[Byte], destination: Array[Byte], member: Array[Byte]) {
+      def this(source: String, destination: String, member: String) =
+        this(toBytes(source), toBytes(destination), toBytes(member))
+    }
+
+
+    /**
+     * Remove one or more members from a set
+     *
+     * @param key
+     * @param member
+     */
+    case class SREM(key: Array[Byte], member: Array[Byte]) {
+
+      def this(key: String, member: String) = this(key, toBytes(member))
+    }
+
+  }
+
+  object String {
+
+    /**
+     * Append a value to a key
+     *
+     * @param key
+     * @param value
+     */
+    case class APPEND(key: Array[Byte], value: Array[Byte]) {
+      def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+    }
+
+    /**
+     * Decrement the integer value of a key by one
+     *
+     * @param key
+     */
+    case class DECR(key: Array[Byte]) {
+      def this(key: String) = this(toBytes(key))
+    }
+
+    /**
+     * Decrement the integer value of a key by the given number
+     *
+     * @param key
+     * @param decrement
+     */
+    case class DECRBY(key: Array[Byte], decrement: Int) {
+      def this(key: String, decrement: Int) = this(toBytes(key), decrement)
+    }
+
+    /**
+     * Increment the integer value of a key by one
+     *
+     * @param key
+     */
+    case class INCR(key: Array[Byte]) {
+      def this(key: String) = this(toBytes(key))
+    }
+
+    /**
+     * Increment the integer value of a key by the given amount
+     *
+     * @param key
+     * @param increment
+     */
+    case class INCRBY(key: Array[Byte], increment: Int) {
+      def this(key: String, increment: Int) = this(toBytes(key), increment)
+    }
+
+    /**
+     * Increment the float value of a key by the given amount
+     *
+     * @param key
+     * @param increment
+     */
+    case class INCRBYFLOAT(key: Array[Byte], increment: Double) {
+      def this(key: String, increment: Number) = this(toBytes(key), increment)
+    }
+
+
+    /**
+     * Set the string value of a key
+     *
+     * @param key
+     * @param value
+     */
+    case class SET(key: Array[Byte], value: Array[Byte]) {
+      def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+    }
+
+    /**
+     * Sets or clears the bit at offset in the string value stored at key
+     *
+     * @param key
+     * @param offset
+     * @param value
+     */
+    case class SETBIT(key: Array[Byte], offset: Long, value: Array[Byte]) {
+      def this(key: String, offset: Long, value: String) = this(toBytes(key), offset, toBytes(value))
+    }
+
+    /**
+     * Set the value and expiration of a key
+     *
+     * @param key
+     * @param seconds
+     * @param value
+     */
+    case class SETEX(key: Array[Byte], seconds: Int, value: Array[Byte]) {
+      def this(key: String, seconds: Int, value: String) = this(toBytes(key), seconds, toBytes(value))
+    }
+
+    /**
+     * Set the value of a key, only if the key does not exist
+     *
+     * @param key
+     * @param value
+     */
+    case class SETNX(key: Array[Byte], value: Array[Byte]) {
+      def this(key: String, value: String) = this(toBytes(key), toBytes(value))
+    }
+
+    /**
+     * Overwrite part of a string at key starting at the specified offset
+     *
+     * @param key
+     * @param offset
+     * @param value
+     */
+    case class SETRANGE(key: Array[Byte], offset: Int, value: Array[Byte]) {
+      def this(key: String, offset: Int, value: String) = this(toBytes(key), offset, toBytes(value))
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala
----------------------------------------------------------------------
diff --git a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala
new file mode 100644
index 0000000..3f75949
--- /dev/null
+++ b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.redis
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.redis.RedisMessage.Geo.GEOADD
+import org.apache.gearpump.redis.RedisMessage.Hashes._
+import org.apache.gearpump.redis.RedisMessage.HyperLogLog._
+import org.apache.gearpump.redis.RedisMessage.Keys._
+import org.apache.gearpump.redis.RedisMessage.Lists._
+import org.apache.gearpump.redis.RedisMessage.Sets._
+import org.apache.gearpump.redis.RedisMessage.String._
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.util.LogUtil
+import redis.clients.jedis.Jedis
+import redis.clients.jedis.Protocol.{DEFAULT_DATABASE, DEFAULT_HOST, DEFAULT_PORT, DEFAULT_TIMEOUT}
+
+/**
+  * Save message in Redis Instance
+  *
+  * @param host
+  * @param port
+  * @param timeout
+  * @param database
+  * @param password
+  */
+class RedisSink(
+                    host: String = DEFAULT_HOST,
+                    port: Int = DEFAULT_PORT,
+                    timeout: Int = DEFAULT_TIMEOUT,
+                    database: Int = DEFAULT_DATABASE,
+                    password: String = "") extends DataSink {
+
+  private val LOG = LogUtil.getLogger(getClass)
+  @transient private lazy val client = new Jedis(host, port, timeout)
+
+  override def open(context: TaskContext): Unit = {
+    client.select(database)
+
+    if (password != null && password.length != 0) {
+      client.auth(password)
+    }
+  }
+
+  override def write(message: Message): Unit = {
+
+    message.msg match {
+      // GEO
+      case msg: GEOADD => client.geoadd(msg.key, msg.longitude, msg.latitude, msg.member)
+
+      // Hashes
+      case msg: HDEL => client.hdel(msg.key, msg.field)
+      case msg: HINCRBY => client.hincrBy(msg.key, msg.field, msg.increment)
+      case msg: HINCRBYFLOAT => client.hincrByFloat(msg.key, msg.field, msg.increment)
+      case msg: HSET => client.hset(msg.key, msg.field, msg.value)
+      case msg: HSETNX => client.hsetnx(msg.key, msg.field, msg.value)
+
+      // HyperLogLog
+      case msg: PFADD => client.pfadd(msg.key, msg.element)
+
+      // Lists
+      case msg: LPUSH => client.lpush(msg.key, msg.value)
+      case msg: LPUSHX => client.lpushx(msg.key, msg.value)
+      case msg: LSET => client.lset(msg.key, msg.index, msg.value)
+      case msg: RPUSH => client.rpush(msg.key, msg.value)
+      case msg: RPUSHX => client.rpushx(msg.key, msg.value)
+
+      // Keys
+      case msg: DEL => client.del(msg.message)
+      case msg: EXPIRE => client.expire(msg.key, msg.seconds)
+      case msg: EXPIREAT => client.expireAt(msg.key, msg.timestamp)
+      case msg: MIGRATE => client.migrate(msg.host, msg.port, msg.key, msg.database, msg.timeout)
+      case msg: MOVE => client.move(msg.key, msg.db)
+      case msg: PERSIST => client.persist(msg.key)
+      case msg: PEXPIRE => client.pexpire(msg.key, msg.milliseconds)
+      case msg: PEXPIREAT => client.pexpireAt(msg.key, msg.timestamp)
+      case msg: RENAME => client.rename(msg.key, msg.newKey)
+      case msg: RENAMENX => client.renamenx(msg.key, msg.newKey)
+
+      // Sets
+      case msg: SADD => client.sadd(msg.key, msg.members)
+      case msg: SMOVE => client.smove(msg.source, msg.destination, msg.member)
+      case msg: SREM => client.srem(msg.key, msg.member)
+
+      // String
+      case msg: APPEND => client.append(msg.key, msg.value)
+      case msg: DECR => client.decr(msg.key)
+      case msg: DECRBY => client.decrBy(msg.key, msg.decrement)
+      case msg: INCR => client.incr(msg.key)
+      case msg: INCRBY => client.incrBy(msg.key, msg.increment)
+      case msg: INCRBYFLOAT => client.incrByFloat(msg.key, msg.increment)
+      case msg: SET => client.set(msg.key, msg.value)
+      case msg: SETBIT => client.setbit(msg.key, msg.offset, msg.value)
+      case msg: SETEX => client.setex(msg.key, msg.seconds, msg.value)
+      case msg: SETNX => client.setnx(msg.key, msg.value)
+      case msg: SETRANGE => client.setrange(msg.key, msg.offset, msg.value)
+    }
+  }
+
+  override def close(): Unit = {
+    client.close()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/project/Build.scala
----------------------------------------------------------------------
diff --git a/project/Build.scala b/project/Build.scala
index 0b1628e..f1e0443 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -154,12 +154,6 @@ object Build extends sbt.Build {
     dependencyOverrides += "com.typesafe.akka" %% "akka-stream" % akkaStreamVersion
   )
 
-  val streamingDependencies = Seq(
-    unmanagedJars in Compile ++= Seq(
-      getShadedJarFile("gs-collections", version.value)
-    )
-  )
-
   val coreDependencies = Seq(
     libraryDependencies ++= Seq(
       "org.slf4j" % "slf4j-api" % slf4jVersion,
@@ -199,9 +193,9 @@ object Build extends sbt.Build {
     ),
 
     unmanagedJars in Compile ++= Seq(
-      getShadedJarFile("metrics-graphite", version.value),
-      getShadedJarFile("guava", version.value),
-      getShadedJarFile("akka-kryo", version.value)
+      getShadedJarFile(shaded_metrics_graphite.id, version.value),
+      getShadedJarFile(shaded_guava.id, version.value),
+      getShadedJarFile(shaded_akka_kryo.id, version.value)
     )
   )
 
@@ -250,6 +244,20 @@ object Build extends sbt.Build {
       .map(_.filterNot(_.getCanonicalPath.contains("akka")))
   }
 
+  private def addShadedDeps(deps: Seq[xml.Node], node: xml.Node): xml.Node = {
+    node match {
+      case elem: xml.Elem =>
+        val child = if (elem.label == "dependencies") {
+          elem.child ++ deps
+        } else {
+          elem.child.map(addShadedDeps(deps, _))
+        }
+        xml.Elem(elem.prefix, elem.label, elem.attributes, elem.scope, false, child: _*)
+      case _ =>
+        node
+    }
+  }
+
   lazy val root = Project(
     id = "gearpump",
     base = file("."),
@@ -262,7 +270,14 @@ object Build extends sbt.Build {
   lazy val core = Project(
     id = "gearpump-core",
     base = file("core"),
-    settings = commonSettings ++ javadocSettings ++ coreDependencies)
+    settings = commonSettings ++ javadocSettings ++ coreDependencies ++ Seq(
+      pomPostProcess := {
+        (node: xml.Node) => addShadedDeps(List(
+          getShadedDepXML(organization.value, shaded_akka_kryo.id, version.value),
+          getShadedDepXML(organization.value, shaded_guava.id, version.value),
+          getShadedDepXML(organization.value, shaded_metrics_graphite.id, version.value)), node)
+      }
+    ))
       .disablePlugins(sbtassembly.AssemblyPlugin)
 
   lazy val daemon = Project(
@@ -282,9 +297,18 @@ object Build extends sbt.Build {
   lazy val streaming = Project(
     id = "gearpump-streaming",
     base = file("streaming"),
-    settings = commonSettings ++ javadocSettings ++ streamingDependencies)
-      .dependsOn(core % "test->test; compile->compile", daemon % "test->test")
-      .disablePlugins(sbtassembly.AssemblyPlugin)
+    settings = commonSettings ++ javadocSettings ++ Seq(
+      unmanagedJars in Compile ++= Seq(
+        getShadedJarFile(shaded_gs_collections.id, version.value)
+      ),
+
+      pomPostProcess := {
+        (node: xml.Node) => addShadedDeps(List(
+          getShadedDepXML(organization.value, shaded_gs_collections.id, version.value)), node)
+      }
+    ))
+    .dependsOn(core % "test->test; compile->compile", shaded_gs_collections, daemon % "test->test")
+    .disablePlugins(sbtassembly.AssemblyPlugin)
 
   lazy val external_kafka = Project(
     id = "gearpump-external-kafka",
@@ -402,6 +426,18 @@ object Build extends sbt.Build {
      .dependsOn (services % "test->test; compile->compile", daemon % "test->test; compile->compile")
       .disablePlugins(sbtassembly.AssemblyPlugin)
 
+  lazy val redis = Project(
+    id = "gearpump-experiments-redis",
+    base = file("experiments/redis"),
+    settings = commonSettings ++ noPublish ++ myAssemblySettings ++
+      Seq(
+        libraryDependencies ++= Seq(
+          "redis.clients" % "jedis" % "2.9.0"
+        ),
+        mainClass in(Compile, packageBin) := Some("org.apache.gearpump.example.Test")
+      ))
+    .dependsOn(streaming % "test->test; provided", daemon % "test->test; provided")
+
   lazy val storm = Project(
     id = "gearpump-experiments-storm",
     base = file("experiments/storm"),

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/project/BuildShaded.scala
----------------------------------------------------------------------
diff --git a/project/BuildShaded.scala b/project/BuildShaded.scala
index 1f59bfd..a43587c 100644
--- a/project/BuildShaded.scala
+++ b/project/BuildShaded.scala
@@ -35,7 +35,7 @@ object BuildShaded extends sbt.Build {
       _.copy(includeScala = false)
     },
     assemblyJarName in assembly := {
-      s"${name.value}-$scalaVersionMajor-${version.value}-assembly.jar"
+      s"${name.value}_$scalaVersionMajor-${version.value}.jar"
     },
     target in assembly := baseDirectory.value.getParentFile / "target" / scalaVersionMajor
   )
@@ -44,92 +44,99 @@ object BuildShaded extends sbt.Build {
     id = "gearpump-shaded",
     base = file("shaded")
   ).aggregate(shaded_akka_kryo, shaded_gs_collections, shaded_guava, 
shaded_metrics_graphite)
-      .disablePlugins(sbtassembly.AssemblyPlugin)
-
+    .disablePlugins(sbtassembly.AssemblyPlugin)
 
   lazy val shaded_akka_kryo = Project(
     id = "gearpump-shaded-akka-kryo",
     base = file("shaded/akka-kryo"),
-    settings = shadeAssemblySettings ++ 
addArtifact(Artifact("gearpump-shaded-akka-kryo",
-      "assembly"), sbtassembly.AssemblyKeys.assembly) ++
-        Seq(
-          assemblyShadeRules in assembly := Seq(
-            ShadeRule.zap("com.google.protobuf.**").inAll,
-            ShadeRule.zap("com.typesafe.config.**").inAll,
-            ShadeRule.zap("akka.**").inAll,
-            ShadeRule.zap("org.jboss.netty.**").inAll,
-            ShadeRule.zap("net.jpountz.lz4.**").inAll,
-            ShadeRule.zap("org.uncommons.maths.**").inAll,
-            ShadeRule.rename("com.romix.**" -> 
"org.apache.gearpump.romix.@1").inAll,
-            ShadeRule.rename("com.esotericsoftware.**" ->
-                "org.apache.gearpump.esotericsoftware.@1").inAll,
-            ShadeRule.rename("org.objenesis.**" -> 
"org.apache.gearpump.objenesis.@1").inAll
-          )
-        ) ++
-        Seq(
-          libraryDependencies ++= Seq(
-            "com.github.romix.akka" %% "akka-kryo-serialization" % kryoVersion
-          )
+    settings = shadeAssemblySettings ++ 
addArtifact(Artifact("gearpump-shaded-akka-kryo"),
+      sbtassembly.AssemblyKeys.assembly) ++
+      Seq(
+        assemblyShadeRules in assembly := Seq(
+          ShadeRule.zap("com.google.protobuf.**").inAll,
+          ShadeRule.zap("com.typesafe.config.**").inAll,
+          ShadeRule.zap("akka.**").inAll,
+          ShadeRule.zap("org.jboss.netty.**").inAll,
+          ShadeRule.zap("net.jpountz.lz4.**").inAll,
+          ShadeRule.zap("org.uncommons.maths.**").inAll,
+          ShadeRule.rename("com.romix.**" -> 
"org.apache.gearpump.romix.@1").inAll,
+          ShadeRule.rename("com.esotericsoftware.**" ->
+            "org.apache.gearpump.esotericsoftware.@1").inAll,
+          ShadeRule.rename("org.objenesis.**" -> 
"org.apache.gearpump.objenesis.@1").inAll
+        )
+      ) ++
+      Seq(
+        libraryDependencies ++= Seq(
+          "com.github.romix.akka" %% "akka-kryo-serialization" % kryoVersion
         )
+      )
   )
 
   lazy val shaded_gs_collections = Project(
     id = "gearpump-shaded-gs-collections",
     base = file("shaded/gs-collections"),
-    settings = shadeAssemblySettings ++ 
addArtifact(Artifact("gearpump-shaded-gs-collections",
-      "assembly"), sbtassembly.AssemblyKeys.assembly) ++
-        Seq(
-          assemblyShadeRules in assembly := Seq(
-            ShadeRule.rename("com.gs.collections.**" ->
-                "org.apache.gearpump.gs.collections.@1").inAll
-          )
-        ) ++
-        Seq(
-          libraryDependencies ++= Seq(
-            "com.goldmansachs" % "gs-collections" % gsCollectionsVersion
-          )
+    settings = shadeAssemblySettings ++ 
addArtifact(Artifact("gearpump-shaded-gs-collections"),
+      sbtassembly.AssemblyKeys.assembly) ++
+      Seq(
+        assemblyShadeRules in assembly := Seq(
+          ShadeRule.rename("com.gs.collections.**" ->
+            "org.apache.gearpump.gs.collections.@1").inAll
         )
+      ) ++
+      Seq(
+        libraryDependencies ++= Seq(
+          "com.goldmansachs" % "gs-collections" % gsCollectionsVersion
+        )
+      )
   )
 
   lazy val shaded_guava = Project(
     id = "gearpump-shaded-guava",
     base = file("shaded/guava"),
-    settings = shadeAssemblySettings ++ 
addArtifact(Artifact("gearpump-shaded-guava",
-      "assembly"), sbtassembly.AssemblyKeys.assembly) ++
-        Seq(
-          assemblyShadeRules in assembly := Seq(
-            ShadeRule.rename("com.google.**" -> 
"org.apache.gearpump.google.@1").inAll
-          )
-        ) ++
-        Seq(
-          libraryDependencies ++= Seq(
-            "com.google.guava" % "guava" % guavaVersion
-          )
+    settings = shadeAssemblySettings ++ 
addArtifact(Artifact("gearpump-shaded-guava"),
+      sbtassembly.AssemblyKeys.assembly) ++
+      Seq(
+        assemblyShadeRules in assembly := Seq(
+          ShadeRule.rename("com.google.**" -> 
"org.apache.gearpump.google.@1").inAll
+        )
+      ) ++
+      Seq(
+        libraryDependencies ++= Seq(
+          "com.google.guava" % "guava" % guavaVersion
         )
+      )
   )
 
   lazy val shaded_metrics_graphite = Project(
     id = "gearpump-shaded-metrics-graphite",
     base = file("shaded/metrics-graphite"),
-    settings = shadeAssemblySettings ++ 
addArtifact(Artifact("gearpump-shaded-metrics-graphite",
-      "assembly"), sbtassembly.AssemblyKeys.assembly) ++
-        Seq(
-          assemblyShadeRules in assembly := Seq(
-            ShadeRule.rename("com.codahale.metrics.**" ->
-                "org.apache.gearpump.codahale.metrics.@1").inAll
-          )
-        ) ++
-        Seq(
-          libraryDependencies ++= Seq(
-            "com.codahale.metrics" % "metrics-graphite" % codahaleVersion,
-            "com.codahale.metrics" % "metrics-jvm" % codahaleVersion
-          )
+    settings = shadeAssemblySettings ++ 
addArtifact(Artifact("gearpump-shaded-metrics-graphite"),
+      sbtassembly.AssemblyKeys.assembly) ++
+      Seq(
+        assemblyShadeRules in assembly := Seq(
+          ShadeRule.rename("com.codahale.metrics.**" ->
+            "org.apache.gearpump.codahale.metrics.@1").inAll
         )
+      ) ++
+      Seq(
+        libraryDependencies ++= Seq(
+          "com.codahale.metrics" % "metrics-graphite" % codahaleVersion,
+          "com.codahale.metrics" % "metrics-jvm" % codahaleVersion
+        )
+      )
   )
 
   def getShadedJarFile(name: String, gearpumpVersion: String): File = {
     shaded.base / "target" / scalaVersionMajor /
-      s"gearpump-shaded-$name-$scalaVersionMajor-$gearpumpVersion-assembly.jar"
+      s"${name}_$scalaVersionMajor-$gearpumpVersion.jar"
+  }
+
+  def getShadedDepXML(groupId: String, artifactId: String, version: String): 
scala.xml.Node = {
+    <dependency>
+      <groupId>{groupId}</groupId>
+      <artifactId>{artifactId}</artifactId>
+      <version>{version}</version>
+    </dependency>
   }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
index cd33b50..f99a436 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
@@ -22,6 +22,7 @@ object Constants {
   val GEARPUMP_STREAMING_OPERATOR = "gearpump.streaming.dsl.operator"
   val GEARPUMP_STREAMING_SOURCE = "gearpump.streaming.source"
   val GEARPUMP_STREAMING_GROUPBY_FUNCTION = 
"gearpump.streaming.dsl.groupby-function"
+  val GEARPUMP_STREAMING_WINDOW_FUNCTION = 
"gearpump.streaming.dsl.window-function"
 
   val GEARPUMP_STREAMING_LOCALITIES = "gearpump.streaming.localities"
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
index 66ec873..a6588a1 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
@@ -123,7 +123,7 @@ object LifeTime {
  */
 class StreamApplication(
     override val name: String, val inputUserConfig: UserConfig,
-    val dag: Graph[ProcessorDescription, PartitionerDescription])
+    dag: Graph[ProcessorDescription, PartitionerDescription])
   extends Application {
 
   require(!dag.hasDuplicatedEdge(), "Graph should not have duplicated edges")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
index 786d496..440a45e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
@@ -20,7 +20,10 @@ package org.apache.gearpump.streaming.dsl
 
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.dsl.op._
+import org.apache.gearpump.streaming.dsl.plan._
+import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapFunction, 
ReduceFunction}
+import org.apache.gearpump.streaming.dsl.window.api._
+import org.apache.gearpump.streaming.dsl.window.impl._
 import org.apache.gearpump.streaming.sink.DataSink
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
 import org.apache.gearpump.util.Graph
@@ -35,12 +38,12 @@ class Stream[T](
   /**
    * converts a value[T] to a list of value[R]
    *
-   * @param fun FlatMap function
+   * @param fn FlatMap function
    * @param description The description message for this operation
    * @return A new stream with type [R]
    */
-  def flatMap[R](fun: T => TraversableOnce[R], description: String = null): 
Stream[R] = {
-    val flatMapOp = FlatMapOp(fun, Option(description).getOrElse("flatmap"))
+  def flatMap[R](fn: T => TraversableOnce[R], description: String = 
"flatMap"): Stream[R] = {
+    val flatMapOp = ChainableOp(new FlatMapFunction[T, R](fn, description))
     graph.addVertex(flatMapOp)
     graph.addEdge(thisNode, edge.getOrElse(Direct), flatMapOp)
     new Stream[R](graph, flatMapOp)
@@ -49,36 +52,36 @@ class Stream[T](
   /**
    * Maps message of type T message of type R
    *
-   * @param fun Function
+   * @param fn Function
    * @return A new stream with type [R]
    */
-  def map[R](fun: T => R, description: String = null): Stream[R] = {
+  def map[R](fn: T => R, description: String = "map"): Stream[R] = {
     this.flatMap({ data =>
-      Option(fun(data))
-    }, Option(description).getOrElse("map"))
+      Option(fn(data))
+    }, description)
   }
 
   /**
    * Keeps records when fun(T) == true
    *
-   * @param fun  the filter
+   * @param fn  the filter
    * @return  a new stream after filter
    */
-  def filter(fun: T => Boolean, description: String = null): Stream[T] = {
+  def filter(fn: T => Boolean, description: String = "filter"): Stream[T] = {
     this.flatMap({ data =>
-      if (fun(data)) Option(data) else None
-    }, Option(description).getOrElse("filter"))
+      if (fn(data)) Option(data) else None
+    }, description)
   }
 
   /**
    * Reduces operations.
    *
-   * @param fun  reduction function
+   * @param fn  reduction function
    * @param description description message for this operator
    * @return a new stream after reduction
    */
-  def reduce(fun: (T, T) => T, description: String = null): Stream[T] = {
-    val reduceOp = ReduceOp(fun, Option(description).getOrElse("reduce"))
+  def reduce(fn: (T, T) => T, description: String = "reduce"): Stream[T] = {
+    val reduceOp = ChainableOp(new ReduceFunction(fn, description))
     graph.addVertex(reduceOp)
     graph.addEdge(thisNode, edge.getOrElse(Direct), reduceOp)
     new Stream(graph, reduceOp)
@@ -88,7 +91,10 @@ class Stream[T](
    * Log to task log file
    */
   def log(): Unit = {
-    this.map(msg => LoggerFactory.getLogger("dsl").info(msg.toString), "log")
+    this.map(msg => {
+      LoggerFactory.getLogger("dsl").info(msg.toString)
+      msg
+    }, "log")
   }
 
   /**
@@ -97,8 +103,8 @@ class Stream[T](
    * @param other the other stream
    * @return  the merged stream
    */
-  def merge(other: Stream[T], description: String = null): Stream[T] = {
-    val mergeOp = MergeOp(Option(description).getOrElse("merge"))
+  def merge(other: Stream[T], description: String = "merge"): Stream[T] = {
+    val mergeOp = MergeOp(description, UserConfig.empty)
     graph.addVertex(mergeOp)
     graph.addEdge(thisNode, edge.getOrElse(Direct), mergeOp)
     graph.addEdge(other.thisNode, other.edge.getOrElse(Shuffle), mergeOp)
@@ -115,20 +121,29 @@ class Stream[T](
    *
    * For example,
    * {{{
-   * Stream[People].groupBy(_.gender).flatmap(..).filter.(..).reduce(..)
+   * Stream[People].groupBy(_.gender).flatMap(..).filter(..).reduce(..)
    * }}}
    *
-   * @param fun  Group by function
+   * @param fn  Group by function
    * @param parallelism  Parallelism level
    * @param description  The description
    * @return  the grouped stream
    */
-  def groupBy[Group](fun: T => Group, parallelism: Int = 1, description: 
String = null)
-    : Stream[T] = {
-    val groupOp = GroupByOp(fun, parallelism, 
Option(description).getOrElse("groupBy"))
-    graph.addVertex(groupOp)
-    graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp)
-    new Stream[T](graph, groupOp)
+  def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1,
+      description: String = "groupBy"): Stream[T] = {
+    window(CountWindow.apply(1).accumulating)
+      .groupBy[GROUP](fn, parallelism, description)
+  }
+
+  /**
+   * Window function
+   *
+   * @param win window definition
+   * @param description window description
+   * @return [[WindowStream]] where groupBy could be applied
+   */
+  def window(win: Window, description: String = "window"): WindowStream[T] = {
+    new WindowStream[T](graph, edge, thisNode, win, description)
   }
 
   /**
@@ -140,15 +155,28 @@ class Stream[T](
    */
   def process[R](
       processor: Class[_ <: Task], parallelism: Int, conf: UserConfig = 
UserConfig.empty,
-      description: String = null): Stream[R] = {
-    val processorOp = ProcessorOp(processor, parallelism, conf,
-      Option(description).getOrElse("process"))
+      description: String = "process"): Stream[R] = {
+    val processorOp = ProcessorOp(processor, parallelism, conf, description)
     graph.addVertex(processorOp)
     graph.addEdge(thisNode, edge.getOrElse(Shuffle), processorOp)
     new Stream[R](graph, processorOp, Some(Shuffle))
   }
 }
 
+class WindowStream[T](graph: Graph[Op, OpEdge], edge: Option[OpEdge], 
thisNode: Op,
+    window: Window, winDesc: String) {
+
+  def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1,
+      description: String = "groupBy"): Stream[T] = {
+    val groupBy: GroupByFn[T, (GROUP, List[Bucket])] = GroupAlsoByWindow(fn, 
window)
+    val groupOp = GroupByOp[T, (GROUP, List[Bucket])](groupBy, parallelism,
+      s"$winDesc.$description")
+    graph.addVertex(groupOp)
+    graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp)
+    new Stream[T](graph, groupOp)
+  }
+}
+
 class KVStream[K, V](stream: Stream[Tuple2[K, V]]) {
   /**
    * GroupBy key
@@ -192,30 +220,18 @@ object Stream {
   }
 
   implicit class Sink[T](stream: Stream[T]) extends java.io.Serializable {
-    def sink[T](dataSink: DataSink, parallism: Int, conf: UserConfig, 
description: String)
-      : Stream[T] = {
-      implicit val sink = DataSinkOp[T](dataSink, parallism, conf,
-        Some(description).getOrElse("traversable"))
+    def sink(dataSink: DataSink, parallelism: Int = 1,
+        conf: UserConfig = UserConfig.empty, description: String = "sink"): 
Stream[T] = {
+      implicit val sink = DataSinkOp(dataSink, parallelism, conf, description)
       stream.graph.addVertex(sink)
       stream.graph.addEdge(stream.thisNode, Shuffle, sink)
       new Stream[T](stream.graph, sink)
     }
-
-    def sink[T](
-        sink: Class[_ <: Task], parallism: Int, conf: UserConfig = 
UserConfig.empty,
-        description: String = null): Stream[T] = {
-      val sinkOp = ProcessorOp(sink, parallism, conf, 
Option(description).getOrElse("source"))
-      stream.graph.addVertex(sinkOp)
-      stream.graph.addEdge(stream.thisNode, Shuffle, sinkOp)
-      new Stream[T](stream.graph, sinkOp)
-    }
   }
 }
 
 class LoggerSink[T] extends DataSink {
-  var logger: Logger = null
-
-  private var context: TaskContext = null
+  var logger: Logger = _
 
   override def open(context: TaskContext): Unit = {
     this.logger = context.logger

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
index d45737b..8116146 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
@@ -24,10 +24,9 @@ import akka.actor.ActorSystem
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.streaming.StreamApplication
-import org.apache.gearpump.streaming.dsl.op.{DataSourceOp, Op, OpEdge, 
ProcessorOp}
-import org.apache.gearpump.streaming.dsl.plan.Planner
+import org.apache.gearpump.streaming.dsl.plan._
 import org.apache.gearpump.streaming.source.DataSource
-import org.apache.gearpump.streaming.task.{Task, TaskContext}
+import org.apache.gearpump.streaming.task.TaskContext
 import org.apache.gearpump.util.Graph
 import org.apache.gearpump.Message
 
@@ -50,7 +49,8 @@ import scala.language.implicitConversions
  * @param name name of app
  */
 class StreamApp(
-    val name: String, system: ActorSystem, userConfig: UserConfig, val graph: 
Graph[Op, OpEdge]) {
+    name: String, system: ActorSystem, userConfig: UserConfig,
+    private val graph: Graph[Op, OpEdge]) {
 
   def this(name: String, system: ActorSystem, userConfig: UserConfig) = {
     this(name, system, userConfig, Graph.empty[Op, OpEdge])
@@ -76,34 +76,16 @@ object StreamApp {
 
   implicit class Source(app: StreamApp) extends java.io.Serializable {
 
-    def source[T](dataSource: DataSource, parallelism: Int): Stream[T] = {
-      source(dataSource, parallelism, UserConfig.empty)
-    }
-
-    def source[T](dataSource: DataSource, parallelism: Int, description: 
String): Stream[T] = {
-      source(dataSource, parallelism, UserConfig.empty, description)
-    }
-
-    def source[T](dataSource: DataSource, parallelism: Int, conf: UserConfig): 
Stream[T] = {
-      source(dataSource, parallelism, conf, description = null)
-    }
-
-    def source[T](dataSource: DataSource, parallelism: Int, conf: UserConfig, 
description: String)
-      : Stream[T] = {
+    def source[T](dataSource: DataSource, parallelism: Int = 1,
+        conf: UserConfig = UserConfig.empty, description: String = "source"): 
Stream[T] = {
       implicit val sourceOp = DataSourceOp(dataSource, parallelism, conf, 
description)
       app.graph.addVertex(sourceOp)
       new Stream[T](app.graph, sourceOp)
     }
+
     def source[T](seq: Seq[T], parallelism: Int, description: String): 
Stream[T] = {
       this.source(new CollectionDataSource[T](seq), parallelism, 
UserConfig.empty, description)
     }
-
-    def source[T](source: Class[_ <: Task], parallelism: Int, conf: 
UserConfig, description: String)
-      : Stream[T] = {
-      val sourceOp = ProcessorOp(source, parallelism, conf, 
Option(description).getOrElse("source"))
-      app.graph.addVertex(sourceOp)
-      new Stream[T](app.graph, sourceOp)
-    }
   }
 }
 
@@ -115,7 +97,7 @@ class CollectionDataSource[T](seq: Seq[T]) extends 
DataSource {
 
   override def read(): Message = {
     if (iterator.hasNext) {
-      Message(iterator.next())
+      Message(iterator.next(), Instant.now().toEpochMilli)
     } else {
       null
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
index 6eff20c..3003b98 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
@@ -19,9 +19,9 @@
 package org.apache.gearpump.streaming.dsl.javaapi
 
 import scala.collection.JavaConverters._
-
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.dsl.Stream
+import org.apache.gearpump.streaming.dsl.window.api.Window
+import org.apache.gearpump.streaming.dsl.{Stream, WindowStream}
 import org.apache.gearpump.streaming.javaapi.dsl.functions._
 import org.apache.gearpump.streaming.task.Task
 
@@ -63,9 +63,13 @@ class JavaStream[T](val stream: Stream[T]) {
    * Group by a stream and turns it to a list of sub-streams. Operations 
chained after
    * groupBy applies to sub-streams.
    */
-  def groupBy[Group](fn: GroupByFunction[T, Group], parallelism: Int, 
description: String)
-    : JavaStream[T] = {
-    new JavaStream[T](stream.groupBy({t: T => fn(t)}, parallelism, 
description))
+  def groupBy[GROUP](fn: GroupByFunction[T, GROUP],
+      parallelism: Int, description: String): JavaStream[T] = {
+    new JavaStream[T](stream.groupBy((t: T) => fn(t), parallelism, description))
+  }
+
+  def window(win: Window, description: String): JavaWindowStream[T] = {
+    new JavaWindowStream[T](stream.window(win, description))
   }
 
   /** Add a low level Processor to process messages */
@@ -75,3 +79,11 @@ class JavaStream[T](val stream: Stream[T]) {
     new JavaStream[R](stream.process(processor, parallelism, conf, 
description))
   }
 }
+
+class JavaWindowStream[T](stream: WindowStream[T]) {
+  /** Groups the windowed elements by the key that fn computes for each of them. */
+  def groupBy[GROUP](fn: GroupByFunction[T, GROUP], parallelism: Int,
+      description: String): JavaStream[T] = {
+    new JavaStream[T](stream.groupBy((t: T) => fn(t), parallelism, description))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala
deleted file mode 100644
index 49d9dec..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.dsl.op
-
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.sink.DataSink
-import org.apache.gearpump.streaming.source.DataSource
-import org.apache.gearpump.streaming.task.Task
-
-/**
- * Operators for the DSL
- */
-sealed trait Op {
-  def description: String
-  def conf: UserConfig
-}
-
-/**
- * When translated to running DAG, SlaveOP can be attach to MasterOP or other 
SlaveOP
- * "Attach" means running in same Actor.
- */
-trait SlaveOp[T] extends Op
-
-case class FlatMapOp[T, R](
-    fun: (T) => TraversableOnce[R], description: String, conf: UserConfig = 
UserConfig.empty)
-  extends SlaveOp[T]
-
-case class ReduceOp[T](fun: (T, T) => T, description: String, conf: UserConfig 
= UserConfig.empty)
-  extends SlaveOp[T]
-
-trait MasterOp extends Op
-
-trait ParameterizedOp[T] extends MasterOp
-
-case class MergeOp(description: String, override val conf: UserConfig = 
UserConfig.empty)
-  extends MasterOp
-
-case class GroupByOp[T, R](
-    fun: T => R, parallelism: Int, description: String,
-    override val conf: UserConfig = UserConfig.empty)
-  extends ParameterizedOp[T]
-
-case class ProcessorOp[T <: Task](
-    processor: Class[T], parallelism: Int, conf: UserConfig, description: 
String)
-  extends ParameterizedOp[T]
-
-case class DataSourceOp[T](
-    dataSource: DataSource, parallelism: Int, conf: UserConfig, description: 
String)
-  extends ParameterizedOp[T]
-
-case class DataSinkOp[T](
-    dataSink: DataSink, parallelism: Int, conf: UserConfig, description: 
String)
-  extends ParameterizedOp[T]
-
-/**
- * Contains operators which can be chained to single one.
- *
- * For example, flatmap().map().reduce() can be chained to single operator as
- * no data shuffling is required.
- * @param ops list of operations
- */
-case class OpChain(ops: List[Op]) extends Op {
-  def head: Op = ops.head
-  def last: Op = ops.last
-
-  def description: String = null
-
-  override def conf: UserConfig = {
-    // The head's conf has priority
-    ops.reverse.foldLeft(UserConfig.empty) { (conf, op) =>
-      conf.withConfig(op.conf)
-    }
-  }
-}
-
-trait OpEdge
-
-/**
- * The upstream OP and downstream OP doesn't require network data shuffle.
- *
- * For example, map, flatmap operation doesn't require network shuffle, we can 
use Direct
- * to represent the relation with upstream operators.
- */
-case object Direct extends OpEdge
-
-/**
- * The upstream OP and downstream OP DOES require network data shuffle.
- *
- * For example, map, flatmap operation doesn't require network shuffle, we can 
use Direct
- * to represent the relation with upstream operators.
- */
-case object Shuffle extends OpEdge
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
new file mode 100644
index 0000000..2ec881b
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.partitioner
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.partitioner.UnicastPartitioner
+import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
+
+/**
+ * Partition messages by applying group by function first.
+ *
+ * For example:
+ * {{{
+ * case class People(name: String, gender: String)
+ *
+ * object Test{
+ *
+ *   val groupBy: GroupByFn[People, String] = ??? // a GroupByFn that groups by people.gender
+ *   val partitioner = new GroupByPartitioner(groupBy)
+ * }
+ * }}}
+ *
+ * @param fn Group-by function applied to every message; the hashCode of its result picks
+ *   the partition. You must define hashCode() for output type of groupBy function.
+ */
+class GroupByPartitioner[T, Group](fn: GroupByFn[T, Group])
+  extends UnicastPartitioner {
+  override def getPartition(message: Message, partitionNum: Int, 
+currentPartitionId: Int): Int = {
+    val hashCode = fn.groupBy(message).hashCode()
+    (hashCode & Integer.MAX_VALUE) % partitionNum // clear the sign bit: result is non-negative
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala
deleted file mode 100644
index b2e2932..0000000
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.dsl.partitioner
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.partitioner.UnicastPartitioner
-
-/**
- * Partition messages by applying group by function first.
- *
- * For example:
- * {{{
- * case class People(name: String, gender: String)
- *
- * object Test{
- *
- *   val groupBy: (People => String) = people => people.gender
- *   val partitioner = GroupByPartitioner(groupBy)
- * }
- * }}}
- *
- * @param groupBy First apply message with groupBy function, then pick the 
hashCode of the output
- *   to do the partitioning. You must define hashCode() for output type of 
groupBy function.
- */
-class GroupByPartitioner[T, GROUP](groupBy: T => GROUP = null) extends 
UnicastPartitioner {
-  override def getPartition(msg: Message, partitionNum: Int, 
currentPartitionId: Int): Int = {
-    val hashCode = groupBy(msg.msg.asInstanceOf[T]).hashCode()
-    (hashCode & Integer.MAX_VALUE) % partitionNum
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
new file mode 100644
index 0000000..744976b
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.plan
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.Processor.DefaultProcessor
+import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
+import org.apache.gearpump.streaming.{Constants, Processor}
+import org.apache.gearpump.streaming.dsl.task.TransformTask
+import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
+import org.apache.gearpump.streaming.sink.{DataSink, DataSinkProcessor}
+import org.apache.gearpump.streaming.source.{DataSource, DataSourceTask}
+import org.apache.gearpump.streaming.task.Task
+
+import scala.reflect.ClassTag
+
+/**
+ * This is a vertex on the logical plan.
+ */
+sealed trait Op {
+
+  def description: String
+
+  def userConfig: UserConfig
+
+  def chain(op: Op)(implicit system: ActorSystem): Op
+
+  def getProcessor(implicit system: ActorSystem): Processor[_ <: Task]
+}
+
+/**
+ * This represents a low level Processor.
+ */
+case class ProcessorOp[T <: Task](
+    processor: Class[T],
+    parallelism: Int,
+    userConfig: UserConfig,
+    description: String)
+  extends Op {
+
+  def this(
+      parallelism: Int = 1,
+      userConfig: UserConfig = UserConfig.empty,
+      description: String = "processor")(implicit classTag: ClassTag[T]) = {
+    this(classTag.runtimeClass.asInstanceOf[Class[T]], parallelism, 
userConfig, description)
+  }
+
+  override def chain(other: Op)(implicit system: ActorSystem): Op = {
+    throw new OpChainException(this, other)
+  }
+
+  override def getProcessor(implicit system: ActorSystem): Processor[_ <: 
Task] = {
+    DefaultProcessor(parallelism, description, userConfig, processor)
+  }
+}
+
+/**
+ * This represents a DataSource.
+ */
+case class DataSourceOp(
+    dataSource: DataSource,
+    parallelism: Int = 1,
+    userConfig: UserConfig = UserConfig.empty,
+    description: String = "source")
+  extends Op {
+
+  override def chain(other: Op)(implicit system: ActorSystem): Op = {
+    other match {
+      case op: ChainableOp[_, _] =>
+        DataSourceOp(dataSource, parallelism,
+          userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.fn),
+          description)
+      case _ =>
+        throw new OpChainException(this, other)
+    }
+  }
+
+  override def getProcessor(implicit system: ActorSystem): Processor[_ <: 
Task] = {
+    Processor[DataSourceTask[Any, Any]](parallelism, description,
+      userConfig.withValue(GEARPUMP_STREAMING_SOURCE, dataSource))
+  }
+}
+
+/**
+ * This represents a DataSink.
+ */
+case class DataSinkOp(
+    dataSink: DataSink,
+    parallelism: Int = 1,
+    userConfig: UserConfig = UserConfig.empty,
+    description: String = "sink")
+  extends Op {
+
+  override def chain(op: Op)(implicit system: ActorSystem): Op = {
+    throw new OpChainException(this, op)
+  }
+
+  override def getProcessor(implicit system: ActorSystem): Processor[_ <: 
Task] = {
+    DataSinkProcessor(dataSink, parallelism, description)
+  }
+}
+
+/**
+ * This represents operations that can be chained together
+ * (e.g. flatMap, map, filter, reduce) and then chained onto
+ * another Op (e.g. DataSourceOp, GroupByOp, MergeOp) to be executed.
+ */
+case class ChainableOp[IN, OUT](
+    fn: SingleInputFunction[IN, OUT]) extends Op {
+
+  override def description: String = fn.description
+
+  override def userConfig: UserConfig = UserConfig.empty
+
+  override def chain(other: Op)(implicit system: ActorSystem): Op = {
+    other match {
+      case op: ChainableOp[OUT, _] =>
+        // TODO: preserve type info
+        ChainableOp(fn.andThen(op.fn))
+      case _ =>
+        throw new OpChainException(this, other)
+    }
+  }
+
+  override def getProcessor(implicit system: ActorSystem): Processor[_ <: 
Task] = {
+    throw new UnsupportedOperationException("ChainedOp cannot be translated to 
Processor")
+  }
+}
+
+/**
+ * This represents a Processor with window aggregation
+ */
+case class GroupByOp[IN, GROUP](
+    groupByFn: GroupByFn[IN, GROUP],
+    parallelism: Int = 1,
+    description: String = "groupBy",
+    override val userConfig: UserConfig = UserConfig.empty)
+  extends Op {
+
+  override def chain(other: Op)(implicit system: ActorSystem): Op = {
+    other match {
+      case op: ChainableOp[_, _] =>
+        GroupByOp(groupByFn, parallelism, description,
+          userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.fn))
+      case _ =>
+        throw new OpChainException(this, other)
+    }
+  }
+
+  override def getProcessor(implicit system: ActorSystem): Processor[_ <: 
Task] = {
+    groupByFn.getProcessor(parallelism, description, userConfig)
+  }
+}
+
+/**
+ * This represents a Processor transforming merged streams
+ */
+case class MergeOp(description: String, userConfig: UserConfig = 
UserConfig.empty)
+  extends Op {
+
+  override def chain(other: Op)(implicit system: ActorSystem): Op = {
+    other match {
+      case op: ChainableOp[_, _] =>
+        MergeOp(description, 
userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.fn))
+      case _ =>
+        throw new OpChainException(this, other)
+    }
+  }
+
+  override def getProcessor(implicit system: ActorSystem): Processor[_ <: 
Task] = {
+    Processor[TransformTask[Any, Any]](1, description, userConfig)
+  }
+
+}
+
+/**
+ * This is an edge on the logical plan.
+ */
+trait OpEdge
+
+/**
+ * The upstream OP and downstream OP don't require network data shuffle.
+ * e.g. ChainableOp
+ */
+case object Direct extends OpEdge
+
+/**
+ * The upstream OP and downstream OP DO require network data shuffle.
+ * e.g. GroupByOp
+ */
+case object Shuffle extends OpEdge
+
+/**
+ * Runtime exception thrown on chaining.
+ */
+class OpChainException(op1: Op, op2: Op) extends RuntimeException(s"$op1 
cannot be chained by $op2")
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
deleted file mode 100644
index b09d9b9..0000000
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.dsl.plan
-
-import scala.collection.TraversableOnce
-import akka.actor.ActorSystem
-import org.slf4j.Logger
-import org.apache.gearpump._
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.Processor
-import org.apache.gearpump.streaming.Processor.DefaultProcessor
-import org.apache.gearpump.streaming.dsl.op._
-import org.apache.gearpump.streaming.dsl.plan.OpTranslator._
-import org.apache.gearpump.streaming.sink.DataSinkProcessor
-import org.apache.gearpump.streaming.source.DataSourceTask
-import org.apache.gearpump.streaming.task.{Task, TaskContext}
-import org.apache.gearpump.util.LogUtil
-
-/**
- * Translates a OP to a TaskDescription
- */
-class OpTranslator extends java.io.Serializable {
-  val LOG: Logger = LogUtil.getLogger(getClass)
-
-  def translate(ops: OpChain)(implicit system: ActorSystem): Processor[_ <: 
Task] = {
-
-    val baseConfig = ops.conf
-
-    ops.ops.head match {
-      case op: MasterOp =>
-        val tail = ops.ops.tail
-        val func = toFunction(tail)
-        val userConfig = baseConfig.withValue(GEARPUMP_STREAMING_OPERATOR, 
func)
-
-        op match {
-          case DataSourceOp(dataSource, parallelism, conf, description) =>
-            Processor[DataSourceTask[Any, Any]](parallelism,
-              description = description + "." + func.description,
-              userConfig.withValue(GEARPUMP_STREAMING_SOURCE, dataSource))
-          case groupby@GroupByOp(_, parallelism, description, _) =>
-            Processor[GroupByTask[Object, Object, Object]](parallelism,
-              description = description + "." + func.description,
-              userConfig.withValue(GEARPUMP_STREAMING_GROUPBY_FUNCTION, 
groupby))
-          case merge: MergeOp =>
-            Processor[TransformTask[Object, Object]](1,
-              description = op.description + "." + func.description,
-              userConfig)
-          case ProcessorOp(processor, parallelism, conf, description) =>
-            DefaultProcessor(parallelism,
-              description = description + " " + func.description,
-              userConfig, processor)
-          case DataSinkOp(dataSink, parallelism, conf, description) =>
-            DataSinkProcessor(dataSink, parallelism, description + 
func.description)
-        }
-      case op: SlaveOp[_] =>
-        val func = toFunction(ops.ops)
-        val userConfig = baseConfig.withValue(GEARPUMP_STREAMING_OPERATOR, 
func)
-
-        Processor[TransformTask[Object, Object]](1,
-          description = func.description,
-          taskConf = userConfig)
-      case chain: OpChain =>
-        throw new RuntimeException("Not supposed to be called!")
-    }
-  }
-
-  private def toFunction(ops: List[Op]): SingleInputFunction[Object, Object] = 
{
-    val func: SingleInputFunction[Object, Object] = new 
DummyInputFunction[Object]()
-    val totalFunction = ops.foldLeft(func) { (fun, op) =>
-
-      val opFunction = op match {
-        case flatmap: FlatMapOp[Object @unchecked, Object @unchecked] =>
-          new FlatMapFunction(flatmap.fun, flatmap.description)
-        case reduce: ReduceOp[Object @unchecked] =>
-          new ReduceFunction(reduce.fun, reduce.description)
-        case _ =>
-          throw new RuntimeException("Not supposed to be called!")
-      }
-      fun.andThen(opFunction.asInstanceOf[SingleInputFunction[Object, Object]])
-    }
-    totalFunction.asInstanceOf[SingleInputFunction[Object, Object]]
-  }
-}
-
-object OpTranslator {
-
-  trait SingleInputFunction[IN, OUT] extends Serializable {
-    def process(value: IN): TraversableOnce[OUT]
-    def andThen[OUTER](other: SingleInputFunction[OUT, OUTER]): 
SingleInputFunction[IN, OUTER] = {
-      new AndThen(this, other)
-    }
-
-    def description: String
-  }
-
-  class DummyInputFunction[T] extends SingleInputFunction[T, T] {
-    override def andThen[OUTER](other: SingleInputFunction[T, OUTER])
-    : SingleInputFunction[T, OUTER] = {
-      other
-    }
-
-    // Should never be called
-    override def process(value: T): TraversableOnce[T] = None
-
-    override def description: String = ""
-  }
-
-  class AndThen[IN, MIDDLE, OUT](
-      first: SingleInputFunction[IN, MIDDLE], second: 
SingleInputFunction[MIDDLE, OUT])
-    extends SingleInputFunction[IN, OUT] {
-
-    override def process(value: IN): TraversableOnce[OUT] = {
-      first.process(value).flatMap(second.process)
-    }
-
-    override def description: String = {
-      Option(first.description).flatMap { description =>
-        Option(second.description).map(description + "." + _)
-      }.orNull
-    }
-  }
-
-  class FlatMapFunction[IN, OUT](fun: IN => TraversableOnce[OUT], 
descriptionMessage: String)
-    extends SingleInputFunction[IN, OUT] {
-
-    override def process(value: IN): TraversableOnce[OUT] = {
-      fun(value)
-    }
-
-    override def description: String = {
-      this.descriptionMessage
-    }
-  }
-
-  class ReduceFunction[T](fun: (T, T) => T, descriptionMessage: String)
-    extends SingleInputFunction[T, T] {
-
-    private var state: Any = _
-
-    override def process(value: T): TraversableOnce[T] = {
-      if (state == null) {
-        state = value
-      } else {
-        state = fun(state.asInstanceOf[T], value)
-      }
-      Some(state.asInstanceOf[T])
-    }
-
-    override def description: String = descriptionMessage
-  }
-
-  class GroupByTask[IN, GROUP, OUT](
-      groupBy: IN => GROUP, taskContext: TaskContext, userConf: UserConfig)
-    extends Task(taskContext, userConf) {
-
-    def this(taskContext: TaskContext, userConf: UserConfig) = {
-      this(userConf.getValue[GroupByOp[IN, GROUP]](
-        GEARPUMP_STREAMING_GROUPBY_FUNCTION )(taskContext.system).get.fun,
-        taskContext, userConf)
-    }
-
-    private var groups = Map.empty[GROUP, SingleInputFunction[IN, OUT]]
-
-    override def onNext(msg: Message): Unit = {
-      val time = msg.timestamp
-
-      val group = groupBy(msg.msg.asInstanceOf[IN])
-      if (!groups.contains(group)) {
-        val operator =
-          userConf.getValue[SingleInputFunction[IN, 
OUT]](GEARPUMP_STREAMING_OPERATOR).get
-        groups += group -> operator
-      }
-
-      val operator = groups(group)
-
-      operator.process(msg.msg.asInstanceOf[IN]).foreach { msg =>
-        taskContext.output(new Message(msg.asInstanceOf[AnyRef], time))
-      }
-    }
-  }
-
-  class TransformTask[IN, OUT](
-      operator: Option[SingleInputFunction[IN, OUT]], taskContext: TaskContext,
-      userConf: UserConfig) extends Task(taskContext, userConf) {
-
-    def this(taskContext: TaskContext, userConf: UserConfig) = {
-      this(userConf.getValue[SingleInputFunction[IN, OUT]](
-        GEARPUMP_STREAMING_OPERATOR)(taskContext.system), taskContext, 
userConf)
-    }
-
-    override def onNext(msg: Message): Unit = {
-      val time = msg.timestamp
-
-      operator match {
-        case Some(op) =>
-          op.process(msg.msg.asInstanceOf[IN]).foreach { msg =>
-            taskContext.output(new Message(msg.asInstanceOf[AnyRef], time))
-          }
-        case None =>
-          taskContext.output(new Message(msg.msg, time))
-      }
-    }
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
index f5bbd65..16d5c06 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
@@ -22,7 +22,6 @@ import akka.actor.ActorSystem
 
 import org.apache.gearpump.partitioner.{CoLocationPartitioner, 
HashPartitioner, Partitioner}
 import org.apache.gearpump.streaming.Processor
-import org.apache.gearpump.streaming.dsl.op._
 import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
 import org.apache.gearpump.streaming.task.Task
 import org.apache.gearpump.util.Graph
@@ -33,64 +32,60 @@ class Planner {
    * Converts Dag of Op to Dag of TaskDescription. TaskDescription is part of 
the low
    * level Graph API.
    */
-  def plan(dag: Graph[Op, OpEdge])(implicit system: ActorSystem)
-    : Graph[Processor[_ <: Task], _ <: Partitioner] = {
+  def plan(dag: Graph[Op, OpEdge])
+    (implicit system: ActorSystem): Graph[Processor[_ <: Task], _ <: 
Partitioner] = {
 
-    val opTranslator = new OpTranslator()
-
-    val newDag = optimize(dag)
-    newDag.mapEdge { (node1, edge, node2) =>
+    val graph = optimize(dag)
+    graph.mapEdge { (node1, edge, node2) =>
       edge match {
         case Shuffle =>
-          node2.head match {
-            case groupBy: GroupByOp[Any @unchecked, Any @unchecked] =>
-              new GroupByPartitioner(groupBy.fun)
+          node2 match {
+            case groupBy: GroupByOp[_, _] =>
+              new GroupByPartitioner(groupBy.groupByFn)
             case _ => new HashPartitioner
           }
         case Direct =>
           new CoLocationPartitioner
       }
-    }.mapVertex { opChain =>
-      opTranslator.translate(opChain)
-    }
+    }.mapVertex(_.getProcessor)
   }
 
-  private def optimize(dag: Graph[Op, OpEdge]): Graph[OpChain, OpEdge] = {
-    val newGraph = dag.mapVertex(op => OpChain(List(op)))
-
-    val nodes = newGraph.topologicalOrderWithCirclesIterator.toList.reverse
+  private def optimize(dag: Graph[Op, OpEdge])
+    (implicit system: ActorSystem): Graph[Op, OpEdge] = {
+    val graph = dag.copy
+    val nodes = graph.topologicalOrderWithCirclesIterator.toList.reverse
     for (node <- nodes) {
-      val outGoingEdges = newGraph.outgoingEdgesOf(node)
+      val outGoingEdges = graph.outgoingEdgesOf(node)
       for (edge <- outGoingEdges) {
-        merge(newGraph, edge._1, edge._3)
+        merge(graph, edge._1, edge._3)
       }
     }
-    newGraph
+    graph
   }
 
-  private def merge(dag: Graph[OpChain, OpEdge], node1: OpChain, node2: 
OpChain)
-    : Graph[OpChain, OpEdge] = {
-    if (dag.outDegreeOf(node1) == 1 &&
-      dag.inDegreeOf(node2) == 1 &&
+  private def merge(graph: Graph[Op, OpEdge], node1: Op, node2: Op)
+    (implicit system: ActorSystem): Unit = {
+    if (graph.outDegreeOf(node1) == 1 &&
+      graph.inDegreeOf(node2) == 1 &&
       // For processor node, we don't allow it to merge with downstream 
operators
-      !node1.head.isInstanceOf[ProcessorOp[_ <: Task]]) {
-      val (_, edge, _) = dag.outgoingEdgesOf(node1).head
+      !node1.isInstanceOf[ProcessorOp[_ <: Task]] &&
+      !node2.isInstanceOf[ProcessorOp[_ <: Task]]) {
+      val (_, edge, _) = graph.outgoingEdgesOf(node1).head
       if (edge == Direct) {
-        val opList = OpChain(node1.ops ++ node2.ops)
-        dag.addVertex(opList)
-        for (incomingEdge <- dag.incomingEdgesOf(node1)) {
-          dag.addEdge(incomingEdge._1, incomingEdge._2, opList)
+        val chainedOp = node1.chain(node2)
+        graph.addVertex(chainedOp)
+        for (incomingEdge <- graph.incomingEdgesOf(node1)) {
+          graph.addEdge(incomingEdge._1, incomingEdge._2, chainedOp)
         }
 
-        for (outgoingEdge <- dag.outgoingEdgesOf(node2)) {
-          dag.addEdge(opList, outgoingEdge._2, outgoingEdge._3)
+        for (outgoingEdge <- graph.outgoingEdgesOf(node2)) {
+          graph.addEdge(chainedOp, outgoingEdge._2, outgoingEdge._3)
         }
 
         // Remove the old vertex
-        dag.removeVertex(node1)
-        dag.removeVertex(node2)
+        graph.removeVertex(node1)
+        graph.removeVertex(node2)
       }
     }
-    dag
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
new file mode 100644
index 0000000..609fbb0
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.plan.functions
+
+trait SingleInputFunction[IN, OUT] extends Serializable {
+  def process(value: IN): TraversableOnce[OUT]
+  def andThen[OUTER](other: SingleInputFunction[OUT, OUTER]): 
SingleInputFunction[IN, OUTER] = {
+    new AndThen(this, other)
+  }
+  def finish(): TraversableOnce[OUT] = None
+  def clearState(): Unit = {}
+  def description: String
+}
+
+class AndThen[IN, MIDDLE, OUT](
+    first: SingleInputFunction[IN, MIDDLE], second: 
SingleInputFunction[MIDDLE, OUT])
+  extends SingleInputFunction[IN, OUT] {
+
+  override def process(value: IN): TraversableOnce[OUT] = {
+    first.process(value).flatMap(second.process)
+  }
+
+  override def finish(): TraversableOnce[OUT] = {
+    val firstResult = first.finish().flatMap(second.process)
+    if (firstResult.isEmpty) {
+      second.finish()
+    } else {
+      firstResult
+    }
+  }
+
+  override def clearState(): Unit = {
+    first.clearState()
+    second.clearState()
+  }
+
+  override def description: String = {
+    Option(first.description).flatMap { description =>
+      Option(second.description).map(description + "." + _)
+    }.orNull
+  }
+}
+
+class FlatMapFunction[IN, OUT](fn: IN => TraversableOnce[OUT], 
descriptionMessage: String)
+  extends SingleInputFunction[IN, OUT] {
+
+  override def process(value: IN): TraversableOnce[OUT] = {
+    fn(value)
+  }
+
+  override def description: String = descriptionMessage
+}
+
+
+class ReduceFunction[T](fn: (T, T) => T, descriptionMessage: String)
+  extends SingleInputFunction[T, T] {
+
+  private var state: Option[T] = None
+
+  override def process(value: T): TraversableOnce[T] = {
+    if (state.isEmpty) {
+      state = Option(value)
+    } else {
+      state = state.map(fn(_, value))
+    }
+    None
+  }
+
+  override def finish(): TraversableOnce[T] = {
+    state
+  }
+
+  override def clearState(): Unit = {
+    state = None
+  }
+
+  override def description: String = descriptionMessage
+}
+
+class EmitFunction[T](emit: T => Unit) extends SingleInputFunction[T, Unit] {
+
+  override def process(value: T): TraversableOnce[Unit] = {
+    emit(value)
+    None
+  }
+
+  override def andThen[R](other: SingleInputFunction[Unit, R]): 
SingleInputFunction[T, R] = {
+    throw new UnsupportedOperationException("andThen is not supposed to be 
called on EmitFunction")
+  }
+
+  override def description: String = ""
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/bc394035/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
new file mode 100644
index 0000000..4ee2fa8
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.task
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.dsl.window.api.CountWindowFn
+import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, 
GroupAlsoByWindow, WindowRunner}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+
+/**
+ * This task triggers output on number of messages in a window.
+ */
+class CountTriggerTask[IN, GROUP](
+    groupBy: GroupAlsoByWindow[IN, GROUP],
+    windowRunner: WindowRunner,
+    taskContext: TaskContext,
+    userConfig: UserConfig)
+  extends Task(taskContext, userConfig) {
+
+  def this(groupBy: GroupAlsoByWindow[IN, GROUP],
+      taskContext: TaskContext, userConfig: UserConfig) = {
+    this(groupBy, new DefaultWindowRunner(taskContext, userConfig, 
groupBy)(taskContext.system),
+      taskContext, userConfig)
+  }
+
+  def this(taskContext: TaskContext, userConfig: UserConfig) = {
+    this(userConfig.getValue[GroupAlsoByWindow[IN, GROUP]](
+      GEARPUMP_STREAMING_GROUPBY_FUNCTION)(taskContext.system).get,
+      taskContext, userConfig)
+  }
+
+  private val windowSize = 
groupBy.window.windowFn.asInstanceOf[CountWindowFn].size
+  private var num = 0
+
+  override def onNext(msg: Message): Unit = {
+    windowRunner.process(msg)
+    num += 1
+    if (windowSize == num) {
+      windowRunner.trigger(Instant.ofEpochMilli(windowSize))
+      num = 0
+    }
+  }
+}


Reply via email to