Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 8cd0f0c9b -> 58a1a63e6


[GEARPUMP-286] Fix window and group implementation in WindowRunner

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [x] Make sure the commit message is formatted like:
   `[GEARPUMP-<Jira issue #>] Meaningful description of pull request`
 - [x] Make sure tests pass via `sbt clean test`.
 - [x] Make sure old documentation affected by the pull request has been
   updated and new documentation added for new functionality.

Author: manuzhang <[email protected]>

Closes #164 from manuzhang/GEARPUMP-286.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/58a1a63e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/58a1a63e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/58a1a63e

Branch: refs/heads/master
Commit: 58a1a63e648793577b416680c6b81aead7d66fac
Parents: 8cd0f0c
Author: manuzhang <[email protected]>
Authored: Tue Feb 28 10:09:00 2017 +0800
Committer: manuzhang <[email protected]>
Committed: Tue Feb 28 10:09:08 2017 +0800

----------------------------------------------------------------------
 .../streaming/dsl/window/impl/Window.scala      | 21 +-----
 .../dsl/window/impl/WindowRunner.scala          | 74 +++++++++++---------
 2 files changed, 43 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/58a1a63e/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
index e1d9cee..5f9d19b 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
@@ -65,33 +65,16 @@ case class Window(startTime: Instant, endTime: Instant) extends Comparable[Windo
   }
 }
 
-case class WindowAndGroup[GROUP](window: Window, group: GROUP)
-  extends Comparable[WindowAndGroup[GROUP]] {
-
-  def intersects(other: WindowAndGroup[GROUP]): Boolean = {
-    window.intersects(other.window) && group.equals(other.group)
-  }
-
-  override def compareTo(o: WindowAndGroup[GROUP]): Int = {
-    val ret = window.compareTo(o.window)
-    if (ret != 0) {
-      ret
-    } else {
-      group.hashCode() - o.group.hashCode()
-    }
-  }
-}
-
 case class GroupAlsoByWindow[T, GROUP](groupByFn: T => GROUP, window: Windows[T]) {
 
-  def groupBy(message: Message): List[WindowAndGroup[GROUP]] = {
+  def groupBy(message: Message): (GROUP, List[Window]) = {
     val ele = message.msg.asInstanceOf[T]
     val group = groupByFn(ele)
     val windows = window.windowFn(new WindowFunction.Context[T] {
       override def element: T = ele
       override def timestamp: Instant = message.timestamp
     })
-    windows.map(WindowAndGroup(_, group)).toList
+    group -> windows.toList
   }
 
   def getProcessor(parallelism: Int, description: String,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/58a1a63e/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
index 1ada42c..91edd73 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
@@ -23,7 +23,7 @@ import akka.actor.ActorSystem
 import com.gs.collections.api.block.predicate.Predicate
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
-import com.gs.collections.api.block.procedure.Procedure
+import com.gs.collections.api.block.procedure.{Procedure, Procedure2}
 import com.gs.collections.impl.list.mutable.FastList
 import com.gs.collections.impl.map.mutable.UnifiedMap
 import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap
@@ -53,60 +53,68 @@ class DefaultWindowRunner[IN, GROUP, OUT](
   extends WindowRunner {
 
   private val windowFn = groupBy.window.windowFn
-  private val groupedInputs = new TreeSortedMap[WindowAndGroup[GROUP], FastList[IN]]
+  private val groupedWindowInputs = new UnifiedMap[GROUP, TreeSortedMap[Window, FastList[IN]]]
   private val groupedFnRunners = new UnifiedMap[GROUP, FunctionRunner[IN, OUT]]
 
   override def process(message: Message): Unit = {
     val input = message.msg.asInstanceOf[IN]
-    val wgs = groupBy.groupBy(message)
-    wgs.foreach { wg =>
+    val (group, windows) = groupBy.groupBy(message)
+    if (!groupedWindowInputs.containsKey(group)) {
+      groupedWindowInputs.put(group, new TreeSortedMap[Window, FastList[IN]]())
+    }
+    val windowInputs = groupedWindowInputs.get(group)
+    windows.foreach { win =>
       if (windowFn.isNonMerging) {
-        if (!groupedInputs.containsKey(wg)) {
+        if (!windowInputs.containsKey(win)) {
           val inputs = new FastList[IN](1)
-          groupedInputs.put(wg, inputs)
+          windowInputs.put(win, inputs)
         }
-        groupedInputs.get(wg).add(input)
+        windowInputs.get(win).add(input)
       } else {
-        merge(wg, input)
+        merge(windowInputs, win, input)
       }
+    }
 
-      if (!groupedFnRunners.containsKey(wg.group)) {
-        val fn = userConfig.getValue[FunctionRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get
-        fn.setup()
-        groupedFnRunners.put(wg.group, fn)
-      }
+    if (!groupedFnRunners.containsKey(group)) {
+      val fn = userConfig.getValue[FunctionRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get
+      fn.setup()
+      groupedFnRunners.put(group, fn)
     }
 
-    def merge(wg: WindowAndGroup[GROUP], input: IN): Unit = {
-      val intersected = groupedInputs.keySet.select(new Predicate[WindowAndGroup[GROUP]] {
-        override def accept(each: WindowAndGroup[GROUP]): Boolean = {
-          wg.intersects(each)
+    def merge(windowInputs: TreeSortedMap[Window, FastList[IN]], win: Window, input: IN): Unit = {
+      val intersected = windowInputs.keySet.select(new Predicate[Window] {
+        override def accept(each: Window): Boolean = {
+          win.intersects(each)
         }
       })
-      var mergedWin = wg.window
+      var mergedWin = win
       val mergedInputs = FastList.newListWith(input)
-      intersected.forEach(new Procedure[WindowAndGroup[GROUP]] {
-        override def value(each: WindowAndGroup[GROUP]): Unit = {
-          mergedWin = mergedWin.span(each.window)
-          mergedInputs.addAll(groupedInputs.remove(each))
+      intersected.forEach(new Procedure[Window] {
+        override def value(each: Window): Unit = {
+          mergedWin = mergedWin.span(each)
+          mergedInputs.addAll(windowInputs.remove(each))
         }
       })
-      groupedInputs.put(WindowAndGroup(mergedWin, wg.group), mergedInputs)
+      windowInputs.put(mergedWin, mergedInputs)
     }
 
   }
 
   override def trigger(time: Instant): Unit = {
-    onTrigger()
+    groupedWindowInputs.forEachKeyValue(new Procedure2[GROUP, TreeSortedMap[Window, FastList[IN]]] {
+      override def value(group: GROUP, windowInputs: TreeSortedMap[Window, FastList[IN]]): Unit = {
+        onTrigger(group, windowInputs)
+      }
+    })
 
     @annotation.tailrec
-    def onTrigger(): Unit = {
-      if (groupedInputs.notEmpty()) {
-        val first = groupedInputs.firstKey
-        if (!time.isBefore(first.window.endTime)) {
-          val inputs = groupedInputs.remove(first)
-          if (groupedFnRunners.containsKey(first.group)) {
-            val reduceFn = AndThen(groupedFnRunners.get(first.group),
+    def onTrigger(group: GROUP, windowInputs: TreeSortedMap[Window, FastList[IN]]): Unit = {
+      if (windowInputs.notEmpty()) {
+        val firstWin = windowInputs.firstKey
+        if (!time.isBefore(firstWin.endTime)) {
+          val inputs = windowInputs.remove(firstWin)
+          if (groupedFnRunners.containsKey(group)) {
+            val reduceFn = AndThen(groupedFnRunners.get(group),
               new Emit[OUT](output => emitResult(output, time)))
             inputs.forEach(new Procedure[IN] {
               override def value(t: IN): Unit = {
@@ -119,9 +127,9 @@ class DefaultWindowRunner[IN, GROUP, OUT](
             if (groupBy.window.accumulationMode == Discarding) {
               reduceFn.teardown()
             }
-            onTrigger()
+            onTrigger(group, windowInputs)
           } else {
-            throw new RuntimeException(s"FunctionRunner not found for group ${first.group}")
+            throw new RuntimeException(s"FunctionRunner not found for group $group")
           }
         }
       }

Reply via email to