Add a Dataflow-specific primitive for creating a view

This allows overrides of CreatePCollectionView to work with the batch override API.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1ce88e1d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1ce88e1d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1ce88e1d Branch: refs/heads/master Commit: 1ce88e1db5c75bedf55bea38786f55818627e29e Parents: fe1d412 Author: Thomas Groh <[email protected]> Authored: Thu Mar 30 18:09:26 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Mon Apr 3 09:18:06 2017 -0700 ---------------------------------------------------------------------- .../runners/dataflow/BatchViewOverrides.java | 8 +- .../runners/dataflow/CreateDataflowView.java | 46 +++++++++++ .../dataflow/DataflowPipelineTranslator.java | 9 +-- .../dataflow/StreamingViewOverrides.java | 3 +- .../DataflowPipelineTranslatorTest.java | 80 +------------------- 5 files changed, 58 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/1ce88e1d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java index af96403..86bfeb6 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java @@ -831,7 +831,7 @@ class BatchViewOverrides { return Pipeline.applyTransform(outputs, Flatten.<IsmRecord<WindowedValue<V>>>pCollections()) - .apply(CreatePCollectionView.<IsmRecord<WindowedValue<V>>, + .apply(CreateDataflowView.<IsmRecord<WindowedValue<V>>, ViewT>of(view)); } @@ -975,7 +975,7 @@ 
class BatchViewOverrides { runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted); return reifiedPerWindowAndSorted.apply( - CreatePCollectionView.<IsmRecord<WindowedValue<FinalT>>, ViewT>of(view)); + CreateDataflowView.<IsmRecord<WindowedValue<FinalT>>, ViewT>of(view)); } @Override @@ -1119,7 +1119,7 @@ class BatchViewOverrides { runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted); return reifiedPerWindowAndSorted.apply( - CreatePCollectionView.<IsmRecord<WindowedValue<T>>, ViewT>of(view)); + CreateDataflowView.<IsmRecord<WindowedValue<T>>, ViewT>of(view)); } PCollection<IsmRecord<WindowedValue<T>>> reifiedPerWindowAndSorted = input @@ -1129,7 +1129,7 @@ class BatchViewOverrides { runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted); return reifiedPerWindowAndSorted.apply( - CreatePCollectionView.<IsmRecord<WindowedValue<T>>, ViewT>of(view)); + CreateDataflowView.<IsmRecord<WindowedValue<T>>, ViewT>of(view)); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/1ce88e1d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java new file mode 100644 index 0000000..e7542cb --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.dataflow; + +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; + +/** A {@link DataflowRunner} marker class for creating a {@link PCollectionView}. */ +public class CreateDataflowView<ElemT, ViewT> + extends PTransform<PCollection<ElemT>, PCollectionView<ViewT>> { + public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> of(PCollectionView<ViewT> view) { + return new CreateDataflowView<>(view); + } + + private final PCollectionView<ViewT> view; + + private CreateDataflowView(PCollectionView<ViewT> view) { + this.view = view; + } + + @Override + public PCollectionView<ViewT> expand(PCollection<ElemT> input) { + return view; + } + + public PCollectionView<ViewT> getView() { + return view; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/1ce88e1d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 6d231b9..9b80756 100644 --- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -79,7 +79,6 @@ import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.transforms.reflect.DoFnSignature; @@ -691,15 +690,15 @@ public class DataflowPipelineTranslator { static { registerTransformTranslator( - View.CreatePCollectionView.class, - new TransformTranslator<View.CreatePCollectionView>() { + CreateDataflowView.class, + new TransformTranslator<CreateDataflowView>() { @Override - public void translate(View.CreatePCollectionView transform, TranslationContext context) { + public void translate(CreateDataflowView transform, TranslationContext context) { translateTyped(transform, context); } private <ElemT, ViewT> void translateTyped( - View.CreatePCollectionView<ElemT, ViewT> transform, TranslationContext context) { + CreateDataflowView<ElemT, ViewT> transform, TranslationContext context) { StepTranslationContext stepContext = context.addStep(transform, "CollectionToSingleton"); PCollection<ElemT> input = context.getInput(transform); http://git-wip-us.apache.org/repos/asf/beam/blob/1ce88e1d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java 
index 5f0cb26..c407517 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java @@ -29,7 +29,6 @@ import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.View.CreatePCollectionView; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; @@ -61,7 +60,7 @@ class StreamingViewOverrides { return input .apply(Combine.globally(new Concatenate<ElemT>()).withoutDefaults()) .apply(ParDo.of(StreamingPCollectionViewWriterFn.create(view, input.getCoder()))) - .apply(View.CreatePCollectionView.<ElemT, ViewT>of(view)); + .apply(CreateDataflowView.<ElemT, ViewT>of(view)); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/1ce88e1d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index eb55566..5016d88 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -135,6 +135,8 @@ public class DataflowPipelineTranslatorTest implements Serializable { p.apply("ReadMyFile", TextIO.Read.from("gs://bucket/object")) .apply("WriteMyFile", 
TextIO.Write.to("gs://bucket/object")); + DataflowRunner runner = DataflowRunner.fromOptions(options); + runner.replaceTransforms(p); return p; } @@ -185,7 +187,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { DataflowPipelineOptions options = buildPipelineOptions(); options.setRunner(DataflowRunner.class); - Pipeline p = buildPipeline(options); + Pipeline p = Pipeline.create(options); p.traverseTopologically(new RecordingPipelineVisitor()); Job job = DataflowPipelineTranslator.fromOptions(options) @@ -769,82 +771,6 @@ public class DataflowPipelineTranslatorTest implements Serializable { Collections.<DataflowPackage>emptyList()); } - @Test - public void testToSingletonTranslation() throws Exception { - // A "change detector" test that makes sure the translation - // of getting a PCollectionView<T> does not change - // in bad ways during refactor - - DataflowPipelineOptions options = buildPipelineOptions(); - options.setExperiments(ImmutableList.of("disable_ism_side_input")); - DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options); - - Pipeline pipeline = Pipeline.create(options); - pipeline.apply(Create.of(1)) - .apply(View.<Integer>asSingleton()); - Job job = - translator - .translate( - pipeline, - DataflowRunner.fromOptions(options), - Collections.<DataflowPackage>emptyList()) - .getJob(); - assertAllStepOutputsHaveUniqueIds(job); - - List<Step> steps = job.getSteps(); - assertEquals(6, steps.size()); - - Step createStep = steps.get(0); - assertEquals("ParallelRead", createStep.getKind()); - - Step addNullKeyStep = steps.get(1); - assertEquals("ParallelDo", addNullKeyStep.getKind()); - - Step groupByKeyStep = steps.get(2); - assertEquals("GroupByKey", groupByKeyStep.getKind()); - - Step combineGroupedValuesStep = steps.get(3); - assertEquals("ParallelDo", combineGroupedValuesStep.getKind()); - - Step dropKeysStep = steps.get(4); - assertEquals("ParallelDo", dropKeysStep.getKind()); - - Step 
collectionToSingletonStep = steps.get(5); - assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind()); - } - - @Test - public void testToIterableTranslation() throws Exception { - // A "change detector" test that makes sure the translation - // of getting a PCollectionView<Iterable<T>> does not change - // in bad ways during refactor - - DataflowPipelineOptions options = buildPipelineOptions(); - options.setExperiments(ImmutableList.of("disable_ism_side_input")); - DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options); - - Pipeline pipeline = Pipeline.create(options); - pipeline.apply(Create.of(1, 2, 3)) - .apply(View.<Integer>asIterable()); - Job job = - translator - .translate( - pipeline, - DataflowRunner.fromOptions(options), - Collections.<DataflowPackage>emptyList()) - .getJob(); - assertAllStepOutputsHaveUniqueIds(job); - - List<Step> steps = job.getSteps(); - assertEquals(2, steps.size()); - - Step createStep = steps.get(0); - assertEquals("ParallelRead", createStep.getKind()); - - Step collectionToSingletonStep = steps.get(1); - assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind()); - } - /** * Smoke test to fail fast if translation of a stateful ParDo * in batch breaks.
