Extracts interface from PushbackSideInputDoFnRunner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7e1a2675 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7e1a2675 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7e1a2675 Branch: refs/heads/DSL_SQL Commit: 7e1a2675699ef14291e8c112010be66fff4b8581 Parents: 1cc16b0 Author: Eugene Kirpichov <[email protected]> Authored: Mon Apr 17 14:41:53 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Tue Apr 18 18:02:06 2017 -0700 ---------------------------------------------------------------------- .../operators/ApexParDoOperator.java | 3 +- .../core/PushbackSideInputDoFnRunner.java | 106 +------ .../core/SimplePushbackSideInputDoFnRunner.java | 115 ++++++++ .../core/PushbackSideInputDoFnRunnerTest.java | 282 ------------------- .../SimplePushbackSideInputDoFnRunnerTest.java | 282 +++++++++++++++++++ .../beam/runners/direct/ParDoEvaluator.java | 3 +- .../wrappers/streaming/DoFnOperator.java | 12 +- .../streaming/SplittableDoFnOperator.java | 2 +- .../wrappers/streaming/WindowDoFnOperator.java | 2 +- 9 files changed, 424 insertions(+), 383 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/7e1a2675/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java index bad5be2..52d1d43 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java @@ -48,6 +48,7 @@ import org.apache.beam.runners.core.InMemoryTimerInternals; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; import org.apache.beam.runners.core.SideInputHandler; +import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StatefulDoFnRunner; @@ -368,7 +369,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements } pushbackDoFnRunner = - PushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler); + SimplePushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler); } http://git-wip-us.apache.org/repos/asf/beam/blob/7e1a2675/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java index 4ad20b5..bab1dc7 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java @@ -17,113 +17,35 @@ */ package org.apache.beam.runners.core; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollectionView; import org.joda.time.Instant; /** - * A {@link DoFnRunner} that can refuse to process elements that are not ready, instead returning - * them via the {@link #processElementInReadyWindows(WindowedValue)}. + * Interface for runners of {@link DoFn}'s that support pushback when reading side inputs, + * i.e. return elements that could not be processed because they require reading a side input + * window that is not ready. */ -public class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> { - private final DoFnRunner<InputT, OutputT> underlying; - private final Collection<PCollectionView<?>> views; - private final ReadyCheckingSideInputReader sideInputReader; - - private Set<BoundedWindow> notReadyWindows; - - public static <InputT, OutputT> PushbackSideInputDoFnRunner<InputT, OutputT> create( - DoFnRunner<InputT, OutputT> underlying, - Collection<PCollectionView<?>> views, - ReadyCheckingSideInputReader sideInputReader) { - return new PushbackSideInputDoFnRunner<>(underlying, views, sideInputReader); - } - - private PushbackSideInputDoFnRunner( - DoFnRunner<InputT, OutputT> underlying, - Collection<PCollectionView<?>> views, - ReadyCheckingSideInputReader sideInputReader) { - this.underlying = underlying; - this.views = views; - this.sideInputReader = sideInputReader; - } - - @Override - public void startBundle() { - notReadyWindows = new HashSet<>(); - underlying.startBundle(); - } +public interface PushbackSideInputDoFnRunner<InputT, OutputT> { + /** Calls the underlying {@link DoFn.StartBundle} method. */ + void startBundle(); /** - * Call the underlying {@link DoFnRunner#processElement(WindowedValue)} for the provided element + * Call the underlying {@link DoFn.ProcessElement} method for the provided element * for each window the element is in that is ready. * * @param elem the element to process in all ready windows * @return each element that could not be processed because it requires a side input window * that is not ready. */ - public Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) { - if (views.isEmpty()) { - // When there are no side inputs, we can preserve the compressed representation. - processElement(elem); - return Collections.emptyList(); - } - ImmutableList.Builder<WindowedValue<InputT>> pushedBack = ImmutableList.builder(); - for (WindowedValue<InputT> windowElem : elem.explodeWindows()) { - BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows()); - if (isReady(mainInputWindow)) { - // When there are any side inputs, we have to process the element in each window - // individually, to disambiguate access to per-window side inputs. - processElement(windowElem); - } else { - notReadyWindows.add(mainInputWindow); - pushedBack.add(windowElem); - } - } - return pushedBack.build(); - } - - private boolean isReady(BoundedWindow mainInputWindow) { - if (notReadyWindows.contains(mainInputWindow)) { - return false; - } - for (PCollectionView<?> view : views) { - BoundedWindow sideInputWindow = - view.getWindowMappingFn().getSideInputWindow(mainInputWindow); - if (!sideInputReader.isReady(view, sideInputWindow)) { - return false; - } - } - return true; - } + Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem); - @Override - public void processElement(WindowedValue<InputT> elem) { - underlying.processElement(elem); - } + /** Calls the underlying {@link DoFn.OnTimer} method. */ + void onTimer(String timerId, BoundedWindow window, Instant timestamp, + TimeDomain timeDomain); - @Override - public void onTimer(String timerId, BoundedWindow window, Instant timestamp, - TimeDomain timeDomain) { - underlying.onTimer(timerId, window, timestamp, timeDomain); - } - - /** - * Call the underlying {@link DoFnRunner#finishBundle()}. - */ - @Override - public void finishBundle() { - notReadyWindows = null; - underlying.finishBundle(); - } + /** Calls the underlying {@link DoFn.FinishBundle} method. */ + void finishBundle(); } - http://git-wip-us.apache.org/repos/asf/beam/blob/7e1a2675/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java new file mode 100644 index 0000000..50d301b --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollectionView; +import org.joda.time.Instant; + +/** + * A {@link DoFnRunner} that can refuse to process elements that are not ready, instead returning + * them via the {@link #processElementInReadyWindows(WindowedValue)}. + */ +public class SimplePushbackSideInputDoFnRunner<InputT, OutputT> + implements PushbackSideInputDoFnRunner<InputT, OutputT> { + private final DoFnRunner<InputT, OutputT> underlying; + private final Collection<PCollectionView<?>> views; + private final ReadyCheckingSideInputReader sideInputReader; + + private Set<BoundedWindow> notReadyWindows; + + public static <InputT, OutputT> SimplePushbackSideInputDoFnRunner<InputT, OutputT> create( + DoFnRunner<InputT, OutputT> underlying, + Collection<PCollectionView<?>> views, + ReadyCheckingSideInputReader sideInputReader) { + return new SimplePushbackSideInputDoFnRunner<>(underlying, views, sideInputReader); + } + + private SimplePushbackSideInputDoFnRunner( + DoFnRunner<InputT, OutputT> underlying, + Collection<PCollectionView<?>> views, + ReadyCheckingSideInputReader sideInputReader) { + this.underlying = underlying; + this.views = views; + this.sideInputReader = sideInputReader; + } + + @Override + public void startBundle() { + notReadyWindows = new HashSet<>(); + underlying.startBundle(); + } + + @Override + public Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) { + if (views.isEmpty()) { + // When there are no side inputs, we can preserve the compressed representation. + underlying.processElement(elem); + return Collections.emptyList(); + } + ImmutableList.Builder<WindowedValue<InputT>> pushedBack = ImmutableList.builder(); + for (WindowedValue<InputT> windowElem : elem.explodeWindows()) { + BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows()); + if (isReady(mainInputWindow)) { + // When there are any side inputs, we have to process the element in each window + // individually, to disambiguate access to per-window side inputs. + underlying.processElement(windowElem); + } else { + notReadyWindows.add(mainInputWindow); + pushedBack.add(windowElem); + } + } + return pushedBack.build(); + } + + private boolean isReady(BoundedWindow mainInputWindow) { + if (notReadyWindows.contains(mainInputWindow)) { + return false; + } + for (PCollectionView<?> view : views) { + BoundedWindow sideInputWindow = + view.getWindowMappingFn().getSideInputWindow(mainInputWindow); + if (!sideInputReader.isReady(view, sideInputWindow)) { + return false; + } + } + return true; + } + + @Override + public void onTimer(String timerId, BoundedWindow window, Instant timestamp, + TimeDomain timeDomain) { + underlying.onTimer(timerId, window, timestamp, timeDomain); + } + + @Override + public void finishBundle() { + notReadyWindows = null; + underlying.finishBundle(); + } +} + http://git-wip-us.apache.org/repos/asf/beam/blob/7e1a2675/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java deleted file mode 100644 index cb057b8..0000000 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java +++ /dev/null @@ -1,282 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.emptyIterable; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.when; - -import com.google.common.collect.ImmutableList; -import java.util.ArrayList; -import java.util.List; -import org.apache.beam.runners.core.TimerInternals.TimerData; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.util.IdentitySideInputWindowFn; -import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; -import org.hamcrest.Matchers; -import org.joda.time.Instant; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; - -/** - * Tests for {@link PushbackSideInputDoFnRunner}. - */ -@RunWith(JUnit4.class) -public class PushbackSideInputDoFnRunnerTest { - @Mock private ReadyCheckingSideInputReader reader; - private TestDoFnRunner<Integer, Integer> underlying; - private PCollectionView<Integer> singletonView; - - @Rule - public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); - - @Before - public void setup() { - MockitoAnnotations.initMocks(this); - PCollection<Integer> created = p.apply(Create.of(1, 2, 3)); - singletonView = - created - .apply(Window.into(new IdentitySideInputWindowFn())) - .apply(Sum.integersGlobally().asSingletonView()); - - underlying = new TestDoFnRunner<>(); - } - - private PushbackSideInputDoFnRunner<Integer, Integer> createRunner( - ImmutableList<PCollectionView<?>> views) { - PushbackSideInputDoFnRunner<Integer, Integer> runner = - PushbackSideInputDoFnRunner.create(underlying, views, reader); - runner.startBundle(); - return runner; - } - - @Test - public void startFinishBundleDelegates() { - PushbackSideInputDoFnRunner runner = - createRunner(ImmutableList.<PCollectionView<?>>of(singletonView)); - - assertThat(underlying.started, is(true)); - assertThat(underlying.finished, is(false)); - runner.finishBundle(); - assertThat(underlying.finished, is(true)); - } - - @Test - public void processElementSideInputNotReady() { - when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class))) - .thenReturn(false); - - PushbackSideInputDoFnRunner<Integer, Integer> runner = - createRunner(ImmutableList.<PCollectionView<?>>of(singletonView)); - - WindowedValue<Integer> oneWindow = - WindowedValue.of( - 2, - new Instant(-2), - new IntervalWindow(new Instant(-500L), new Instant(0L)), - PaneInfo.ON_TIME_AND_ONLY_FIRING); - Iterable<WindowedValue<Integer>> oneWindowPushback = - runner.processElementInReadyWindows(oneWindow); - assertThat(oneWindowPushback, containsInAnyOrder(oneWindow)); - assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable()); - } - - @Test - public void processElementSideInputNotReadyMultipleWindows() { - when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class))) - .thenReturn(false); - - PushbackSideInputDoFnRunner<Integer, Integer> runner = - createRunner(ImmutableList.<PCollectionView<?>>of(singletonView)); - - WindowedValue<Integer> multiWindow = - WindowedValue.of( - 2, - new Instant(-2), - ImmutableList.of( - new IntervalWindow(new Instant(-500L), new Instant(0L)), - new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)), - GlobalWindow.INSTANCE), - PaneInfo.ON_TIME_AND_ONLY_FIRING); - Iterable<WindowedValue<Integer>> multiWindowPushback = - runner.processElementInReadyWindows(multiWindow); - assertThat(multiWindowPushback, equalTo(multiWindow.explodeWindows())); - assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable()); - } - - @Test - public void processElementSideInputNotReadySomeWindows() { - when(reader.isReady(Mockito.eq(singletonView), Mockito.eq(GlobalWindow.INSTANCE))) - .thenReturn(false); - when( - reader.isReady( - Mockito.eq(singletonView), - org.mockito.AdditionalMatchers.not(Mockito.eq(GlobalWindow.INSTANCE)))) - .thenReturn(true); - - PushbackSideInputDoFnRunner<Integer, Integer> runner = - createRunner(ImmutableList.<PCollectionView<?>>of(singletonView)); - - IntervalWindow littleWindow = new IntervalWindow(new Instant(-500L), new Instant(0L)); - IntervalWindow bigWindow = - new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)); - WindowedValue<Integer> multiWindow = - WindowedValue.of( - 2, - new Instant(-2), - ImmutableList.of(littleWindow, bigWindow, GlobalWindow.INSTANCE), - PaneInfo.NO_FIRING); - Iterable<WindowedValue<Integer>> multiWindowPushback = - runner.processElementInReadyWindows(multiWindow); - assertThat( - multiWindowPushback, - containsInAnyOrder(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(-2L)))); - assertThat( - underlying.inputElems, - containsInAnyOrder( - WindowedValue.of( - 2, new Instant(-2), ImmutableList.of(littleWindow), PaneInfo.NO_FIRING), - WindowedValue.of(2, new Instant(-2), ImmutableList.of(bigWindow), PaneInfo.NO_FIRING))); - } - - @Test - public void processElementSideInputReadyAllWindows() { - when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class))) - .thenReturn(true); - - ImmutableList<PCollectionView<?>> views = ImmutableList.<PCollectionView<?>>of(singletonView); - PushbackSideInputDoFnRunner<Integer, Integer> runner = createRunner(views); - - WindowedValue<Integer> multiWindow = - WindowedValue.of( - 2, - new Instant(-2), - ImmutableList.of( - new IntervalWindow(new Instant(-500L), new Instant(0L)), - new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)), - GlobalWindow.INSTANCE), - PaneInfo.ON_TIME_AND_ONLY_FIRING); - Iterable<WindowedValue<Integer>> multiWindowPushback = - runner.processElementInReadyWindows(multiWindow); - assertThat(multiWindowPushback, emptyIterable()); - assertThat( - underlying.inputElems, - containsInAnyOrder(ImmutableList.copyOf(multiWindow.explodeWindows()).toArray())); - } - - @Test - public void processElementNoSideInputs() { - PushbackSideInputDoFnRunner<Integer, Integer> runner = - createRunner(ImmutableList.<PCollectionView<?>>of()); - - WindowedValue<Integer> multiWindow = - WindowedValue.of( - 2, - new Instant(-2), - ImmutableList.of( - new IntervalWindow(new Instant(-500L), new Instant(0L)), - new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)), - GlobalWindow.INSTANCE), - PaneInfo.ON_TIME_AND_ONLY_FIRING); - Iterable<WindowedValue<Integer>> multiWindowPushback = - runner.processElementInReadyWindows(multiWindow); - assertThat(multiWindowPushback, emptyIterable()); - // Should preserve the compressed representation when there's no side inputs. - assertThat(underlying.inputElems, containsInAnyOrder(multiWindow)); - } - - /** Tests that a call to onTimer gets delegated. */ - @Test - public void testOnTimerCalled() { - PushbackSideInputDoFnRunner<Integer, Integer> runner = - createRunner(ImmutableList.<PCollectionView<?>>of()); - - String timerId = "fooTimer"; - IntervalWindow window = new IntervalWindow(new Instant(4), new Instant(16)); - Instant timestamp = new Instant(72); - - // Mocking is not easily compatible with annotation analysis, so we manually record - // the method call. - runner.onTimer(timerId, window, new Instant(timestamp), TimeDomain.EVENT_TIME); - - assertThat( - underlying.firedTimers, - contains( - TimerData.of( - timerId, - StateNamespaces.window(IntervalWindow.getCoder(), window), - timestamp, - TimeDomain.EVENT_TIME))); - } - - private static class TestDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> { - List<WindowedValue<InputT>> inputElems; - List<TimerData> firedTimers; - private boolean started = false; - private boolean finished = false; - - @Override - public void startBundle() { - started = true; - inputElems = new ArrayList<>(); - firedTimers = new ArrayList<>(); - } - - @Override - public void processElement(WindowedValue<InputT> elem) { - inputElems.add(elem); - } - - @Override - public void onTimer(String timerId, BoundedWindow window, Instant timestamp, - TimeDomain timeDomain) { - firedTimers.add( - TimerData.of( - timerId, - StateNamespaces.window(IntervalWindow.getCoder(), (IntervalWindow) window), - timestamp, - timeDomain)); - } - - @Override - public void finishBundle() { - finished = true; - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/7e1a2675/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java new file mode 100644 index 0000000..ba3f926 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.emptyIterable; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.runners.core.TimerInternals.TimerData; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.IdentitySideInputWindowFn; +import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.hamcrest.Matchers; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +/** + * Tests for {@link SimplePushbackSideInputDoFnRunner}. + */ +@RunWith(JUnit4.class) +public class SimplePushbackSideInputDoFnRunnerTest { + @Mock private ReadyCheckingSideInputReader reader; + private TestDoFnRunner<Integer, Integer> underlying; + private PCollectionView<Integer> singletonView; + + @Rule + public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + PCollection<Integer> created = p.apply(Create.of(1, 2, 3)); + singletonView = + created + .apply(Window.into(new IdentitySideInputWindowFn())) + .apply(Sum.integersGlobally().asSingletonView()); + + underlying = new TestDoFnRunner<>(); + } + + private SimplePushbackSideInputDoFnRunner<Integer, Integer> createRunner( + ImmutableList<PCollectionView<?>> views) { + SimplePushbackSideInputDoFnRunner<Integer, Integer> runner = + SimplePushbackSideInputDoFnRunner.create(underlying, views, reader); + runner.startBundle(); + return runner; + } + + @Test + public void startFinishBundleDelegates() { + PushbackSideInputDoFnRunner runner = + createRunner(ImmutableList.<PCollectionView<?>>of(singletonView)); + + assertThat(underlying.started, is(true)); + assertThat(underlying.finished, is(false)); + runner.finishBundle(); + assertThat(underlying.finished, is(true)); + } + + @Test + public void processElementSideInputNotReady() { + when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class))) + .thenReturn(false); + + SimplePushbackSideInputDoFnRunner<Integer, Integer> runner = + createRunner(ImmutableList.<PCollectionView<?>>of(singletonView)); + + WindowedValue<Integer> oneWindow = + WindowedValue.of( + 2, + new Instant(-2), + new IntervalWindow(new Instant(-500L), new Instant(0L)), + PaneInfo.ON_TIME_AND_ONLY_FIRING); + Iterable<WindowedValue<Integer>> oneWindowPushback = + runner.processElementInReadyWindows(oneWindow); + assertThat(oneWindowPushback, containsInAnyOrder(oneWindow)); + assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable()); + } + + @Test + public void processElementSideInputNotReadyMultipleWindows() { + when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class))) + .thenReturn(false); + + SimplePushbackSideInputDoFnRunner<Integer, Integer> runner = + createRunner(ImmutableList.<PCollectionView<?>>of(singletonView)); + + WindowedValue<Integer> multiWindow = + WindowedValue.of( + 2, + new Instant(-2), + ImmutableList.of( + new IntervalWindow(new Instant(-500L), new Instant(0L)), + new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)), + GlobalWindow.INSTANCE), + PaneInfo.ON_TIME_AND_ONLY_FIRING); + Iterable<WindowedValue<Integer>> multiWindowPushback = + runner.processElementInReadyWindows(multiWindow); + assertThat(multiWindowPushback, equalTo(multiWindow.explodeWindows())); + assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable()); + } + + @Test + public void processElementSideInputNotReadySomeWindows() { + when(reader.isReady(Mockito.eq(singletonView), Mockito.eq(GlobalWindow.INSTANCE))) + .thenReturn(false); + when( + reader.isReady( + Mockito.eq(singletonView), + org.mockito.AdditionalMatchers.not(Mockito.eq(GlobalWindow.INSTANCE)))) + .thenReturn(true); + + SimplePushbackSideInputDoFnRunner<Integer, Integer> runner = + createRunner(ImmutableList.<PCollectionView<?>>of(singletonView)); + + IntervalWindow littleWindow = new IntervalWindow(new Instant(-500L), new Instant(0L)); + IntervalWindow bigWindow = + new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)); + WindowedValue<Integer> multiWindow = + WindowedValue.of( + 2, + new Instant(-2), + ImmutableList.of(littleWindow, bigWindow, GlobalWindow.INSTANCE), + PaneInfo.NO_FIRING); + Iterable<WindowedValue<Integer>> multiWindowPushback = + runner.processElementInReadyWindows(multiWindow); + assertThat( + multiWindowPushback, + containsInAnyOrder(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(-2L)))); + assertThat( + underlying.inputElems, + containsInAnyOrder( + WindowedValue.of( + 2, new Instant(-2), ImmutableList.of(littleWindow), PaneInfo.NO_FIRING), + WindowedValue.of(2, new Instant(-2), ImmutableList.of(bigWindow), PaneInfo.NO_FIRING))); + } + + @Test + public void processElementSideInputReadyAllWindows() { + when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class))) + .thenReturn(true); + + ImmutableList<PCollectionView<?>> views = ImmutableList.<PCollectionView<?>>of(singletonView); + SimplePushbackSideInputDoFnRunner<Integer, Integer> runner = createRunner(views); + + WindowedValue<Integer> multiWindow = + WindowedValue.of( + 2, + new Instant(-2), + ImmutableList.of( + new IntervalWindow(new Instant(-500L), new Instant(0L)), + new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)), + GlobalWindow.INSTANCE), + PaneInfo.ON_TIME_AND_ONLY_FIRING); + Iterable<WindowedValue<Integer>> multiWindowPushback = + runner.processElementInReadyWindows(multiWindow); + assertThat(multiWindowPushback, emptyIterable()); + assertThat( + underlying.inputElems, + containsInAnyOrder(ImmutableList.copyOf(multiWindow.explodeWindows()).toArray())); + } + + @Test + public void processElementNoSideInputs() { + SimplePushbackSideInputDoFnRunner<Integer, Integer> runner = + createRunner(ImmutableList.<PCollectionView<?>>of()); + + WindowedValue<Integer> multiWindow = + WindowedValue.of( + 2, + new Instant(-2), + ImmutableList.of( + new IntervalWindow(new Instant(-500L), new Instant(0L)), + new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)), + GlobalWindow.INSTANCE), + PaneInfo.ON_TIME_AND_ONLY_FIRING); + Iterable<WindowedValue<Integer>> multiWindowPushback = + runner.processElementInReadyWindows(multiWindow); + assertThat(multiWindowPushback, emptyIterable()); + // Should preserve the compressed representation when there's no side inputs. + assertThat(underlying.inputElems, containsInAnyOrder(multiWindow)); + } + + /** Tests that a call to onTimer gets delegated. */ + @Test + public void testOnTimerCalled() { + PushbackSideInputDoFnRunner<Integer, Integer> runner = + createRunner(ImmutableList.<PCollectionView<?>>of()); + + String timerId = "fooTimer"; + IntervalWindow window = new IntervalWindow(new Instant(4), new Instant(16)); + Instant timestamp = new Instant(72); + + // Mocking is not easily compatible with annotation analysis, so we manually record + // the method call. + runner.onTimer(timerId, window, new Instant(timestamp), TimeDomain.EVENT_TIME); + + assertThat( + underlying.firedTimers, + contains( + TimerData.of( + timerId, + StateNamespaces.window(IntervalWindow.getCoder(), window), + timestamp, + TimeDomain.EVENT_TIME))); + } + + private static class TestDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> { + List<WindowedValue<InputT>> inputElems; + List<TimerData> firedTimers; + private boolean started = false; + private boolean finished = false; + + @Override + public void startBundle() { + started = true; + inputElems = new ArrayList<>(); + firedTimers = new ArrayList<>(); + } + + @Override + public void processElement(WindowedValue<InputT> elem) { + inputElems.add(elem); + } + + @Override + public void onTimer(String timerId, BoundedWindow window, Instant timestamp, + TimeDomain timeDomain) { + firedTimers.add( + TimerData.of( + timerId, + StateNamespaces.window(IntervalWindow.getCoder(), (IntervalWindow) window), + timestamp, + timeDomain)); + } + + @Override + public void finishBundle() { + finished = true; + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/7e1a2675/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java index 131716f..bab7b2c 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java @@ -26,6 +26,7 @@ import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; +import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; @@ -85,7 +86,7 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> { aggregatorChanges, windowingStrategy); PushbackSideInputDoFnRunner<InputT, OutputT> runner = - PushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader); + SimplePushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader); try { runner.startBundle(); http://git-wip-us.apache.org/repos/asf/beam/blob/7e1a2675/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 5496f71..8a09286 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -37,6 +37,7 @@ import org.apache.beam.runners.core.ExecutionContext; import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; import org.apache.beam.runners.core.SideInputHandler; +import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaces; @@ -119,6 +120,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> protected final OutputManagerFactory<OutputT> outputManagerFactory; + protected transient DoFnRunner<InputT, FnOutputT> doFnRunner; protected transient PushbackSideInputDoFnRunner<InputT, FnOutputT> pushbackDoFnRunner; protected transient SideInputHandler sideInputHandler; @@ -269,7 +271,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> ExecutionContext.StepContext stepContext = createStepContext(); - DoFnRunner<InputT, FnOutputT> doFnRunner = DoFnRunners.simpleRunner( + doFnRunner = DoFnRunners.simpleRunner( serializedOptions.getPipelineOptions(), doFn, sideInputReader, @@ -320,7 +322,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> } pushbackDoFnRunner = - PushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler); + SimplePushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler); } @Override @@ -362,9 +364,9 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> @Override public final void processElement( StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception { - pushbackDoFnRunner.startBundle(); - pushbackDoFnRunner.processElement(streamRecord.getValue()); - pushbackDoFnRunner.finishBundle(); + doFnRunner.startBundle(); + doFnRunner.processElement(streamRecord.getValue()); + doFnRunner.finishBundle(); } private void setPushedBackWatermark(long watermark) { http://git-wip-us.apache.org/repos/asf/beam/blob/7e1a2675/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java index 1a636c9..40f70e4 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java @@ -142,7 +142,7 @@ public class SplittableDoFnOperator< @Override public void fireTimer(InternalTimer<?, TimerInternals.TimerData> timer) { - pushbackDoFnRunner.processElement(WindowedValue.valueInGlobalWindow( + doFnRunner.processElement(WindowedValue.valueInGlobalWindow( KeyedWorkItems.<String, ElementAndRestriction<InputT, RestrictionT>>timersWorkItem( (String) stateInternals.getKey(), Collections.singletonList(timer.getNamespace())))); http://git-wip-us.apache.org/repos/asf/beam/blob/7e1a2675/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java index 7b899f4..9b2136c 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java @@ -108,7 +108,7 @@ public class WindowDoFnOperator<K, InputT, OutputT> @Override public void fireTimer(InternalTimer<?, TimerData> timer) { - pushbackDoFnRunner.processElement(WindowedValue.valueInGlobalWindow( + doFnRunner.processElement(WindowedValue.valueInGlobalWindow( KeyedWorkItems.<K, InputT>timersWorkItem( (K) stateInternals.getKey(), Collections.singletonList(timer.getNamespace()))));
