Port DirectRunner ParDo overrides to SDK-agnostic APIs
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/16d4a154 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/16d4a154 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/16d4a154 Branch: refs/heads/DSL_SQL Commit: 16d4a154d8667dd1ebdf4993e816c680f4c982e6 Parents: e5ca058 Author: Kenneth Knowles <[email protected]> Authored: Thu Jun 8 13:44:52 2017 -0700 Committer: Tyler Akidau <[email protected]> Committed: Wed Jul 12 20:01:01 2017 -0700 ---------------------------------------------------------------------- .../core/construction/ParDoTranslation.java | 16 ++++++--- .../construction/RunnerPCollectionView.java | 16 +++++++++ .../direct/ParDoMultiOverrideFactory.java | 35 +++++++++----------- 3 files changed, 43 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/16d4a154/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java index fe8c5aa..90c9aad 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.core.construction; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static org.apache.beam.runners.core.construction.PTransformTranslation.PAR_DO_TRANSFORM_URN; @@ -262,12 +263,19 @@ public class ParDoTranslation { ParDoPayload payload = parDoProto.getSpec().getParameter().unpack(ParDoPayload.class); List<PCollectionView<?>> views = new ArrayList<>(); - for (Map.Entry<String, SideInput> sideInput : payload.getSideInputsMap().entrySet()) { + for (Map.Entry<String, SideInput> sideInputEntry : payload.getSideInputsMap().entrySet()) { + String sideInputTag = sideInputEntry.getKey(); + RunnerApi.SideInput sideInput = sideInputEntry.getValue(); + PCollection<?> originalPCollection = + checkNotNull( + (PCollection<?>) application.getInputs().get(new TupleTag<>(sideInputTag)), + "no input with tag %s", + sideInputTag); views.add( viewFromProto( - application.getPipeline(), - sideInput.getValue(), - sideInput.getKey(), + sideInput, + sideInputTag, + originalPCollection, parDoProto, sdkComponents.toComponents())); } http://git-wip-us.apache.org/repos/asf/beam/blob/16d4a154/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java index b275188..85139e8 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.core.construction; import java.util.Map; +import java.util.Objects; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput; @@ -94,4 +95,19 @@ class RunnerPCollectionView<T> extends PValueBase implements PCollectionView<T> throw new UnsupportedOperationException(String.format( "A %s cannot be expanded", RunnerPCollectionView.class.getSimpleName())); } + + @Override + public boolean equals(Object other) { + if (!(other instanceof PCollectionView)) { + return false; + } + @SuppressWarnings("unchecked") + PCollectionView<?> otherView = (PCollectionView<?>) other; + return tag.equals(otherView.getTagInternal()); + } + + @Override + public int hashCode() { + return Objects.hash(tag); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/16d4a154/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java index 8881967..891d102 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java @@ -38,7 +38,6 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.ParDo.MultiOutput; import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.AfterPane; @@ -73,9 +72,14 @@ class ParDoMultiOverrideFactory<InputT, OutputT> PCollection<? extends InputT>, PCollectionTuple, PTransform<PCollection<? extends InputT>, PCollectionTuple>> application) { - return PTransformReplacement.of( - PTransformReplacements.getSingletonMainInput(application), - getReplacementForApplication(application)); + + try { + return PTransformReplacement.of( + PTransformReplacements.getSingletonMainInput(application), + getReplacementForApplication(application)); + } catch (IOException exc) { + throw new RuntimeException(exc); + } } @SuppressWarnings("unchecked") @@ -83,31 +87,22 @@ class ParDoMultiOverrideFactory<InputT, OutputT> AppliedPTransform< PCollection<? extends InputT>, PCollectionTuple, PTransform<PCollection<? extends InputT>, PCollectionTuple>> - application) { + application) + throws IOException { - DoFn<InputT, OutputT> fn; - try { - fn = (DoFn<InputT, OutputT>) ParDoTranslation.getDoFn(application); - } catch (IOException exc) { - throw new RuntimeException(exc); - } + DoFn<InputT, OutputT> fn = (DoFn<InputT, OutputT>) ParDoTranslation.getDoFn(application); DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass()); + if (signature.processElement().isSplittable()) { return (PTransform) SplittableParDo.forAppliedParDo(application); } else if (signature.stateDeclarations().size() > 0 || signature.timerDeclarations().size() > 0) { - - MultiOutput<InputT, OutputT> transform = - (MultiOutput<InputT, OutputT>) application.getTransform(); - - // Based on the fact that the signature is stateful, DoFnSignatures ensures - // that it is also keyed return new GbkThenStatefulParDo( fn, - transform.getMainOutputTag(), - transform.getAdditionalOutputTags(), - transform.getSideInputs()); + ParDoTranslation.getMainOutputTag(application), + ParDoTranslation.getAdditionalOutputTags(application), + ParDoTranslation.getSideInputs(application)); } else { return application.getTransform(); }
