Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 6677b6a11 -> 215531cd8
[GEARPUMP-245] Apply groupBy and fix group implementations

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [x] Make sure the commit message is formatted like:
   `[GEARPUMP-<Jira issue #>] Meaningful description of pull request`
 - [x] Make sure tests pass via `sbt clean test`.
 - [x] Make sure old documentation affected by the pull request has been updated and new documentation added for new functionality.

Author: manuzhang <[email protected]>

Closes #122 from manuzhang/GEARPUMP-245.

Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/215531cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/215531cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/215531cd

Branch: refs/heads/master
Commit: 215531cd822f5a07dd6925220f962a3b64d4aebd
Parents: 6677b6a
Author: manuzhang <[email protected]>
Authored: Wed Dec 14 12:45:48 2016 +0800
Committer: manuzhang <[email protected]>
Committed: Wed Dec 14 12:45:48 2016 +0800

----------------------------------------------------------------------
 .../streaming/dsl/javaapi/JavaStream.scala      |  2 +-
 .../dsl/window/impl/WindowRunner.scala          | 52 ++++++++++----------
 2 files changed, 27 insertions(+), 27 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/215531cd/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
index f68731e..f2654ea 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
@@ -84,6 +84,6 @@ class JavaWindowStream[T](stream: WindowStream[T]) {

   def groupBy[GROUP](fn: GroupByFunction[T, GROUP], parallelism: Int,
       description: String): JavaStream[T] = {
-    new JavaStream[T](stream.groupBy((t: T) => fn, parallelism, description))
+    new JavaStream[T](stream.groupBy(fn.apply, parallelism, description))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/215531cd/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
index 640d090..b3ecf2d 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
@@ -26,6 +26,7 @@ import com.gs.collections.api.block.procedure.Procedure
 import com.gs.collections.impl.list.mutable.FastList
 import com.gs.collections.impl.map.mutable.UnifiedMap
 import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap
+import com.gs.collections.impl.set.mutable.UnifiedSet
 import org.apache.gearpump.streaming.Constants._
 import org.apache.gearpump.streaming.dsl.plan.functions.{EmitFunction, SingleInputFunction}
 import org.apache.gearpump.streaming.dsl.window.api.Discarding
@@ -46,18 +47,6 @@ object DefaultWindowRunner {
   private val LOG: Logger = LogUtil.getLogger(classOf[DefaultWindowRunner[_, _, _]])

   case class WindowGroup[GROUP](bucket: Bucket, group: GROUP)
-    extends Comparable[WindowGroup[GROUP]] {
-    override def compareTo(o: WindowGroup[GROUP]): Int = {
-      val ret = bucket.compareTo(o.bucket)
-      if (ret != 0) {
-        ret
-      } else if (group.equals(o.group)) {
-        0
-      } else {
-        -1
-      }
-    }
-  }
 }

 class DefaultWindowRunner[IN, GROUP, OUT](
@@ -66,7 +55,8 @@ class DefaultWindowRunner[IN, GROUP, OUT](
   extends WindowRunner {
   import org.apache.gearpump.streaming.dsl.window.impl.DefaultWindowRunner._

-  private val windowGroups = new TreeSortedMap[WindowGroup[GROUP], FastList[IN]]
+  private val windows = new TreeSortedMap[Bucket, UnifiedSet[WindowGroup[GROUP]]]
+  private val windowGroups = new UnifiedMap[WindowGroup[GROUP], FastList[IN]]
   private val groupFns = new UnifiedMap[GROUP, SingleInputFunction[IN, OUT]]


@@ -74,6 +64,10 @@ class DefaultWindowRunner[IN, GROUP, OUT](
     val (group, buckets) = groupBy.groupBy(message)
     buckets.foreach { bucket =>
       val wg = WindowGroup(bucket, group)
+      val wgs = windows.getOrDefault(bucket, new UnifiedSet[WindowGroup[GROUP]](1))
+      wgs.add(wg)
+      windows.put(bucket, wgs)
+
       val inputs = windowGroups.getOrDefault(wg, new FastList[IN](1))
       inputs.add(message.msg.asInstanceOf[IN])
       windowGroups.put(wg, inputs)
@@ -87,21 +81,27 @@ class DefaultWindowRunner[IN, GROUP, OUT](

     @annotation.tailrec
     def onTrigger(): Unit = {
-      if (windowGroups.notEmpty()) {
-        val first = windowGroups.firstKey
-        if (!time.isBefore(first.bucket.endTime)) {
-          val inputs = windowGroups.remove(first)
-          val reduceFn = groupFns.get(first.group)
-            .andThen[Unit](new EmitFunction[OUT](emitResult(_, time)))
-          inputs.forEach(new Procedure[IN] {
-            override def value(t: IN): Unit = {
-              reduceFn.process(t)
+      if (windows.notEmpty()) {
+        val first = windows.firstKey
+        if (!time.isBefore(first.endTime)) {
+          val wgs = windows.remove(first)
+          wgs.forEach(new Procedure[WindowGroup[GROUP]] {
+            override def value(each: WindowGroup[GROUP]): Unit = {
+              val inputs = windowGroups.remove(each)
+              val reduceFn = groupFns.get(each.group)
+                .andThen[Unit](new EmitFunction[OUT](emitResult(_, time)))
+              inputs.forEach(new Procedure[IN] {
+                override def value(t: IN): Unit = {
+                  reduceFn.process(t)
+                }
+              })
+              reduceFn.finish()
+              if (groupBy.window.accumulationMode == Discarding) {
+                reduceFn.clearState()
+              }
             }
           })
-          reduceFn.finish()
-          if (groupBy.window.accumulationMode == Discarding) {
-            reduceFn.clearState()
-          }
+
           onTrigger()
         }
       }
