Repository: incubator-beam Updated Branches: refs/heads/master fab7b2402 -> 68623e91f
Short-circuit side input window checks in PushbackSideInputDoFnRunner. This uses the collection of not-ready windows to avoid checking when the answer must be false. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5bf23ac0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5bf23ac0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5bf23ac0 Branch: refs/heads/master Commit: 5bf23ac0d3f41a3b3e2088024996b1247d246131 Parents: fab7b24 Author: Thomas Groh <tg...@google.com> Authored: Fri Oct 28 09:21:51 2016 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Tue Nov 1 10:44:56 2016 -0700 ---------------------------------------------------------------------- .../core/PushbackSideInputDoFnRunner.java | 28 +++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5bf23ac0/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java index deeac3c..8c169da 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java @@ -23,7 +23,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Set; - import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; import org.apache.beam.sdk.util.WindowedValue; @@ -78,18 +77,7 @@ public class PushbackSideInputDoFnRunner<InputT, 
OutputT> implements DoFnRunner< ImmutableList.Builder<WindowedValue<InputT>> pushedBack = ImmutableList.builder(); for (WindowedValue<InputT> windowElem : elem.explodeWindows()) { BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows()); - boolean isReady = !notReadyWindows.contains(mainInputWindow); - for (PCollectionView<?> view : views) { - BoundedWindow sideInputWindow = - view.getWindowingStrategyInternal() - .getWindowFn() - .getSideInputWindow(mainInputWindow); - if (!sideInputReader.isReady(view, sideInputWindow)) { - isReady = false; - break; - } - } - if (isReady) { + if (isReady(mainInputWindow)) { processElement(windowElem); } else { notReadyWindows.add(mainInputWindow); @@ -99,6 +87,20 @@ public class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner< return pushedBack.build(); } + private boolean isReady(BoundedWindow mainInputWindow) { + if (notReadyWindows.contains(mainInputWindow)) { + return false; + } + for (PCollectionView<?> view : views) { + BoundedWindow sideInputWindow = + view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow); + if (!sideInputReader.isReady(view, sideInputWindow)) { + return false; + } + } + return true; + } + @Override public void processElement(WindowedValue<InputT> elem) { underlying.processElement(elem);