Repository: beam Updated Branches: refs/heads/master 7ce0a82b0 -> 30886acee
Fix BatchViewOverrides ViewAsSingleton to apply the combine fn that was being replaced. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/accb2087 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/accb2087 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/accb2087 Branch: refs/heads/master Commit: accb2087f88c641be9db038bbb5be715aacffb8d Parents: c2f815c Author: Luke Cwik <[email protected]> Authored: Tue Nov 14 19:19:48 2017 -0800 Committer: Luke Cwik <[email protected]> Committed: Wed Nov 15 08:59:51 2017 -0800 ---------------------------------------------------------------------- .../runners/dataflow/BatchViewOverrides.java | 16 +++- .../runners/dataflow/CreateDataflowView.java | 6 +- .../beam/runners/dataflow/DataflowRunner.java | 80 +++++++++++++++----- .../DataflowPipelineTranslatorTest.java | 6 +- 4 files changed, 80 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/accb2087/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java index 8ed41cb..2953a42 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java @@ -55,6 +55,8 @@ import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.coders.StructuredCoder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; @@ -893,15 +895,23 @@ class BatchViewOverrides { private final DataflowRunner runner; private final PCollectionView<T> view; - /** Builds an instance of this class from the overridden transform. */ - @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() - public BatchViewAsSingleton(DataflowRunner runner, CreatePCollectionView<T, T> transform) { + private final CombineFn<T, ?, T> combineFn; + private final int fanout; + + public BatchViewAsSingleton( + DataflowRunner runner, + CreatePCollectionView<T, T> transform, + CombineFn<T, ?, T> combineFn, + int fanout) { this.runner = runner; this.view = transform.getView(); + this.combineFn = combineFn; + this.fanout = fanout; } @Override public PCollection<?> expand(PCollection<T> input) { + input = input.apply(Combine.globally(combineFn).withoutDefaults().withFanout(fanout)); @SuppressWarnings("unchecked") Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) input.getWindowingStrategy().getWindowFn().windowCoder(); http://git-wip-us.apache.org/repos/asf/beam/blob/accb2087/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java index 10888c2..f64f3fb 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/CreateDataflowView.java @@ -25,11 +25,13 @@ import org.apache.beam.sdk.values.PCollectionView; /** A {@link DataflowRunner} marker class for creating a {@link PCollectionView}. */ public class CreateDataflowView<ElemT, ViewT> extends PTransform<PCollection<ElemT>, PCollection<ElemT>> { - public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> forBatch(PCollectionView<ViewT> view) { + public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> forBatch( + PCollectionView<ViewT> view) { return new CreateDataflowView<>(view, false); } - public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> forStreaming(PCollectionView<ViewT> view) { + public static <ElemT, ViewT> CreateDataflowView<ElemT, ViewT> forStreaming( + PCollectionView<ViewT> view) { return new CreateDataflowView<>(view, true); } http://git-wip-us.apache.org/repos/asf/beam/blob/accb2087/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 72e4f83..a650092 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -116,6 +116,7 @@ import org.apache.beam.sdk.runners.TransformHierarchy.Node; import org.apache.beam.sdk.state.MapState; import org.apache.beam.sdk.state.SetState; import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.Combine.GroupedValues; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; @@ -406,9 +407,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { BatchViewOverrides.BatchViewAsMultimap.class, this))) .add( PTransformOverride.of( - PTransformMatchers.classEqualTo(View.AsSingleton.class), - new ReflectiveViewOverrideFactory( - BatchViewOverrides.BatchViewAsSingleton.class, this))) + PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class), + new CombineGloballyAsSingletonViewOverrideFactory(this))) .add( PTransformOverride.of( PTransformMatchers.classEqualTo(View.AsList.class), @@ -437,29 +437,58 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } /** - * Replace the View.AsYYY transform with specialized view overrides for Dataflow. It is required that the - * new replacement transform uses the supplied PCollectionView and does not create another instance. + * Replace the {@link Combine.GloballyAsSingletonView} transform with a specialization which + * re-applies the {@link CombineFn} and adds a specialization specific to the Dataflow runner. + */ + private static class CombineGloballyAsSingletonViewOverrideFactory<InputT, ViewT> + extends ReflectiveViewOverrideFactory<InputT, ViewT> { + + private CombineGloballyAsSingletonViewOverrideFactory(DataflowRunner runner) { + super((Class) BatchViewOverrides.BatchViewAsSingleton.class, runner); + } + + @Override + public PTransformReplacement<PCollection<InputT>, PValue> getReplacementTransform( + AppliedPTransform< + PCollection<InputT>, + PValue, + PTransform<PCollection<InputT>, PValue>> transform) { + Combine.GloballyAsSingletonView<?, ?> combineTransform = + (Combine.GloballyAsSingletonView) transform.getTransform(); + return PTransformReplacement.of( + PTransformReplacements.getSingletonMainInput(transform), + new BatchViewOverrides.BatchViewAsSingleton( + runner, + findCreatePCollectionView(transform), + (CombineFn) combineTransform.getCombineFn(), + combineTransform.getFanout())); + } + } + + /** + * Replace the View.AsYYY transform with specialized view overrides for Dataflow. It is required + * that the new replacement transform uses the supplied PCollectionView and does not create + * another instance. */ private static class ReflectiveViewOverrideFactory<InputT, ViewT> implements PTransformOverrideFactory<PCollection<InputT>, PValue, PTransform<PCollection<InputT>, PValue>> { - private final Class<PTransform<PCollection<InputT>, PCollectionView<ViewT>>> replacement; - private final DataflowRunner runner; + final Class<PTransform<PCollection<InputT>, PValue>> replacement; + final DataflowRunner runner; private ReflectiveViewOverrideFactory( - Class<PTransform<PCollection<InputT>, PCollectionView<ViewT>>> replacement, + Class<PTransform<PCollection<InputT>, PValue>> replacement, DataflowRunner runner) { this.replacement = replacement; this.runner = runner; } - @Override - public PTransformReplacement<PCollection<InputT>, PValue> getReplacementTransform( - final AppliedPTransform<PCollection<InputT>, - PValue, - PTransform<PCollection<InputT>, PValue>> transform) { - + CreatePCollectionView<ViewT, ViewT> findCreatePCollectionView( + final AppliedPTransform< + PCollection<InputT>, + PValue, + PTransform<PCollection<InputT>, PValue>> transform) { final AtomicReference<CreatePCollectionView> viewTransformRef = new AtomicReference<>(); transform.getPipeline().traverseTopologically(new PipelineVisitor.Defaults() { // Stores whether we have entered the expected composite view transform. @@ -495,18 +524,29 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { checkState(viewTransformRef.get() != null, "Expected to find CreatePCollectionView contained within %s", transform.getTransform()); - PTransform<PCollection<InputT>, PCollectionView<ViewT>> rep = + return viewTransformRef.get(); + } + + @Override + public PTransformReplacement<PCollection<InputT>, PValue> getReplacementTransform( + final AppliedPTransform<PCollection<InputT>, + PValue, + PTransform<PCollection<InputT>, PValue>> transform) { + + PTransform<PCollection<InputT>, PValue> rep = InstanceBuilder.ofType(replacement) .withArg(DataflowRunner.class, runner) - .withArg(CreatePCollectionView.class, viewTransformRef.get()) + .withArg(CreatePCollectionView.class, findCreatePCollectionView(transform)) .build(); - return PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(transform), (PTransform) rep); + return PTransformReplacement.of( + PTransformReplacements.getSingletonMainInput(transform), (PTransform) rep); } @Override - public Map<PValue, ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> outputs, PValue newOutput) { - // We do not replace any of the outputs because we expect that the new PTransform will re-use the original - // PCollectionView that was returned. + public Map<PValue, ReplacementOutput> mapOutputs( + Map<TupleTag<?>, PValue> outputs, PValue newOutput) { + // We do not replace any of the outputs because we expect that the new PTransform will + // re-use the original PCollectionView that was returned. return ImmutableMap.of(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/accb2087/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index e03abb9..81e7a97 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -991,15 +991,15 @@ public class DataflowPipelineTranslatorTest implements Serializable { assertAllStepOutputsHaveUniqueIds(job); List<Step> steps = job.getSteps(); - assertEquals(5, steps.size()); + assertEquals(9, steps.size()); @SuppressWarnings("unchecked") List<Map<String, Object>> toIsmRecordOutputs = - (List<Map<String, Object>>) steps.get(3).getProperties().get(PropertyNames.OUTPUT_INFO); + (List<Map<String, Object>>) steps.get(7).getProperties().get(PropertyNames.OUTPUT_INFO); assertTrue( Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format")); - Step collectionToSingletonStep = steps.get(4); + Step collectionToSingletonStep = steps.get(8); assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind()); }
