Repository: incubator-beam
Updated Branches:
  refs/heads/master d6adbbf96 -> d53e96a0d


Spark runner: Assign windows when re-windowing into global window

Previously, window assignment was elided when the window was the
global window. But when the source windows are not the global window,
this elision is not correct. Now window assignment is run except
when both the source *and* the destination window are the global window
(which remains a common case in globally windowed batch tests
using PAssert).


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d53e96a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d53e96a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d53e96a0

Branch: refs/heads/master
Commit: d53e96a0d1f5f26ad0e3efc90dc9f7b53135443b
Parents: f222df1
Author: Kenneth Knowles <[email protected]>
Authored: Thu Jun 9 09:15:39 2016 -0700
Committer: Kenneth Knowles <[email protected]>
Committed: Thu Jun 9 14:41:09 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/spark/translation/TransformTranslator.java   | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d53e96a0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index b462d35..ebceb6b 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -699,13 +699,16 @@ public final class TransformTranslator {
         JavaRDDLike<WindowedValue<T>, ?> inRDD =
             (JavaRDDLike<WindowedValue<T>, ?>) context.getInputRDD(transform);
         WindowFn<? super T, W> windowFn = WINDOW_FG.get("windowFn", transform);
-        if (windowFn instanceof GlobalWindows) {
+        // Avoid running assign windows if both source and destination are 
global window
+        if (context.getInput(transform).getWindowingStrategy().getWindowFn()
+                instanceof GlobalWindows
+            && windowFn instanceof GlobalWindows) {
           context.setOutputRDD(transform, inRDD);
         } else {
           @SuppressWarnings("unchecked")
           DoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn);
           DoFnFunction<T, T> dofn =
-                  new DoFnFunction<>(addWindowsDoFn, 
context.getRuntimeContext(), null);
+              new DoFnFunction<>(addWindowsDoFn, context.getRuntimeContext(), 
null);
           context.setOutputRDD(transform, inRDD.mapPartitions(dofn));
         }
       }

Reply via email to