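Handle undeclared side outputs in ParDoInProcessEvaluator. Previously, emitting a value to a side-output tag that had no output bundle caused a NullPointerException; the InProcessPipelineRunner now ignores values sent to undeclared side outputs (they are collected internally and never emitted downstream).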
----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=115489641 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/045e3436 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/045e3436 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/045e3436 Branch: refs/heads/master Commit: 045e3436e6b2ce9593c1f8ebeaa57ea7d229134e Parents: 1cc0211 Author: tgroh <[email protected]> Authored: Wed Feb 24 13:56:45 2016 -0800 Committer: Davor Bonaci <[email protected]> Committed: Thu Feb 25 23:58:27 2016 -0800 ---------------------------------------------------------------------- .../inprocess/ParDoInProcessEvaluator.java | 17 ++++- .../ParDoMultiEvaluatorFactoryTest.java | 74 ++++++++++++++++++++ .../ParDoSingleEvaluatorFactoryTest.java | 42 +++++++++++ 3 files changed, 131 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/045e3436/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java index a2b083b..f0b2ca2 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java @@ -27,7 +27,10 @@ import com.google.cloud.dataflow.sdk.values.TupleTag; import org.joda.time.Instant; +import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; +import java.util.List; import java.util.Map; class ParDoInProcessEvaluator<T> { @@ -61,6 +64,7 @@ class 
ParDoInProcessEvaluator<T> { static class BundleOutputManager implements OutputManager { private final Map<TupleTag<?>, UncommittedBundle<?>> bundles; + private final Map<TupleTag<?>, List<?>> undeclaredOutputs; public static BundleOutputManager create(Map<TupleTag<?>, UncommittedBundle<?>> outputBundles) { return new BundleOutputManager(outputBundles); @@ -68,6 +72,7 @@ class ParDoInProcessEvaluator<T> { private BundleOutputManager(Map<TupleTag<?>, UncommittedBundle<?>> bundles) { this.bundles = bundles; + undeclaredOutputs = new HashMap<>(); } @SuppressWarnings("unchecked") @@ -75,8 +80,16 @@ class ParDoInProcessEvaluator<T> { public <T> void output(TupleTag<T> tag, WindowedValue<T> output) { @SuppressWarnings("rawtypes") UncommittedBundle bundle = bundles.get(tag); - bundle.add(output); + if (bundle == null) { + List undeclaredContents = undeclaredOutputs.get(tag); + if (undeclaredContents == null) { + undeclaredContents = new ArrayList<T>(); + undeclaredOutputs.put(tag, undeclaredContents); + } + undeclaredContents.add(output); + } else { + bundle.add(output); + } } } } - http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/045e3436/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java index 5251a76..c55a9d5 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java @@ -137,5 +137,79 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { WindowedValue.timestampedValueInGlobalWindow(4, new Instant(1000)), WindowedValue.valueInGlobalWindow(5, 
PaneInfo.ON_TIME_AND_ONLY_FIRING))); } + + @Test + public void testParDoMultiUndeclaredSideOutput() throws Exception { + TestPipeline p = TestPipeline.create(); + + PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam")); + + TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {}; + final TupleTag<String> elementTag = new TupleTag<>(); + final TupleTag<Integer> lengthTag = new TupleTag<>(); + + BoundMulti<String, KV<String, Integer>> pardo = + ParDo.of(new DoFn<String, KV<String, Integer>>() { + @Override + public void processElement(ProcessContext c) { + c.output(KV.<String, Integer>of(c.element(), c.element().length())); + c.sideOutput(elementTag, c.element()); + c.sideOutput(lengthTag, c.element().length()); + } + }).withOutputTags(mainOutputTag, TupleTagList.of(elementTag)); + PCollectionTuple outputTuple = input.apply(pardo); + + CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now()); + + PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag); + PCollection<String> elementOutput = outputTuple.get(elementTag); + + InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); + UncommittedBundle<KV<String, Integer>> mainOutputBundle = InProcessBundle.unkeyed(mainOutput); + UncommittedBundle<String> elementOutputBundle = InProcessBundle.unkeyed(elementOutput); + + when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle); + when(evaluationContext.createBundle(inputBundle, elementOutput)) + .thenReturn(elementOutputBundle); + + InProcessExecutionContext executionContext = + new InProcessExecutionContext(); + when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal())) + .thenReturn(executionContext); + CounterSet counters = new CounterSet(); + when(evaluationContext.createCounterSet()).thenReturn(counters); + + 
com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluator<String> evaluator = + new ParDoMultiEvaluatorFactory().forApplication( + mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext); + + evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); + evaluator.processElement( + WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000))); + evaluator.processElement( + WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); + + InProcessTransformResult result = evaluator.finishBundle(); + assertThat( + result.getOutputBundles(), + Matchers.<UncommittedBundle<?>>containsInAnyOrder( + mainOutputBundle, elementOutputBundle)); + assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE)); + assertThat(result.getCounters(), equalTo(counters)); + + assertThat( + mainOutputBundle.commit(Instant.now()).getElements(), + Matchers.<WindowedValue<KV<String, Integer>>>containsInAnyOrder( + WindowedValue.valueInGlobalWindow(KV.of("foo", 3)), + WindowedValue.timestampedValueInGlobalWindow(KV.of("bara", 4), new Instant(1000)), + WindowedValue.valueInGlobalWindow( + KV.of("bazam", 5), PaneInfo.ON_TIME_AND_ONLY_FIRING))); + assertThat( + elementOutputBundle.commit(Instant.now()).getElements(), + Matchers.<WindowedValue<String>>containsInAnyOrder( + WindowedValue.valueInGlobalWindow("foo"), + WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)), + WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING))); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/045e3436/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java 
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java index c2e148b..4fc765c 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java @@ -34,6 +34,7 @@ import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.util.common.CounterSet; import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.TupleTag; import org.hamcrest.Matchers; import org.joda.time.Instant; @@ -92,5 +93,46 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { WindowedValue.timestampedValueInGlobalWindow(4, new Instant(1000)), WindowedValue.valueInGlobalWindow(5, PaneInfo.ON_TIME_AND_ONLY_FIRING))); } + + @Test + public void testSideOutputToUndeclaredSideOutputSucceeds() throws Exception { + TestPipeline p = TestPipeline.create(); + + PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam")); + final TupleTag<Integer> sideOutputTag = new TupleTag<Integer>() {}; + PCollection<Integer> collection = input.apply(ParDo.of(new DoFn<String, Integer>() { + @Override public void processElement(ProcessContext c) { + c.sideOutput(sideOutputTag, c.element().length()); + } + })); + CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now()); + + InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); + UncommittedBundle<Integer> outputBundle = + InProcessBundle.unkeyed(collection); + when(evaluationContext.createBundle(inputBundle, collection)).thenReturn(outputBundle); + InProcessExecutionContext executionContext = + new InProcessExecutionContext(); + when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal())) + 
.thenReturn(executionContext); + CounterSet counters = new CounterSet(); + when(evaluationContext.createCounterSet()).thenReturn(counters); + + TransformEvaluator<String> evaluator = + new ParDoSingleEvaluatorFactory().forApplication( + collection.getProducingTransformInternal(), inputBundle, evaluationContext); + + evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); + evaluator.processElement( + WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000))); + evaluator.processElement( + WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); + + InProcessTransformResult result = evaluator.finishBundle(); + assertThat( + result.getOutputBundles(), Matchers.<UncommittedBundle<?>>containsInAnyOrder(outputBundle)); + assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE)); + assertThat(result.getCounters(), equalTo(counters)); + } }
