Update Apex Overrides Only override CreatePCollectionView transforms
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8eb09aad Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8eb09aad Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8eb09aad Branch: refs/heads/master Commit: 8eb09aad9c975f787ba8afac83394cc8b56eb94f Parents: bd1dfdf Author: Thomas Groh <[email protected]> Authored: Thu May 25 10:41:56 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Fri May 26 07:50:37 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/runners/apex/ApexRunner.java | 119 ++++--------------- 1 file changed, 21 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/8eb09aad/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java index f91d8e5..c595b3f 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java @@ -57,7 +57,6 @@ import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.PTransformOverride; import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.Combine.GloballyAsSingletonView; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; @@ -65,7 +64,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.ParDo.MultiOutput; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.View.AsIterable; -import org.apache.beam.sdk.transforms.View.AsSingleton; +import org.apache.beam.sdk.transforms.View.CreatePCollectionView; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; @@ -111,16 +110,12 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { new PrimitiveCreate.Factory())) .add( PTransformOverride.of( - PTransformMatchers.classEqualTo(View.AsSingleton.class), - new StreamingViewAsSingleton.Factory())) - .add( - PTransformOverride.of( - PTransformMatchers.classEqualTo(View.AsIterable.class), + PTransformMatchers.createViewWithViewFn(PCollectionViews.IterableViewFn.class), new StreamingViewAsIterable.Factory())) .add( PTransformOverride.of( - PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class), - new StreamingCombineGloballyAsSingletonView.Factory())) + PTransformMatchers.createViewWithViewFn(PCollectionViews.SingletonViewFn.class), + new StreamingWrapSingletonInList.Factory())) .add( PTransformOverride.of( PTransformMatchers.splittableParDoMulti(), @@ -245,117 +240,45 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { } } - private static class StreamingCombineGloballyAsSingletonView<InputT, OutputT> - extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> { + private static class StreamingWrapSingletonInList<T> + extends PTransform<PCollection<T>, PCollectionView<T>> { private static final long serialVersionUID = 1L; - Combine.GloballyAsSingletonView<InputT, OutputT> transform; + CreatePCollectionView<T, T> transform; /** * Builds an instance of this class from the overridden transform. */ - private StreamingCombineGloballyAsSingletonView( - Combine.GloballyAsSingletonView<InputT, OutputT> transform) { + private StreamingWrapSingletonInList( + CreatePCollectionView<T, T> transform) { this.transform = transform; } @Override - public PCollectionView<OutputT> expand(PCollection<InputT> input) { - PCollection<OutputT> combined = input - .apply(Combine.globally(transform.getCombineFn()) - .withoutDefaults().withFanout(transform.getFanout())); - - PCollectionView<OutputT> view = PCollectionViews.singletonView(combined, - combined.getWindowingStrategy(), transform.getInsertDefault(), - transform.getInsertDefault() ? transform.getCombineFn().defaultValue() : null, - combined.getCoder()); - return combined.apply(ParDo.of(new WrapAsList<OutputT>())) - .apply(CreateApexPCollectionView.<OutputT, OutputT> of(view)); + public PCollectionView<T> expand(PCollection<T> input) { + return input + .apply(ParDo.of(new WrapAsList<T>())) + .apply(CreateApexPCollectionView.<T, T>of(transform.getView())); } @Override protected String getKindString() { - return "StreamingCombineGloballyAsSingletonView"; + return "StreamingWrapSingletonInList"; } - static class Factory<InputT, OutputT> + static class Factory<T> extends SingleInputOutputOverrideFactory< - PCollection<InputT>, PCollectionView<OutputT>, - Combine.GloballyAsSingletonView<InputT, OutputT>> { + PCollection<T>, PCollectionView<T>, + CreatePCollectionView<T, T>> { @Override - public PTransformReplacement<PCollection<InputT>, PCollectionView<OutputT>> + public PTransformReplacement<PCollection<T>, PCollectionView<T>> getReplacementTransform( AppliedPTransform< - PCollection<InputT>, PCollectionView<OutputT>, - GloballyAsSingletonView<InputT, OutputT>> + PCollection<T>, PCollectionView<T>, + CreatePCollectionView<T, T>> transform) { return PTransformReplacement.of( PTransformReplacements.getSingletonMainInput(transform), - new StreamingCombineGloballyAsSingletonView<>(transform.getTransform())); - } - } - } - - private static class StreamingViewAsSingleton<T> - extends PTransform<PCollection<T>, PCollectionView<T>> { - private static final long serialVersionUID = 1L; - - private View.AsSingleton<T> transform; - - public StreamingViewAsSingleton(View.AsSingleton<T> transform) { - this.transform = transform; - } - - @Override - public PCollectionView<T> expand(PCollection<T> input) { - Combine.Globally<T, T> combine = Combine - .globally(new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue())); - if (!transform.hasDefaultValue()) { - combine = combine.withoutDefaults(); - } - return input.apply(combine.asSingletonView()); - } - - @Override - protected String getKindString() { - return "StreamingViewAsSingleton"; - } - - private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> { - private boolean hasDefaultValue; - private T defaultValue; - - SingletonCombine(boolean hasDefaultValue, T defaultValue) { - this.hasDefaultValue = hasDefaultValue; - this.defaultValue = defaultValue; - } - - @Override - public T apply(T left, T right) { - throw new IllegalArgumentException("PCollection with more than one element " - + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to " - + "combine the PCollection into a single value"); - } - - @Override - public T identity() { - if (hasDefaultValue) { - return defaultValue; - } else { - throw new IllegalArgumentException("Empty PCollection accessed as a singleton view. " - + "Consider setting withDefault to provide a default value"); - } - } - } - - static class Factory<T> - extends SingleInputOutputOverrideFactory< - PCollection<T>, PCollectionView<T>, View.AsSingleton<T>> { - @Override - public PTransformReplacement<PCollection<T>, PCollectionView<T>> getReplacementTransform( - AppliedPTransform<PCollection<T>, PCollectionView<T>, AsSingleton<T>> transform) { - return PTransformReplacement.of( - PTransformReplacements.getSingletonMainInput(transform), - new StreamingViewAsSingleton<>(transform.getTransform())); + new StreamingWrapSingletonInList<>(transform.getTransform())); } } }
