Improve Splittable ParDo translation
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1b00d95a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1b00d95a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1b00d95a Branch: refs/heads/DSL_SQL Commit: 1b00d95a1105d2611b985dc463da0884a6646354 Parents: 840492d Author: Kenneth Knowles <[email protected]> Authored: Thu May 25 06:29:16 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Tue Jun 6 13:13:12 2017 -0700 ---------------------------------------------------------------------- .../core/construction/ParDoTranslation.java | 20 +++++++++++ .../core/construction/SplittableParDo.java | 18 ++++++++-- .../core/construction/ParDoTranslationTest.java | 35 +++++++++++++++++++- .../core/SplittableParDoViaKeyedWorkItems.java | 10 +++++- .../direct/KeyedPValueTrackingVisitor.java | 2 +- .../src/main/proto/beam_runner_api.proto | 3 ++ 6 files changed, 82 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/1b00d95a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java index fe66179..34e0d86 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java @@ -144,6 +144,7 @@ public class ParDoTranslation { ParDoPayload.Builder builder = ParDoPayload.newBuilder(); builder.setDoFn(toProto(parDo.getFn(), parDo.getMainOutputTag())); + 
builder.setSplittable(signature.processElement().isSplittable()); for (PCollectionView<?> sideInput : parDo.getSideInputs()) { builder.putSideInputs(sideInput.getTagInternal().getId(), toProto(sideInput)); } @@ -496,6 +497,25 @@ public class ParDoTranslation { .build(); } + private static <T> ParDoPayload getParDoPayload(AppliedPTransform<?, ?, ?> transform) + throws IOException { + return PTransformTranslation.toProto( + transform, Collections.<AppliedPTransform<?, ?, ?>>emptyList(), SdkComponents.create()) + .getSpec() + .getParameter() + .unpack(ParDoPayload.class); + } + + public static boolean usesStateOrTimers(AppliedPTransform<?, ?, ?> transform) throws IOException { + ParDoPayload payload = getParDoPayload(transform); + return payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 0; + } + + public static boolean isSplittable(AppliedPTransform<?, ?, ?> transform) throws IOException { + ParDoPayload payload = getParDoPayload(transform); + return payload.getSplittable(); + } + private static ViewFn<?, ?> viewFnFromProto(SdkFunctionSpec viewFn) throws InvalidProtocolBufferException { FunctionSpec spec = viewFn.getSpec(); http://git-wip-us.apache.org/repos/asf/beam/blob/1b00d95a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java index dfca7d2..665e39d 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java @@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import java.util.List; import 
java.util.UUID; +import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn; @@ -67,6 +68,12 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> public static final String SPLITTABLE_PROCESS_URN = "urn:beam:runners_core:transforms:splittable_process:v1"; + public static final String SPLITTABLE_PROCESS_KEYED_ELEMENTS_URN = + "urn:beam:runners_core:transforms:splittable_process_keyed_elements:v1"; + + public static final String SPLITTABLE_GBKIKWI_URN = + "urn:beam:runners_core:transforms:splittable_gbkikwi:v1"; + /** * Creates the transform for the given original multi-output {@link ParDo}. * @@ -133,11 +140,11 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> /** * Runner-specific primitive {@link PTransform} that invokes the {@link DoFn.ProcessElement} - * method for a splittable {@link DoFn} on each {@link ElementAndRestriction} of the input - * {@link PCollection} of {@link KV KVs} keyed with arbitrary but globally unique keys. + * method for a splittable {@link DoFn} on each {@link ElementAndRestriction} of the input {@link + * PCollection} of {@link KV KVs} keyed with arbitrary but globally unique keys. 
*/ public static class ProcessKeyedElements<InputT, OutputT, RestrictionT> - extends PTransform< + extends RawPTransform< PCollection<KV<String, ElementAndRestriction<InputT, RestrictionT>>>, PCollectionTuple> { private final DoFn<InputT, OutputT> fn; private final Coder<InputT> elementCoder; @@ -227,6 +234,11 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> return outputs; } + + @Override + public String getUrn() { + return SPLITTABLE_PROCESS_KEYED_ELEMENTS_URN; + } } /** http://git-wip-us.apache.org/repos/asf/beam/blob/1b00d95a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java index 46f6a80..a8490bf 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java @@ -51,6 +51,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.ParDo.MultiOutput; import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -111,7 +112,11 @@ public class ParDoTranslationTest { ParDo.of(new DropElementsFn()) .withOutputTags( new TupleTag<Void>(), - TupleTagList.of(new TupleTag<byte[]>() {}).and(new TupleTag<Integer>() {}))); + TupleTagList.of(new TupleTag<byte[]>() {}).and(new TupleTag<Integer>() {})), + ParDo.of(new 
SplittableDropElementsFn()) + .withOutputTags( + new TupleTag<Void>(), + TupleTagList.empty())); } @Parameter(0) @@ -235,6 +240,34 @@ public class ParDoTranslationTest { } } + private static class SplittableDropElementsFn extends DoFn<KV<Long, String>, Void> { + @ProcessElement + public void proc(ProcessContext context, RestrictionTracker<Integer> restriction) { + context.output(null); + } + + @GetInitialRestriction + public Integer restriction(KV<Long, String> elem) { + return 42; + } + + @NewTracker + public RestrictionTracker<Integer> newTracker(Integer restriction) { + throw new UnsupportedOperationException("Should never be called; only to test translation"); + } + + + @Override + public boolean equals(Object other) { + return other instanceof SplittableDropElementsFn; + } + + @Override + public int hashCode() { + return SplittableDropElementsFn.class.hashCode(); + } + } + @SuppressWarnings("unused") private static class StateTimerDropElementsFn extends DoFn<KV<Long, String>, Void> { private static final String BAG_STATE_ID = "bagState"; http://git-wip-us.apache.org/repos/asf/beam/blob/1b00d95a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java index b38e364..c4b086a 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java @@ -23,7 +23,9 @@ import java.util.List; import java.util.Map; import org.apache.beam.runners.core.construction.ElementAndRestriction; import org.apache.beam.runners.core.construction.PTransformReplacements; +import 
org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform; import org.apache.beam.runners.core.construction.ReplacementOutputs; +import org.apache.beam.runners.core.construction.SplittableParDo; import org.apache.beam.runners.core.construction.SplittableParDo.ProcessKeyedElements; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; @@ -67,12 +69,18 @@ public class SplittableParDoViaKeyedWorkItems { * emits output immediately. */ public static class GBKIntoKeyedWorkItems<KeyT, InputT> - extends PTransform<PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> { + extends RawPTransform< + PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> { @Override public PCollection<KeyedWorkItem<KeyT, InputT>> expand(PCollection<KV<KeyT, InputT>> input) { return PCollection.createPrimitiveOutputInternal( input.getPipeline(), WindowingStrategy.globalDefault(), input.isBounded()); } + + @Override + public String getUrn() { + return SplittableParDo.SPLITTABLE_GBKIKWI_URN; + } } /** Overrides a {@link ProcessKeyedElements} into {@link SplittableProcessViaKeyedWorkItems}. */ http://git-wip-us.apache.org/repos/asf/beam/blob/1b00d95a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java index f9b2dae..6eadaba 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java @@ -47,7 +47,7 @@ import org.apache.beam.sdk.values.TupleTag; class KeyedPValueTrackingVisitor extends PipelineVisitor.Defaults { private static final Set<Class<? 
extends PTransform>> PRODUCES_KEYED_OUTPUTS = - ImmutableSet.of( + ImmutableSet.<Class<? extends PTransform>>of( SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems.class, DirectGroupByKeyOnly.class, DirectGroupAlsoByWindow.class); http://git-wip-us.apache.org/repos/asf/beam/blob/1b00d95a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto ---------------------------------------------------------------------- diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto index 87e33f0..039ecb0 100644 --- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto +++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto @@ -220,6 +220,9 @@ message ParDoPayload { // (Optional) A mapping of local timer names to timer specifications. map<string, TimerSpec> timer_specs = 5; + + // Whether the DoFn is splittable. + bool splittable = 6; } // Parameters that a UDF might require.
