Enable SplittableParDo on rehydrated ParDo transform
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e5ca058b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e5ca058b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e5ca058b Branch: refs/heads/DSL_SQL Commit: e5ca058bd7ad5f2150fef3e57649bcfb487a711f Parents: bdece9d Author: Kenneth Knowles <[email protected]> Authored: Thu Jun 8 14:27:02 2017 -0700 Committer: Tyler Akidau <[email protected]> Committed: Wed Jul 12 20:01:01 2017 -0700 ---------------------------------------------------------------------- .../core/construction/SplittableParDo.java | 25 ++++++++++++++ .../direct/ParDoMultiOverrideFactory.java | 36 ++++++++++++++------ .../flink/FlinkStreamingPipelineTranslator.java | 2 +- 3 files changed, 52 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e5ca058b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java index f31b495..e71187b 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.core.construction; import static com.google.common.base.Preconditions.checkArgument; +import java.io.IOException; import java.util.List; import java.util.Map; import java.util.UUID; @@ -26,6 +27,7 @@ import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTrans import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -103,6 +105,9 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> public static <InputT, OutputT> SplittableParDo<InputT, OutputT, ?> forJavaParDo( ParDo.MultiOutput<InputT, OutputT> parDo) { checkArgument(parDo != null, "parDo must not be null"); + checkArgument( + DoFnSignatures.getSignature(parDo.getFn().getClass()).processElement().isSplittable(), + "fn must be a splittable DoFn"); return new SplittableParDo( parDo.getFn(), parDo.getMainOutputTag(), @@ -110,6 +115,26 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> parDo.getAdditionalOutputTags()); } + /** + * Creates the transform for a {@link ParDo}-compatible {@link AppliedPTransform}. + * + * <p>The input may generally be a deserialized transform so it may not actually be a {@link + * ParDo}. Instead {@link ParDoTranslation} will be used to extract fields. + */ + public static SplittableParDo<?, ?, ?> forAppliedParDo(AppliedPTransform<?, ?, ?> parDo) { + checkArgument(parDo != null, "parDo must not be null"); + + try { + return new SplittableParDo<>( + ParDoTranslation.getDoFn(parDo), + (TupleTag) ParDoTranslation.getMainOutputTag(parDo), + ParDoTranslation.getSideInputs(parDo), + ParDoTranslation.getAdditionalOutputTags(parDo)); + } catch (IOException exc) { + throw new RuntimeException(exc); + } + } + @Override public PCollectionTuple expand(PCollection<InputT> input) { Coder<RestrictionT> restrictionCoder = http://git-wip-us.apache.org/repos/asf/beam/blob/e5ca058b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java index 2904bc1..8881967 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.direct; import static com.google.common.base.Preconditions.checkState; +import java.io.IOException; import java.util.List; import java.util.Map; import org.apache.beam.runners.core.KeyedWorkItem; @@ -26,6 +27,7 @@ import org.apache.beam.runners.core.KeyedWorkItemCoder; import org.apache.beam.runners.core.KeyedWorkItems; import org.apache.beam.runners.core.construction.PTransformReplacements; import org.apache.beam.runners.core.construction.PTransformTranslation; +import org.apache.beam.runners.core.construction.ParDoTranslation; import org.apache.beam.runners.core.construction.ReplacementOutputs; import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.sdk.coders.Coder; @@ -62,29 +64,43 @@ import org.apache.beam.sdk.values.WindowingStrategy; */ class ParDoMultiOverrideFactory<InputT, OutputT> implements PTransformOverrideFactory< - PCollection<? extends InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>> { + PCollection<? extends InputT>, PCollectionTuple, + PTransform<PCollection<? extends InputT>, PCollectionTuple>> { @Override public PTransformReplacement<PCollection<? extends InputT>, PCollectionTuple> getReplacementTransform( AppliedPTransform< - PCollection<? extends InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>> - transform) { + PCollection<? extends InputT>, PCollectionTuple, + PTransform<PCollection<? extends InputT>, PCollectionTuple>> + application) { return PTransformReplacement.of( - PTransformReplacements.getSingletonMainInput(transform), - getReplacementTransform(transform.getTransform())); + PTransformReplacements.getSingletonMainInput(application), + getReplacementForApplication(application)); } @SuppressWarnings("unchecked") - private PTransform<PCollection<? extends InputT>, PCollectionTuple> getReplacementTransform( - MultiOutput<InputT, OutputT> transform) { + private PTransform<PCollection<? extends InputT>, PCollectionTuple> getReplacementForApplication( + AppliedPTransform< + PCollection<? extends InputT>, PCollectionTuple, + PTransform<PCollection<? extends InputT>, PCollectionTuple>> + application) { + + DoFn<InputT, OutputT> fn; + try { + fn = (DoFn<InputT, OutputT>) ParDoTranslation.getDoFn(application); + } catch (IOException exc) { + throw new RuntimeException(exc); + } - DoFn<InputT, OutputT> fn = transform.getFn(); DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass()); if (signature.processElement().isSplittable()) { - return (PTransform) SplittableParDo.forJavaParDo(transform); + return (PTransform) SplittableParDo.forAppliedParDo(application); } else if (signature.stateDeclarations().size() > 0 || signature.timerDeclarations().size() > 0) { + MultiOutput<InputT, OutputT> transform = + (MultiOutput<InputT, OutputT>) application.getTransform(); + // Based on the fact that the signature is stateful, DoFnSignatures ensures // that it is also keyed return new GbkThenStatefulParDo( @@ -93,7 +109,7 @@ class ParDoMultiOverrideFactory<InputT, OutputT> transform.getAdditionalOutputTags(), transform.getSideInputs()); } else { - return transform; + return application.getTransform(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/e5ca058b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java index ebc9345..f733e2e 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java @@ -188,7 +188,7 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator { transform) { return PTransformReplacement.of( PTransformReplacements.getSingletonMainInput(transform), - SplittableParDo.forJavaParDo(transform.getTransform())); + (SplittableParDo<InputT, OutputT, ?>) SplittableParDo.forAppliedParDo(transform)); } @Override
