Repository: incubator-beam
Updated Branches:
  refs/heads/master 7c917a6ee -> 487052588
Use PushbackDoFnRunner in the ParDoInProcessEvaluator This ensures that the evaluator does not block while processing an input bundle. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dd4ef6ff Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dd4ef6ff Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dd4ef6ff Branch: refs/heads/master Commit: dd4ef6ffc67d4776d115bd6a77483c6f2fd66ae5 Parents: d3b96bc Author: Thomas Groh <[email protected]> Authored: Wed Apr 27 17:27:57 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Tue May 10 10:15:14 2016 -0700 ---------------------------------------------------------------------- .../runners/direct/ParDoInProcessEvaluator.java | 24 ++- .../direct/ParDoInProcessEvaluatorTest.java | 214 +++++++++++++++++++ .../sdk/util/PushbackSideInputDoFnRunner.java | 2 +- .../sdk/util/IdentitySideInputWindowFn.java | 2 +- 4 files changed, 235 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd4ef6ff/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java index 1c51738..2cdf6cb 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java @@ -25,6 +25,8 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.util.DoFnRunner; import org.apache.beam.sdk.util.DoFnRunners; import org.apache.beam.sdk.util.DoFnRunners.OutputManager; +import 
org.apache.beam.sdk.util.PushbackSideInputDoFnRunner; +import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; @@ -34,6 +36,8 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; +import com.google.common.collect.ImmutableList; + import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -65,17 +69,21 @@ class ParDoInProcessEvaluator<T> implements TransformEvaluator<T> { evaluationContext.createBundle(inputBundle, outputEntry.getValue())); } - DoFnRunner<InputT, OutputT> runner = + ReadyCheckingSideInputReader sideInputReader = + evaluationContext.createSideInputReader(sideInputs); + DoFnRunner<InputT, OutputT> underlying = DoFnRunners.createDefault( evaluationContext.getPipelineOptions(), SerializableUtils.clone(fn), - evaluationContext.createSideInputReader(sideInputs), + sideInputReader, BundleOutputManager.create(outputBundles), mainOutputTag, sideOutputTags, stepContext, counters.getAddCounterMutator(), application.getInput().getWindowingStrategy()); + PushbackSideInputDoFnRunner<InputT, OutputT> runner = + PushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader); try { runner.startBundle(); @@ -89,14 +97,16 @@ class ParDoInProcessEvaluator<T> implements TransformEvaluator<T> { //////////////////////////////////////////////////////////////////////////////////////////////// - private final DoFnRunner<T, ?> fnRunner; + private final PushbackSideInputDoFnRunner<T, ?> fnRunner; private final AppliedPTransform<PCollection<T>, ?, ?> transform; private final CounterSet counters; private final Collection<UncommittedBundle<?>> outputBundles; private final InProcessStepContext stepContext; + private final ImmutableList.Builder<WindowedValue<T>> unprocessedElements; + private 
ParDoInProcessEvaluator( - DoFnRunner<T, ?> fnRunner, + PushbackSideInputDoFnRunner<T, ?> fnRunner, AppliedPTransform<PCollection<T>, ?, ?> transform, CounterSet counters, Collection<UncommittedBundle<?>> outputBundles, @@ -106,12 +116,15 @@ class ParDoInProcessEvaluator<T> implements TransformEvaluator<T> { this.counters = counters; this.outputBundles = outputBundles; this.stepContext = stepContext; + + this.unprocessedElements = ImmutableList.builder(); } @Override public void processElement(WindowedValue<T> element) { try { - fnRunner.processElement(element); + Iterable<WindowedValue<T>> unprocessed = fnRunner.processElementInReadyWindows(element); + unprocessedElements.addAll(unprocessed); } catch (Exception e) { throw UserCodeException.wrap(e); } @@ -137,6 +150,7 @@ class ParDoInProcessEvaluator<T> implements TransformEvaluator<T> { .addOutput(outputBundles) .withTimerUpdate(stepContext.getTimerUpdate()) .withCounters(counters) + .addUnprocessedElements(unprocessedElements.build()) .build(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd4ef6ff/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java new file mode 100644 index 0000000..ca15d9c --- /dev/null +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate; +import org.apache.beam.runners.direct.InProcessExecutionContext.InProcessStepContext; +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.IdentitySideInputWindowFn; +import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; +import org.apache.beam.sdk.util.WindowedValue; +import 
org.apache.beam.sdk.util.common.CounterSet; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; + +import org.hamcrest.Matchers; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import javax.annotation.Nullable; + +/** + * Tests for {@link ParDoInProcessEvaluator}. + */ +@RunWith(JUnit4.class) +public class ParDoInProcessEvaluatorTest { + @Mock private InProcessEvaluationContext evaluationContext; + private PCollection<Integer> inputPc; + private TupleTag<Integer> mainOutputTag; + private List<TupleTag<?>> sideOutputTags; + private BundleFactory bundleFactory; + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + TestPipeline p = TestPipeline.create(); + inputPc = p.apply(Create.of(1, 2, 3)); + mainOutputTag = new TupleTag<Integer>() {}; + sideOutputTags = TupleTagList.empty().getAll(); + + bundleFactory = InProcessBundleFactory.create(); + } + + @Test + public void sideInputsNotReadyResultHasUnprocessedElements() { + PCollectionView<Integer> singletonView = + inputPc + .apply(Window.into(new IdentitySideInputWindowFn())) + .apply(View.<Integer>asSingleton().withDefaultValue(0)); + RecorderFn fn = new RecorderFn(singletonView); + PCollection<Integer> output = inputPc.apply(ParDo.of(fn).withSideInputs(singletonView)); + + CommittedBundle<Integer> inputBundle = + bundleFactory.createRootBundle(inputPc).commit(Instant.now()); + UncommittedBundle<Integer> outputBundle = 
bundleFactory.createBundle(inputBundle, output); + when(evaluationContext.createBundle(inputBundle, output)) + .thenReturn(outputBundle); + + ParDoInProcessEvaluator<Integer> evaluator = + createEvaluator(singletonView, fn, inputBundle, output); + + IntervalWindow nonGlobalWindow = new IntervalWindow(new Instant(0), new Instant(10_000L)); + WindowedValue<Integer> first = WindowedValue.valueInGlobalWindow(3); + WindowedValue<Integer> second = + WindowedValue.of(2, new Instant(1234L), nonGlobalWindow, PaneInfo.NO_FIRING); + WindowedValue<Integer> third = + WindowedValue.of( + 1, + new Instant(2468L), + ImmutableList.of(nonGlobalWindow, GlobalWindow.INSTANCE), + PaneInfo.NO_FIRING); + + evaluator.processElement(first); + evaluator.processElement(second); + evaluator.processElement(third); + InProcessTransformResult result = evaluator.finishBundle(); + + assertThat( + result.getUnprocessedElements(), + Matchers.<WindowedValue<?>>containsInAnyOrder( + second, WindowedValue.of(1, new Instant(2468L), nonGlobalWindow, PaneInfo.NO_FIRING))); + assertThat(result.getOutputBundles(), Matchers.<UncommittedBundle<?>>contains(outputBundle)); + assertThat(RecorderFn.processed, containsInAnyOrder(1, 3)); + assertThat( + Iterables.getOnlyElement(result.getOutputBundles()).commit(Instant.now()).getElements(), + Matchers.<WindowedValue<?>>containsInAnyOrder( + first.withValue(8), + WindowedValue.timestampedValueInGlobalWindow(6, new Instant(2468L)))); + } + + private ParDoInProcessEvaluator<Integer> createEvaluator( + PCollectionView<Integer> singletonView, + RecorderFn fn, + InProcessPipelineRunner.CommittedBundle<Integer> inputBundle, + PCollection<Integer> output) { + when( + evaluationContext.createSideInputReader( + ImmutableList.<PCollectionView<?>>of(singletonView))) + .thenReturn(new ReadyInGlobalWindowReader()); + InProcessExecutionContext executionContext = mock(InProcessExecutionContext.class); + InProcessStepContext stepContext = mock(InProcessStepContext.class); + when( + 
executionContext.getOrCreateStepContext( + Mockito.any(String.class), Mockito.any(String.class))) + .thenReturn(stepContext); + when(stepContext.getTimerUpdate()).thenReturn(TimerUpdate.empty()); + when( + evaluationContext.getExecutionContext( + Mockito.any(AppliedPTransform.class), Mockito.any(Object.class))) + .thenReturn(executionContext); + when(evaluationContext.createCounterSet()).thenReturn(new CounterSet()); + + return ParDoInProcessEvaluator.create( + evaluationContext, + inputBundle, + (AppliedPTransform<PCollection<Integer>, ?, ?>) output.getProducingTransformInternal(), + fn, + ImmutableList.<PCollectionView<?>>of(singletonView), + mainOutputTag, + sideOutputTags, + ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, output)); + } + + private static class RecorderFn extends DoFn<Integer, Integer> { + private static Collection<Integer> processed; + private final PCollectionView<Integer> view; + + public RecorderFn(PCollectionView<Integer> view) { + processed = new ArrayList<>(); + this.view = view; + } + + @Override + public void processElement(DoFn<Integer, Integer>.ProcessContext c) throws Exception { + processed.add(c.element()); + c.output(c.element() + c.sideInput(view)); + } + } + + private static class ReadyInGlobalWindowReader implements ReadyCheckingSideInputReader { + @Override + @Nullable + public <T> T get(PCollectionView<T> view, BoundedWindow window) { + if (window.equals(GlobalWindow.INSTANCE)) { + return (T) (Integer) 5; + } + fail("Should only call get in the Global Window, others are not ready"); + throw new AssertionError("Unreachable"); + } + + @Override + public <T> boolean contains(PCollectionView<T> view) { + return true; + } + + @Override + public boolean isEmpty() { + return false; + } + + @Override + public boolean isReady(PCollectionView<?> view, BoundedWindow window) { + return window.equals(GlobalWindow.INSTANCE); + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd4ef6ff/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java
index 4eeedf6..b1442dd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java
@@ -32,7 +32,7 @@ import java.util.Set;
  * A {@link DoFnRunner} that can refuse to process elements that are not ready, instead returning
  * them via the {@link #processElementInReadyWindows(WindowedValue)}.
  */
-class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
+public class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
   private final DoFnRunner<InputT, OutputT> underlying;
   private final Collection<PCollectionView<?>> views;
   private final ReadyCheckingSideInputReader sideInputReader;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd4ef6ff/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
index ecab6f8..db6f425 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
@@ -29,7 +29,7 @@ import java.util.Collection;
  * A {@link WindowFn} for use during tests that returns the input window for calls to
  * {@link #getSideInputWindow(BoundedWindow)}.
  */
-class IdentitySideInputWindowFn extends NonMergingWindowFn<Integer, BoundedWindow> {
+public class IdentitySideInputWindowFn extends NonMergingWindowFn<Integer, BoundedWindow> {
   @Override
   public Collection<BoundedWindow> assignWindows(WindowFn<Integer, BoundedWindow>.AssignContext c)
       throws Exception {
