Fix getAdditionalInputs for SplittableParDo transforms
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a66bcd68 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a66bcd68 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a66bcd68 Branch: refs/heads/gearpump-runner Commit: a66bcd68a1e56d5d38fccfce2ffeec28ba1c82de Parents: 58fba59 Author: Kenneth Knowles <[email protected]> Authored: Tue Jun 13 10:00:09 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Tue Jun 27 21:08:10 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/runners/apex/ApexRunner.java | 2 +- .../core/construction/SplittableParDo.java | 66 +++++++++++++++----- .../core/construction/SplittableParDoTest.java | 8 +-- .../direct/ParDoMultiOverrideFactory.java | 2 +- .../flink/FlinkStreamingPipelineTranslator.java | 2 +- .../dataflow/SplittableParDoOverrides.java | 2 +- 6 files changed, 57 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a66bcd68/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java index 95b354a..fd0a1c9 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java @@ -381,7 +381,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { AppliedPTransform<PCollection<InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>> transform) { return PTransformReplacement.of(PTransformReplacements.getSingletonMainInput(transform), - new SplittableParDo<>(transform.getTransform())); + SplittableParDo.forJavaParDo(transform.getTransform())); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/a66bcd68/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java index 5ccafcb..f31b495 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java @@ -18,9 +18,9 @@ package org.apache.beam.runners.core.construction; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; import java.util.List; +import java.util.Map; import java.util.UUID; import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform; import org.apache.beam.sdk.annotations.Experimental; @@ -40,6 +40,8 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PCollectionViews; +import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.WindowingStrategy; @@ -64,7 +66,11 @@ import org.apache.beam.sdk.values.WindowingStrategy; @Experimental(Experimental.Kind.SPLITTABLE_DO_FN) public class SplittableParDo<InputT, OutputT, RestrictionT> extends PTransform<PCollection<InputT>, PCollectionTuple> { - private final ParDo.MultiOutput<InputT, OutputT> parDo; + + private final DoFn<InputT, OutputT> doFn; + private final List<PCollectionView<?>> sideInputs; + private final TupleTag<OutputT> mainOutputTag; + private final TupleTagList additionalOutputTags; public static final String SPLITTABLE_PROCESS_URN = "urn:beam:runners_core:transforms:splittable_process:v1"; @@ -75,24 +81,39 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> public static final String SPLITTABLE_GBKIKWI_URN = "urn:beam:runners_core:transforms:splittable_gbkikwi:v1"; + private SplittableParDo( + DoFn<InputT, OutputT> doFn, + TupleTag<OutputT> mainOutputTag, + List<PCollectionView<?>> sideInputs, + TupleTagList additionalOutputTags) { + checkArgument( + DoFnSignatures.getSignature(doFn.getClass()).processElement().isSplittable(), + "fn must be a splittable DoFn"); + this.doFn = doFn; + this.mainOutputTag = mainOutputTag; + this.sideInputs = sideInputs; + this.additionalOutputTags = additionalOutputTags; + } + /** - * Creates the transform for the given original multi-output {@link ParDo}. + * Creates a {@link SplittableParDo} from an original Java {@link ParDo}. * * @param parDo The splittable {@link ParDo} transform. */ - public SplittableParDo(ParDo.MultiOutput<InputT, OutputT> parDo) { - checkNotNull(parDo, "parDo must not be null"); - this.parDo = parDo; - checkArgument( - DoFnSignatures.getSignature(parDo.getFn().getClass()).processElement().isSplittable(), - "fn must be a splittable DoFn"); + public static <InputT, OutputT> SplittableParDo<InputT, OutputT, ?> forJavaParDo( + ParDo.MultiOutput<InputT, OutputT> parDo) { + checkArgument(parDo != null, "parDo must not be null"); + return new SplittableParDo( + parDo.getFn(), + parDo.getMainOutputTag(), + parDo.getSideInputs(), + parDo.getAdditionalOutputTags()); } @Override public PCollectionTuple expand(PCollection<InputT> input) { - DoFn<InputT, OutputT> fn = parDo.getFn(); Coder<RestrictionT> restrictionCoder = - DoFnInvokers.invokerFor(fn) + DoFnInvokers.invokerFor(doFn) .invokeGetRestrictionCoder(input.getPipeline().getCoderRegistry()); Coder<KV<InputT, RestrictionT>> splitCoder = KvCoder.of(input.getCoder(), restrictionCoder); @@ -100,9 +121,10 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> input .apply( "Pair with initial restriction", - ParDo.of(new PairWithRestrictionFn<InputT, OutputT, RestrictionT>(fn))) + ParDo.of(new PairWithRestrictionFn<InputT, OutputT, RestrictionT>(doFn))) .setCoder(splitCoder) - .apply("Split restriction", ParDo.of(new SplitRestrictionFn<InputT, RestrictionT>(fn))) + .apply( + "Split restriction", ParDo.of(new SplitRestrictionFn<InputT, RestrictionT>(doFn))) .setCoder(splitCoder) // ProcessFn requires all input elements to be in a single window and have a single // element per work item. This must precede the unique keying so each key has a single @@ -115,13 +137,18 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> return keyedRestrictions.apply( "ProcessKeyedElements", new ProcessKeyedElements<>( - fn, + doFn, input.getCoder(), restrictionCoder, (WindowingStrategy<InputT, ?>) input.getWindowingStrategy(), - parDo.getSideInputs(), - parDo.getMainOutputTag(), - parDo.getAdditionalOutputTags())); + sideInputs, + mainOutputTag, + additionalOutputTags)); + } + + @Override + public Map<TupleTag<?>, PValue> getAdditionalInputs() { + return PCollectionViews.toAdditionalInputs(sideInputs); } /** @@ -231,6 +258,11 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> } @Override + public Map<TupleTag<?>, PValue> getAdditionalInputs() { + return PCollectionViews.toAdditionalInputs(sideInputs); + } + + @Override public String getUrn() { return SPLITTABLE_PROCESS_KEYED_ELEMENTS_URN; } http://git-wip-us.apache.org/repos/asf/beam/blob/a66bcd68/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java index 6e4d6c4..f4c596e 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SplittableParDoTest.java @@ -122,14 +122,14 @@ public class SplittableParDoTest { "Applying a bounded SDF to a bounded collection produces a bounded collection", PCollection.IsBounded.BOUNDED, makeBoundedCollection(pipeline) - .apply("bounded to bounded", new SplittableParDo<>(makeParDo(boundedFn))) + .apply("bounded to bounded", SplittableParDo.forJavaParDo(makeParDo(boundedFn))) .get(MAIN_OUTPUT_TAG) .isBounded()); assertEquals( "Applying a bounded SDF to an unbounded collection produces an unbounded collection", PCollection.IsBounded.UNBOUNDED, makeUnboundedCollection(pipeline) - .apply("bounded to unbounded", new SplittableParDo<>(makeParDo(boundedFn))) + .apply("bounded to unbounded", SplittableParDo.forJavaParDo(makeParDo(boundedFn))) .get(MAIN_OUTPUT_TAG) .isBounded()); } @@ -143,14 +143,14 @@ public class SplittableParDoTest { "Applying an unbounded SDF to a bounded collection produces a bounded collection", PCollection.IsBounded.UNBOUNDED, makeBoundedCollection(pipeline) - .apply("unbounded to bounded", new SplittableParDo<>(makeParDo(unboundedFn))) + .apply("unbounded to bounded", SplittableParDo.forJavaParDo(makeParDo(unboundedFn))) .get(MAIN_OUTPUT_TAG) .isBounded()); assertEquals( "Applying an unbounded SDF to an unbounded collection produces an unbounded collection", PCollection.IsBounded.UNBOUNDED, makeUnboundedCollection(pipeline) - .apply("unbounded to unbounded", new SplittableParDo<>(makeParDo(unboundedFn))) + .apply("unbounded to unbounded", SplittableParDo.forJavaParDo(makeParDo(unboundedFn))) .get(MAIN_OUTPUT_TAG) .isBounded()); } http://git-wip-us.apache.org/repos/asf/beam/blob/a66bcd68/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java index b20113e..9a26283 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java @@ -81,7 +81,7 @@ class ParDoMultiOverrideFactory<InputT, OutputT> DoFn<InputT, OutputT> fn = transform.getFn(); DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass()); if (signature.processElement().isSplittable()) { - return new SplittableParDo(transform); + return (PTransform) SplittableParDo.forJavaParDo(transform); } else if (signature.stateDeclarations().size() > 0 || signature.timerDeclarations().size() > 0) { http://git-wip-us.apache.org/repos/asf/beam/blob/a66bcd68/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java index 27bb4ec..ebc9345 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java @@ -188,7 +188,7 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator { transform) { return PTransformReplacement.of( PTransformReplacements.getSingletonMainInput(transform), - new SplittableParDo<>(transform.getTransform())); + SplittableParDo.forJavaParDo(transform.getTransform())); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/a66bcd68/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java index 9322878..fc010f8 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/SplittableParDoOverrides.java @@ -64,7 +64,7 @@ class SplittableParDoOverrides { appliedTransform) { return PTransformReplacement.of( PTransformReplacements.getSingletonMainInput(appliedTransform), - new SplittableParDo<>(appliedTransform.getTransform())); + SplittableParDo.forJavaParDo(appliedTransform.getTransform())); } @Override
