Repository: incubator-beam Updated Branches: refs/heads/master d6adbbf96 -> d53e96a0d
Spark runner: Assign windows when re-windowing into global window. Previously, window assignment was elided when the destination window was the global window. But when the source windows are not the global window, this elision is not correct. Now window assignment is run except when both the source *and* the destination window are the global window (which remains a common case in globally windowed batch tests using PAssert). Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d53e96a0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d53e96a0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d53e96a0 Branch: refs/heads/master Commit: d53e96a0d1f5f26ad0e3efc90dc9f7b53135443b Parents: f222df1 Author: Kenneth Knowles <[email protected]> Authored: Thu Jun 9 09:15:39 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Thu Jun 9 14:41:09 2016 -0700 ---------------------------------------------------------------------- .../beam/runners/spark/translation/TransformTranslator.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d53e96a0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index b462d35..ebceb6b 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -699,13 +699,16 @@ public final class TransformTranslator { JavaRDDLike<WindowedValue<T>, 
?> inRDD = (JavaRDDLike<WindowedValue<T>, ?>) context.getInputRDD(transform); WindowFn<? super T, W> windowFn = WINDOW_FG.get("windowFn", transform); - if (windowFn instanceof GlobalWindows) { + // Avoid running assign windows if both source and destination are global window + if (context.getInput(transform).getWindowingStrategy().getWindowFn() + instanceof GlobalWindows + && windowFn instanceof GlobalWindows) { context.setOutputRDD(transform, inRDD); } else { @SuppressWarnings("unchecked") DoFn<T, T> addWindowsDoFn = new AssignWindowsDoFn<>(windowFn); DoFnFunction<T, T> dofn = - new DoFnFunction<>(addWindowsDoFn, context.getRuntimeContext(), null); + new DoFnFunction<>(addWindowsDoFn, context.getRuntimeContext(), null); context.setOutputRDD(transform, inRDD.mapPartitions(dofn)); } }
