Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 2783559db -> 956d52a35


[GEARPUMP-249] Force eager evaluation on chained operations

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [x] Make sure the commit message is formatted like:
   `[GEARPUMP-<Jira issue #>] Meaningful description of pull request`
 - [x] Make sure tests pass via `sbt clean test`.
 - [x] Make sure old documentation affected by the pull request has been 
updated and new documentation added for new functionality.

Author: manuzhang <[email protected]>

Closes #127 from manuzhang/eager-eval.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/956d52a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/956d52a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/956d52a3

Branch: refs/heads/master
Commit: 956d52a35193fe1da116f65264db0cf138b39665
Parents: 2783559
Author: manuzhang <[email protected]>
Authored: Wed Jan 4 12:53:33 2017 +0800
Committer: manuzhang <[email protected]>
Committed: Wed Jan 4 12:53:33 2017 +0800

----------------------------------------------------------------------
 .../gearpump/streaming/dsl/window/impl/WindowRunner.scala      | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/956d52a3/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
index b3ecf2d..d87a9e4 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
@@ -92,10 +92,12 @@ class DefaultWindowRunner[IN, GROUP, OUT](
                 .andThen[Unit](new EmitFunction[OUT](emitResult(_, time)))
               inputs.forEach(new Procedure[IN] {
                 override def value(t: IN): Unit = {
-                  reduceFn.process(t)
+                  // .toList forces eager evaluation
+                  reduceFn.process(t).toList
                 }
               })
-              reduceFn.finish()
+              // .toList forces eager evaluation
+              reduceFn.finish().toList
               if (groupBy.window.accumulationMode == Discarding) {
                 reduceFn.clearState()
               }

Reply via email to