[BEAM-2421] Swap to use an Impulse primitive + DoFn for Create when executing with the Fn API.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1cdb80cb Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1cdb80cb Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1cdb80cb Branch: refs/heads/DSL_SQL Commit: 1cdb80cb6319c04fa94961c14c038a5e15736d68 Parents: 5f7e73b Author: Luke Cwik <[email protected]> Authored: Wed Jun 7 08:53:14 2017 -0700 Committer: Luke Cwik <[email protected]> Committed: Wed Jun 7 13:41:20 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/dataflow/DataflowRunner.java | 158 +++++++++++++++++-- 1 file changed, 145 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/1cdb80cb/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index ed29330..3e7c8ce 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -49,6 +49,7 @@ import java.net.URLClassLoader; import java.nio.channels.Channels; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -57,6 +58,7 @@ import java.util.Random; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import org.apache.beam.runners.core.construction.CoderTranslation; import org.apache.beam.runners.core.construction.DeduplicatedFlattenFactory; import 
org.apache.beam.runners.core.construction.EmptyFlattenAsCreateFactory; import org.apache.beam.runners.core.construction.PTransformMatchers; @@ -79,10 +81,12 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.NonDeterministicException; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.extensions.gcp.storage.PathValidator; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.FileSystems; @@ -103,6 +107,7 @@ import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.runners.TransformHierarchy.Node; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Combine.GroupedValues; +import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -113,6 +118,7 @@ import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.InstanceBuilder; import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.util.NameUtils; @@ -312,6 +318,12 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { PTransformMatchers.classEqualTo(PubsubUnboundedSink.class), new StreamingPubsubIOWriteOverrideFactory(this))); } + if (hasExperiment(options, "beam_fn_api")) { + overridesBuilder.add( + PTransformOverride.of( + 
PTransformMatchers.classEqualTo(Create.Values.class), + new StreamingFnApiCreateOverrideFactory())); + } overridesBuilder .add( // Streaming Bounded Read is implemented in terms of Streaming Unbounded Read, and @@ -428,15 +440,12 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform( AppliedPTransform<PBegin, PCollection<T>, PTransform<PInput, PCollection<T>>> transform) { PTransform<PInput, PCollection<T>> original = transform.getTransform(); - PCollection<T> output = - (PCollection) Iterables.getOnlyElement(transform.getOutputs().values()); return PTransformReplacement.of( transform.getPipeline().begin(), InstanceBuilder.ofType(replacement) .withArg(DataflowRunner.class, runner) .withArg( (Class<? super PTransform<PInput, PCollection<T>>>) original.getClass(), original) - .withArg((Class<? super PCollection<T>>) output.getClass(), output) .build()); } @@ -814,10 +823,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { /** Builds an instance of this class from the overridden transform. */ @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() - public StreamingPubsubIORead( - DataflowRunner runner, - PubsubUnboundedSource transform, - PCollection<PubsubMessage> originalOutput) { + public StreamingPubsubIORead(DataflowRunner runner, PubsubUnboundedSource transform) { this.transform = transform; } @@ -986,6 +992,136 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { // ================================================================================ /** + * A PTransform override factory which maps Create.Values PTransforms for streaming pipelines + * into a Dataflow specific variant. 
+ */ + private static class StreamingFnApiCreateOverrideFactory<T> + implements PTransformOverrideFactory<PBegin, PCollection<T>, Create.Values<T>> { + + @Override + public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform( + AppliedPTransform<PBegin, PCollection<T>, Create.Values<T>> transform) { + Create.Values<T> original = transform.getTransform(); + PCollection<T> output = + (PCollection) Iterables.getOnlyElement(transform.getOutputs().values()); + return PTransformReplacement.of( + transform.getPipeline().begin(), + new StreamingFnApiCreate<>(original, output)); + } + + @Override + public Map<PValue, ReplacementOutput> mapOutputs( + Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) { + return ReplacementOutputs.singleton(outputs, newOutput); + } + } + + /** + * Specialized implementation for + * {@link org.apache.beam.sdk.transforms.Create.Values Create.Values} for the Dataflow runner in + * streaming mode over the Fn API. + */ + private static class StreamingFnApiCreate<T> extends PTransform<PBegin, PCollection<T>> { + private final Create.Values<T> transform; + private final PCollection<T> originalOutput; + + private StreamingFnApiCreate( + Create.Values<T> transform, + PCollection<T> originalOutput) { + this.transform = transform; + this.originalOutput = originalOutput; + } + + @Override + public final PCollection<T> expand(PBegin input) { + try { + PCollection<T> pc = Pipeline + .applyTransform(input, new Impulse(IsBounded.BOUNDED)) + .apply(ParDo.of(DecodeAndEmitDoFn + .fromIterable(transform.getElements(), originalOutput.getCoder()))); + pc.setCoder(originalOutput.getCoder()); + return pc; + } catch (IOException e) { + throw new IllegalStateException("Unable to encode elements.", e); + } + } + + /** + * A DoFn which stores encoded versions of elements and a representation of a Coder + * capable of decoding those elements. + * + * <p>TODO: BEAM-2422 - Make this a SplittableDoFn. 
+ */ + private static class DecodeAndEmitDoFn<T> extends DoFn<byte[], T> { + public static <T> DecodeAndEmitDoFn<T> fromIterable(Iterable<T> elements, Coder<T> elemCoder) + throws IOException { + ImmutableList.Builder<byte[]> allElementsBytes = ImmutableList.builder(); + for (T element : elements) { + byte[] bytes = CoderUtils.encodeToByteArray(elemCoder, element); + allElementsBytes.add(bytes); + } + return new DecodeAndEmitDoFn<>(allElementsBytes.build(), elemCoder); + } + + private final Collection<byte[]> elements; + private final RunnerApi.MessageWithComponents coderSpec; + + private DecodeAndEmitDoFn(Collection<byte[]> elements, Coder<T> coder) throws IOException { + this.elements = elements; + this.coderSpec = CoderTranslation.toProto(coder); + } + + @ProcessElement + public void processElement(ProcessContext context) throws IOException { + Coder<T> coder = + (Coder) CoderTranslation.fromProto(coderSpec.getCoder(), coderSpec.getComponents()); + for (byte[] element : elements) { + context.output(CoderUtils.decodeFromByteArray(coder, element)); + } + } + } + } + + /** The Dataflow specific override for the impulse primitive. 
*/ + private static class Impulse extends PTransform<PBegin, PCollection<byte[]>> { + private final IsBounded isBounded; + + private Impulse(IsBounded isBounded) { + this.isBounded = isBounded; + } + + @Override + public PCollection<byte[]> expand(PBegin input) { + return PCollection.createPrimitiveOutputInternal( + input.getPipeline(), WindowingStrategy.globalDefault(), isBounded); + } + + @Override + protected Coder<?> getDefaultOutputCoder() { + return ByteArrayCoder.of(); + } + + private static class Translator implements TransformTranslator<Impulse> { + @Override + public void translate(Impulse transform, TranslationContext context) { + if (context.getPipelineOptions().isStreaming()) { + StepTranslationContext stepContext = context.addStep(transform, "ParallelRead"); + stepContext.addInput(PropertyNames.FORMAT, "pubsub"); + stepContext.addInput(PropertyNames.PUBSUB_SUBSCRIPTION, "_starting_signal/"); + stepContext.addOutput(context.getOutput(transform)); + } else { + throw new UnsupportedOperationException( + "Impulse source for batch pipelines has not been defined."); + } + } + } + + static { + DataflowPipelineTranslator.registerTransformTranslator(Impulse.class, new Translator()); + } + } + + /** * Specialized implementation for * {@link org.apache.beam.sdk.io.Read.Unbounded Read.Unbounded} for the * Dataflow runner in streaming mode. @@ -998,9 +1134,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { /** Builds an instance of this class from the overridden transform. 
*/ @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() - public StreamingUnboundedRead(DataflowRunner runner, - Read.Unbounded<T> transform, - PCollection<T> originalOutput) { + public StreamingUnboundedRead(DataflowRunner runner, Read.Unbounded<T> transform) { this.source = transform.getSource(); } @@ -1115,9 +1249,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { /** Builds an instance of this class from the overridden transform. */ @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() - public StreamingBoundedRead(DataflowRunner runner, - Read.Bounded<T> transform, - PCollection<T> originalOutput) { + public StreamingBoundedRead(DataflowRunner runner, Read.Bounded<T> transform) { this.source = transform.getSource(); }
