Use Batch Replacement in the Flink Runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d174a241
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d174a241
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d174a241

Branch: refs/heads/master
Commit: d174a241f0da4e6a9271f75aeb0198d192dc3d46
Parents: 018513e
Author: Thomas Groh <[email protected]>
Authored: Thu Mar 30 15:53:16 2017 -0700
Committer: Thomas Groh <[email protected]>
Committed: Mon Apr 3 11:45:51 2017 -0700

----------------------------------------------------------------------
 .../flink/FlinkStreamingPipelineTranslator.java | 81 +++++++++++---------
 1 file changed, 43 insertions(+), 38 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/d174a241/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
index 0cedf66..8b5637e 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.flink;
 
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import java.util.List;
 import java.util.Map;
@@ -27,13 +27,13 @@ import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.PTransformMatcher;
 import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.sdk.values.PCollection;
@@ -71,43 +71,48 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
 
   @Override
   public void translate(Pipeline pipeline) {
-    Map<PTransformMatcher, PTransformOverrideFactory> transformOverrides =
-        ImmutableMap.<PTransformMatcher, PTransformOverrideFactory>builder()
-            .put(
-                PTransformMatchers.splittableParDoMulti(), new SplittableParDoOverrideFactory())
-            .put(
-                PTransformMatchers.classEqualTo(View.AsIterable.class),
-                new ReflectiveOneToOneOverrideFactory(
-                    FlinkStreamingViewOverrides.StreamingViewAsIterable.class, flinkRunner))
-            .put(
-                PTransformMatchers.classEqualTo(View.AsList.class),
-                new ReflectiveOneToOneOverrideFactory(
-                    FlinkStreamingViewOverrides.StreamingViewAsList.class, flinkRunner))
-            .put(
-                PTransformMatchers.classEqualTo(View.AsMap.class),
-                new ReflectiveOneToOneOverrideFactory(
-                    FlinkStreamingViewOverrides.StreamingViewAsMap.class, flinkRunner))
-            .put(
-                PTransformMatchers.classEqualTo(View.AsMultimap.class),
-                new ReflectiveOneToOneOverrideFactory(
-                    FlinkStreamingViewOverrides.StreamingViewAsMultimap.class, flinkRunner))
-            .put(
-                PTransformMatchers.classEqualTo(View.AsSingleton.class),
-                new ReflectiveOneToOneOverrideFactory(
-                    FlinkStreamingViewOverrides.StreamingViewAsSingleton.class, flinkRunner))
+    List<PTransformOverride> transformOverrides =
+        ImmutableList.<PTransformOverride>builder()
+            .add(
+                PTransformOverride.of(
+                    PTransformMatchers.splittableParDoMulti(),
+                    new SplittableParDoOverrideFactory()))
+            .add(
+                PTransformOverride.of(
+                    PTransformMatchers.classEqualTo(View.AsIterable.class),
+                    new ReflectiveOneToOneOverrideFactory(
+                        FlinkStreamingViewOverrides.StreamingViewAsIterable.class, flinkRunner)))
+            .add(
+                PTransformOverride.of(
+                    PTransformMatchers.classEqualTo(View.AsList.class),
+                    new ReflectiveOneToOneOverrideFactory(
+                        FlinkStreamingViewOverrides.StreamingViewAsList.class, flinkRunner)))
+            .add(
+                PTransformOverride.of(
+                    PTransformMatchers.classEqualTo(View.AsMap.class),
+                    new ReflectiveOneToOneOverrideFactory(
+                        FlinkStreamingViewOverrides.StreamingViewAsMap.class, flinkRunner)))
+            .add(
+                PTransformOverride.of(
+                    PTransformMatchers.classEqualTo(View.AsMultimap.class),
+                    new ReflectiveOneToOneOverrideFactory(
+                        FlinkStreamingViewOverrides.StreamingViewAsMultimap.class, flinkRunner)))
+            .add(
+                PTransformOverride.of(
+                    PTransformMatchers.classEqualTo(View.AsSingleton.class),
+                    new ReflectiveOneToOneOverrideFactory(
+                        FlinkStreamingViewOverrides.StreamingViewAsSingleton.class, flinkRunner)))
             // this has to be last since the ViewAsSingleton override
             // can expand to a Combine.GloballyAsSingletonView
-            .put(
-                PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
-                new ReflectiveOneToOneOverrideFactory(
-                    FlinkStreamingViewOverrides.StreamingCombineGloballyAsSingletonView.class,
-                    flinkRunner))
-            .build();
-
-    for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override :
-        transformOverrides.entrySet()) {
-      pipeline.replace(PTransformOverride.of(override.getKey(), override.getValue()));
-    }
+            .add(
+                PTransformOverride.of(
+                    PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
+                    new ReflectiveOneToOneOverrideFactory(
+                        FlinkStreamingViewOverrides.StreamingCombineGloballyAsSingletonView.class,
+                        flinkRunner)))
+            .build();
+
+    pipeline.replaceAll(transformOverrides);
 
     super.translate(pipeline);
   }
@@ -245,7 +250,7 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
    */
   static class SplittableParDoOverrideFactory<InputT, OutputT>
       implements PTransformOverrideFactory<
-          PCollection<? extends InputT>, PCollectionTuple, ParDo.MultiOutput<InputT, OutputT>> {
+          PCollection<? extends InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>> {
     @Override
     @SuppressWarnings("unchecked")
     public PTransform<PCollection<? extends InputT>, PCollectionTuple> getReplacementTransform(
