Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 8cd0f0c9b -> 58a1a63e6
[GEARPUMP-286] Fix window and group implementation in WindowRunner

Be sure to do all of the following to help us incorporate your contribution quickly and easily:

 - [x] Make sure the commit message is formatted like: `[GEARPUMP-<Jira issue #>] Meaningful description of pull request`
 - [x] Make sure tests pass via `sbt clean test`.
 - [x] Make sure old documentation affected by the pull request has been updated and new documentation added for new functionality.

Author: manuzhang <[email protected]>

Closes #164 from manuzhang/GEARPUMP-286.

Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/58a1a63e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/58a1a63e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/58a1a63e

Branch: refs/heads/master
Commit: 58a1a63e648793577b416680c6b81aead7d66fac
Parents: 8cd0f0c
Author: manuzhang <[email protected]>
Authored: Tue Feb 28 10:09:00 2017 +0800
Committer: manuzhang <[email protected]>
Committed: Tue Feb 28 10:09:08 2017 +0800

----------------------------------------------------------------------
 .../streaming/dsl/window/impl/Window.scala | 21 +-----
 .../dsl/window/impl/WindowRunner.scala | 74 +++++++++++---------
 2 files changed, 43 insertions(+), 52 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/58a1a63e/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
index e1d9cee..5f9d19b 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala @@ -65,33 +65,16 @@ case class Window(startTime: Instant, endTime: Instant) extends Comparable[Windo } } -case class WindowAndGroup[GROUP](window: Window, group: GROUP) - extends Comparable[WindowAndGroup[GROUP]] { - - def intersects(other: WindowAndGroup[GROUP]): Boolean = { - window.intersects(other.window) && group.equals(other.group) - } - - override def compareTo(o: WindowAndGroup[GROUP]): Int = { - val ret = window.compareTo(o.window) - if (ret != 0) { - ret - } else { - group.hashCode() - o.group.hashCode() - } - } -} - case class GroupAlsoByWindow[T, GROUP](groupByFn: T => GROUP, window: Windows[T]) { - def groupBy(message: Message): List[WindowAndGroup[GROUP]] = { + def groupBy(message: Message): (GROUP, List[Window]) = { val ele = message.msg.asInstanceOf[T] val group = groupByFn(ele) val windows = window.windowFn(new WindowFunction.Context[T] { override def element: T = ele override def timestamp: Instant = message.timestamp }) - windows.map(WindowAndGroup(_, group)).toList + group -> windows.toList } def getProcessor(parallelism: Int, description: String, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/58a1a63e/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala index 1ada42c..91edd73 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala @@ -23,7 +23,7 @@ import akka.actor.ActorSystem import com.gs.collections.api.block.predicate.Predicate import org.apache.gearpump.Message import 
org.apache.gearpump.cluster.UserConfig -import com.gs.collections.api.block.procedure.Procedure +import com.gs.collections.api.block.procedure.{Procedure, Procedure2} import com.gs.collections.impl.list.mutable.FastList import com.gs.collections.impl.map.mutable.UnifiedMap import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap @@ -53,60 +53,68 @@ class DefaultWindowRunner[IN, GROUP, OUT]( extends WindowRunner { private val windowFn = groupBy.window.windowFn - private val groupedInputs = new TreeSortedMap[WindowAndGroup[GROUP], FastList[IN]] + private val groupedWindowInputs = new UnifiedMap[GROUP, TreeSortedMap[Window, FastList[IN]]] private val groupedFnRunners = new UnifiedMap[GROUP, FunctionRunner[IN, OUT]] override def process(message: Message): Unit = { val input = message.msg.asInstanceOf[IN] - val wgs = groupBy.groupBy(message) - wgs.foreach { wg => + val (group, windows) = groupBy.groupBy(message) + if (!groupedWindowInputs.containsKey(group)) { + groupedWindowInputs.put(group, new TreeSortedMap[Window, FastList[IN]]()) + } + val windowInputs = groupedWindowInputs.get(group) + windows.foreach { win => if (windowFn.isNonMerging) { - if (!groupedInputs.containsKey(wg)) { + if (!windowInputs.containsKey(win)) { val inputs = new FastList[IN](1) - groupedInputs.put(wg, inputs) + windowInputs.put(win, inputs) } - groupedInputs.get(wg).add(input) + windowInputs.get(win).add(input) } else { - merge(wg, input) + merge(windowInputs, win, input) } + } - if (!groupedFnRunners.containsKey(wg.group)) { - val fn = userConfig.getValue[FunctionRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get - fn.setup() - groupedFnRunners.put(wg.group, fn) - } + if (!groupedFnRunners.containsKey(group)) { + val fn = userConfig.getValue[FunctionRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get + fn.setup() + groupedFnRunners.put(group, fn) } - def merge(wg: WindowAndGroup[GROUP], input: IN): Unit = { - val intersected = groupedInputs.keySet.select(new 
Predicate[WindowAndGroup[GROUP]] { - override def accept(each: WindowAndGroup[GROUP]): Boolean = { - wg.intersects(each) + def merge(windowInputs: TreeSortedMap[Window, FastList[IN]], win: Window, input: IN): Unit = { + val intersected = windowInputs.keySet.select(new Predicate[Window] { + override def accept(each: Window): Boolean = { + win.intersects(each) } }) - var mergedWin = wg.window + var mergedWin = win val mergedInputs = FastList.newListWith(input) - intersected.forEach(new Procedure[WindowAndGroup[GROUP]] { - override def value(each: WindowAndGroup[GROUP]): Unit = { - mergedWin = mergedWin.span(each.window) - mergedInputs.addAll(groupedInputs.remove(each)) + intersected.forEach(new Procedure[Window] { + override def value(each: Window): Unit = { + mergedWin = mergedWin.span(each) + mergedInputs.addAll(windowInputs.remove(each)) } }) - groupedInputs.put(WindowAndGroup(mergedWin, wg.group), mergedInputs) + windowInputs.put(mergedWin, mergedInputs) } } override def trigger(time: Instant): Unit = { - onTrigger() + groupedWindowInputs.forEachKeyValue(new Procedure2[GROUP, TreeSortedMap[Window, FastList[IN]]] { + override def value(group: GROUP, windowInputs: TreeSortedMap[Window, FastList[IN]]): Unit = { + onTrigger(group, windowInputs) + } + }) @annotation.tailrec - def onTrigger(): Unit = { - if (groupedInputs.notEmpty()) { - val first = groupedInputs.firstKey - if (!time.isBefore(first.window.endTime)) { - val inputs = groupedInputs.remove(first) - if (groupedFnRunners.containsKey(first.group)) { - val reduceFn = AndThen(groupedFnRunners.get(first.group), + def onTrigger(group: GROUP, windowInputs: TreeSortedMap[Window, FastList[IN]]): Unit = { + if (windowInputs.notEmpty()) { + val firstWin = windowInputs.firstKey + if (!time.isBefore(firstWin.endTime)) { + val inputs = windowInputs.remove(firstWin) + if (groupedFnRunners.containsKey(group)) { + val reduceFn = AndThen(groupedFnRunners.get(group), new Emit[OUT](output => emitResult(output, time))) 
inputs.forEach(new Procedure[IN] { override def value(t: IN): Unit = { @@ -119,9 +127,9 @@ class DefaultWindowRunner[IN, GROUP, OUT]( if (groupBy.window.accumulationMode == Discarding) { reduceFn.teardown() } - onTrigger() + onTrigger(group, windowInputs) } else { - throw new RuntimeException(s"FunctionRunner not found for group ${first.group}") + throw new RuntimeException(s"FunctionRunner not found for group $group") } } }
