Add InProcess Override for CreatePCollectionView. Because windowing is used to retrieve values from a PCollectionView, the elements must go through a GroupByKey.
Provide a PTransform override for use in the InProcess runner to apply global grouping by window and pane, and a WriteView primitive to store the contents of the view in a PCollectionView. Update the View PTransform to make the view it returns available outside of the application. ----Release Notes---- [] ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=115409320 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/87b28e7d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/87b28e7d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/87b28e7d Branch: refs/heads/master Commit: 87b28e7dd1b2f8ca31eb155cc5bad4f98717664b Parents: 6613031 Author: tgroh <[email protected]> Authored: Tue Feb 23 19:12:37 2016 -0800 Committer: Davor Bonaci <[email protected]> Committed: Thu Feb 25 23:58:27 2016 -0800 ---------------------------------------------------------------------- .../inprocess/InProcessPipelineRunner.java | 5 +- .../runners/inprocess/ViewEvaluatorFactory.java | 69 +++++++++++++++++--- .../cloud/dataflow/sdk/transforms/View.java | 5 ++ .../inprocess/ViewEvaluatorFactoryTest.java | 33 ++++++++-- 4 files changed, 96 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87b28e7d/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java index 72642da..26c5061 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java +++ 
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java @@ -20,10 +20,12 @@ import static com.google.common.base.Preconditions.checkArgument; import com.google.cloud.dataflow.sdk.Pipeline; import com.google.cloud.dataflow.sdk.annotations.Experimental; import com.google.cloud.dataflow.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKey; +import com.google.cloud.dataflow.sdk.runners.inprocess.ViewEvaluatorFactory.InProcessCreatePCollectionView; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; import com.google.cloud.dataflow.sdk.transforms.GroupByKey; import com.google.cloud.dataflow.sdk.transforms.GroupByKey.GroupByKeyOnly; import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.View.CreatePCollectionView; import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger; import com.google.cloud.dataflow.sdk.util.BaseExecutionContext; @@ -60,6 +62,7 @@ public class InProcessPipelineRunner { defaultTransformOverrides = ImmutableMap.<Class<? extends PTransform>, Class<? extends PTransform>>builder() .put(GroupByKey.class, InProcessGroupByKey.class) + .put(CreatePCollectionView.class, InProcessCreatePCollectionView.class) .build(); private static Map<Class<?>, TransformEvaluatorFactory> defaultEvaluatorFactories = @@ -222,7 +225,7 @@ public class InProcessPipelineRunner { * Create a bundle whose elements will be used in a PCollectionView. */ <ElemT, ViewT> PCollectionViewWriter<ElemT, ViewT> createPCollectionViewWriter( - PCollection<ElemT> input, PCollectionView<ViewT> output); + PCollection<Iterable<ElemT>> input, PCollectionView<ViewT> output); /** * Get the options used by this {@link Pipeline}. 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87b28e7d/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java index 654652c..f47cd1d 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactory.java @@ -15,11 +15,16 @@ */ package com.google.cloud.dataflow.sdk.runners.inprocess; +import com.google.cloud.dataflow.sdk.coders.KvCoder; +import com.google.cloud.dataflow.sdk.coders.VoidCoder; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter; import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; +import com.google.cloud.dataflow.sdk.transforms.GroupByKey; import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.Values; import com.google.cloud.dataflow.sdk.transforms.View.CreatePCollectionView; +import com.google.cloud.dataflow.sdk.transforms.WithKeys; import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.cloud.dataflow.sdk.values.PCollectionView; @@ -30,6 +35,12 @@ import java.util.List; /** * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the * {@link CreatePCollectionView} primitive {@link PTransform}. 
+ * + * <p>The {@link ViewEvaluatorFactory} produces {@link TransformEvaluator TransformEvaluators} for + * the {@link WriteView} {@link PTransform}, which is part of the + * {@link InProcessCreatePCollectionView} composite transform. This transform is an override for the + * {@link CreatePCollectionView} transform that applies windowing and triggers before the view is + * written. */ class ViewEvaluatorFactory implements TransformEvaluatorFactory { @SuppressWarnings({"rawtypes", "unchecked"}) @@ -42,19 +53,21 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory { (AppliedPTransform) application, evaluationContext); } - private <InT, OuT> TransformEvaluator<InT> createEvaluator( - final AppliedPTransform<PCollection<InT>, PCollectionView<OuT>, - CreatePCollectionView<InT, OuT>> application, + private <InT, OuT> TransformEvaluator<Iterable<InT>> createEvaluator( + final AppliedPTransform<PCollection<Iterable<InT>>, PCollectionView<OuT>, WriteView<InT, OuT>> + application, InProcessEvaluationContext context) { - PCollection<InT> input = application.getInput(); + PCollection<Iterable<InT>> input = application.getInput(); final PCollectionViewWriter<InT, OuT> writer = context.createPCollectionViewWriter(input, application.getOutput()); - return new TransformEvaluator<InT>() { + return new TransformEvaluator<Iterable<InT>>() { private final List<WindowedValue<InT>> elements = new ArrayList<>(); @Override - public void processElement(WindowedValue<InT> element) { - elements.add(element); + public void processElement(WindowedValue<Iterable<InT>> element) { + for (InT input : element.getValue()) { + elements.add(element.withValue(input)); + } } @Override @@ -64,5 +77,45 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory { } }; } -} + /** + * An in-process override for {@link CreatePCollectionView}. 
+ */ + public static class InProcessCreatePCollectionView<ElemT, ViewT> + extends PTransform<PCollection<ElemT>, PCollectionView<ViewT>> { + private final CreatePCollectionView<ElemT, ViewT> og; + + private InProcessCreatePCollectionView(CreatePCollectionView<ElemT, ViewT> og) { + this.og = og; + } + + @Override + public PCollectionView<ViewT> apply(PCollection<ElemT> input) { + return input.apply(WithKeys.<Void, ElemT>of((Void) null)) + .setCoder(KvCoder.of(VoidCoder.of(), input.getCoder())) + .apply(GroupByKey.<Void, ElemT>create()) + .apply(Values.<Iterable<ElemT>>create()) + .apply(new WriteView<ElemT, ViewT>(og)); + } + } + + /** + * An in-process implementation of the {@link CreatePCollectionView} primitive. + * + * This implementation requires the input {@link PCollection} to be an iterable, which is provided + * to {@link PCollectionView#fromIterableInternal(Iterable)}. + */ + public static final class WriteView<ElemT, ViewT> + extends PTransform<PCollection<Iterable<ElemT>>, PCollectionView<ViewT>> { + private final CreatePCollectionView<ElemT, ViewT> og; + + WriteView(CreatePCollectionView<ElemT, ViewT> og) { + this.og = og; + } + + @Override + public PCollectionView<ViewT> apply(PCollection<Iterable<ElemT>> input) { + return og.getView(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87b28e7d/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/View.java ---------------------------------------------------------------------- diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/View.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/View.java index a41da34..e2c4487 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/View.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/View.java @@ -17,6 +17,7 @@ package com.google.cloud.dataflow.sdk.transforms; import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner; +import 
com.google.cloud.dataflow.sdk.runners.PipelineRunner; import com.google.cloud.dataflow.sdk.util.PCollectionViews; import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.values.KV; @@ -435,6 +436,10 @@ public class View { return new CreatePCollectionView<>(view); } + public PCollectionView<ViewT> getView() { + return view; + } + @Override public PCollectionView<ViewT> apply(PCollection<ElemT> input) { return view; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87b28e7d/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java index c29308f..021709b 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java @@ -21,16 +21,24 @@ import static org.junit.Assert.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.google.cloud.dataflow.sdk.coders.KvCoder; +import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; +import com.google.cloud.dataflow.sdk.coders.VoidCoder; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext; import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter; import com.google.cloud.dataflow.sdk.runners.inprocess.util.InProcessBundle; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.View; 
+import com.google.cloud.dataflow.sdk.transforms.GroupByKey; +import com.google.cloud.dataflow.sdk.transforms.Values; +import com.google.cloud.dataflow.sdk.transforms.View.CreatePCollectionView; +import com.google.cloud.dataflow.sdk.transforms.WithKeys; +import com.google.cloud.dataflow.sdk.util.PCollectionViews; import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.cloud.dataflow.sdk.values.PCollectionView; +import com.google.common.collect.ImmutableList; import org.joda.time.Instant; import org.junit.Test; @@ -45,19 +53,30 @@ public class ViewEvaluatorFactoryTest { @Test public void testInMemoryEvaluator() throws Exception { TestPipeline p = TestPipeline.create(); + PCollection<String> input = p.apply(Create.of("foo", "bar")); - PCollectionView<Iterable<String>> view = input.apply(View.<String>asIterable()); + CreatePCollectionView<String, Iterable<String>> createView = + CreatePCollectionView.of( + PCollectionViews.iterableView(p, input.getWindowingStrategy(), StringUtf8Coder.of())); + PCollection<Iterable<String>> concat = + input.apply(WithKeys.<Void, String>of((Void) null)) + .setCoder(KvCoder.of(VoidCoder.of(), StringUtf8Coder.of())) + .apply(GroupByKey.<Void, String>create()) + .apply(Values.<Iterable<String>>create()); + PCollectionView<Iterable<String>> view = + concat.apply(new ViewEvaluatorFactory.WriteView<>(createView)); InProcessEvaluationContext context = mock(InProcessEvaluationContext.class); TestViewWriter<String, Iterable<String>> viewWriter = new TestViewWriter<>(); - when(context.createPCollectionViewWriter(input, view)).thenReturn(viewWriter); + when(context.createPCollectionViewWriter(concat, view)).thenReturn(viewWriter); CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now()); - TransformEvaluator<String> evaluator = new ViewEvaluatorFactory().forApplication( - view.getProducingTransformInternal(), inputBundle, context); + 
TransformEvaluator<Iterable<String>> evaluator = + new ViewEvaluatorFactory() + .forApplication(view.getProducingTransformInternal(), inputBundle, context); - evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); - evaluator.processElement(WindowedValue.valueInGlobalWindow("bar")); + evaluator.processElement( + WindowedValue.<Iterable<String>>valueInGlobalWindow(ImmutableList.of("foo", "bar"))); assertThat(viewWriter.latest, nullValue()); evaluator.finishBundle();
