Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 2783559db -> 956d52a35

[GEARPUMP-249] Force eager evaluation on chained operations

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [x] Make sure the commit message is formatted like:
   `[GEARPUMP-<Jira issue #>] Meaningful description of pull request`
 - [x] Make sure tests pass via `sbt clean test`.
 - [x] Make sure old documentation affected by the pull request has been
   updated and new documentation added for new functionality.

Author: manuzhang <[email protected]>

Closes #127 from manuzhang/eager-eval.

Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/956d52a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/956d52a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/956d52a3

Branch: refs/heads/master
Commit: 956d52a35193fe1da116f65264db0cf138b39665
Parents: 2783559
Author: manuzhang <[email protected]>
Authored: Wed Jan 4 12:53:33 2017 +0800
Committer: manuzhang <[email protected]>
Committed: Wed Jan 4 12:53:33 2017 +0800

----------------------------------------------------------------------
 .../gearpump/streaming/dsl/window/impl/WindowRunner.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/956d52a3/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
index b3ecf2d..d87a9e4 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
@@ -92,10 +92,12 @@ class DefaultWindowRunner[IN, GROUP, OUT](
         .andThen[Unit](new EmitFunction[OUT](emitResult(_, time)))
       inputs.forEach(new Procedure[IN] {
         override def value(t: IN): Unit = {
-          reduceFn.process(t)
+          // .toList forces eager evaluation
+          reduceFn.process(t).toList
         }
       })
-      reduceFn.finish()
+      // .toList forces eager evaluation
+      reduceFn.finish().toList
       if (groupBy.window.accumulationMode == Discarding) {
         reduceFn.clearState()
       }
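
The diff above appends `.toList` to the results of `process` and `finish` to force evaluation. The following is a minimal sketch of why that can matter, assuming the chained functions return a lazy collection such as an `Iterator` (the diff does not show the actual return type). The names `EagerEvalSketch`, `process`, and `emitted` are hypothetical and only illustrate the behaviour; they are not Gearpump APIs.

```scala
object EagerEvalSketch {

  // Returns how many side effects ran (before, after) forcing the result.
  def run(): (Int, Int) = {
    var emitted = List.empty[Int]

    // Hypothetical stand-in for a chained operation: the side effect lives
    // inside Iterator.map, which is lazy and runs only when consumed.
    def process(t: Int): Iterator[Int] =
      Iterator(t).map { x => emitted = emitted :+ x; x }

    // Result discarded without being consumed: the map body never runs.
    process(1)
    val before = emitted.size

    // .toList drains the iterator, so the side effect runs.
    process(2).toList
    val after = emitted.size

    (before, after)
  }

  def main(args: Array[String]): Unit = {
    val (before, after) = run()
    println(s"side effects before forcing: $before, after forcing: $after")
  }
}
```

In this sketch the first call has no visible effect because nothing consumes the iterator, which may be the kind of behaviour the commit guards against when an emit step is chained with `andThen`.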
