[BEAM-386] Remove StreamingCreate in DataflowRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fc99c53f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fc99c53f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fc99c53f Branch: refs/heads/master Commit: fc99c53fa3503e5877bd552e8b0bd10b866ed1cb Parents: 3c6e147 Author: Pei He <[email protected]> Authored: Wed Jul 20 15:47:05 2016 -0700 Committer: Dan Halperin <[email protected]> Committed: Tue Jul 26 11:31:53 2016 -0700 ---------------------------------------------------------------------- .../beam/runners/dataflow/DataflowRunner.java | 91 -------------------- 1 file changed, 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fc99c53f/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 05ddf45..8f9e76e 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -46,7 +46,6 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.BigEndianLongCoder; -import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.NonDeterministicException; import org.apache.beam.sdk.coders.CoderException; @@ -88,7 +87,6 @@ import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.View.CreatePCollectionView; import org.apache.beam.sdk.transforms.WithKeys; -import org.apache.beam.sdk.transforms.windowing.AfterPane; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; @@ -148,7 +146,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import org.joda.time.DateTimeUtils; import org.joda.time.DateTimeZone; -import org.joda.time.Duration; import org.joda.time.format.DateTimeFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -329,7 +326,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { if (options.isStreaming()) { builder.put(Combine.GloballyAsSingletonView.class, StreamingCombineGloballyAsSingletonView.class); - builder.put(Create.Values.class, StreamingCreate.class); builder.put(View.AsMap.class, StreamingViewAsMap.class); builder.put(View.AsMultimap.class, StreamingViewAsMultimap.class); builder.put(View.AsSingleton.class, StreamingViewAsSingleton.class); @@ -2377,93 +2373,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } /** - * Specialized implementation for - * {@link org.apache.beam.sdk.transforms.Create.Values Create.Values} for the - * Dataflow runner in streaming mode. - */ - private static class StreamingCreate<T> extends PTransform<PInput, PCollection<T>> { - private final Create.Values<T> transform; - - /** - * Builds an instance of this class from the overridden transform. - */ - @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply() - public StreamingCreate(DataflowRunner runner, Create.Values<T> transform) { - this.transform = transform; - } - - /** - * {@link DoFn} that outputs a single KV.of(null, null) kick off the {@link GroupByKey} - * in the streaming create implementation. - */ - private static class OutputNullKv extends DoFn<String, KV<Void, Void>> { - @Override - public void processElement(DoFn<String, KV<Void, Void>>.ProcessContext c) throws Exception { - c.output(KV.of((Void) null, (Void) null)); - } - } - - /** - * A {@link DoFn} which outputs the specified elements by first encoding them to bytes using - * the specified {@link Coder} so that they are serialized as part of the {@link DoFn} but - * need not implement {@code Serializable}. - */ - private static class OutputElements<T> extends DoFn<Object, T> { - private final Coder<T> coder; - private final List<byte[]> encodedElements; - - public OutputElements(Iterable<T> elems, Coder<T> coder) { - this.coder = coder; - this.encodedElements = new ArrayList<>(); - for (T t : elems) { - try { - encodedElements.add(CoderUtils.encodeToByteArray(coder, t)); - } catch (CoderException e) { - throw new IllegalArgumentException("Unable to encode value " + t - + " with coder " + coder, e); - } - } - } - - @Override - public void processElement(ProcessContext c) throws IOException { - for (byte[] encodedElement : encodedElements) { - c.output(CoderUtils.decodeFromByteArray(coder, encodedElement)); - } - } - } - - @Override - public PCollection<T> apply(PInput input) { - try { - Coder<T> coder = transform.getDefaultOutputCoder(input); - return Pipeline.applyTransform( - "StartingSignal", input, PubsubIO.Read.subscription("_starting_signal/")) - .apply(ParDo.of(new OutputNullKv())) - .apply("GlobalSingleton", Window.<KV<Void, Void>>into(new GlobalWindows()) - .triggering(AfterPane.elementCountAtLeast(1)) - .withAllowedLateness(Duration.ZERO) - .discardingFiredPanes()) - .apply(GroupByKey.<Void, Void>create()) - // Go back to the default windowing strategy, so that our setting allowed lateness - // doesn't count as the user having set it. - .setWindowingStrategyInternal(WindowingStrategy.globalDefault()) - .apply(Window.<KV<Void, Iterable<Void>>>into(new GlobalWindows())) - .apply(ParDo.of(new OutputElements<>(transform.getElements(), coder))) - .setCoder(coder).setIsBoundedInternal(IsBounded.BOUNDED); - } catch (CannotProvideCoderException e) { - throw new IllegalArgumentException("Unable to infer a coder and no Coder was specified. " - + "Please set a coder by invoking Create.withCoder() explicitly.", e); - } - } - - @Override - protected String getKindString() { - return "StreamingCreate"; - } - } - - /** * A specialized {@link DoFn} for writing the contents of a {@link PCollection} * to a streaming {@link PCollectionView} backend implementation. */
