[BEAM-386] Remove StreamingCreate in DataflowRunner

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fc99c53f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fc99c53f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fc99c53f

Branch: refs/heads/master
Commit: fc99c53fa3503e5877bd552e8b0bd10b866ed1cb
Parents: 3c6e147
Author: Pei He <[email protected]>
Authored: Wed Jul 20 15:47:05 2016 -0700
Committer: Dan Halperin <[email protected]>
Committed: Tue Jul 26 11:31:53 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   | 91 --------------------
 1 file changed, 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fc99c53f/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 05ddf45..8f9e76e 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -46,7 +46,6 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
 import org.apache.beam.sdk.coders.CoderException;
@@ -88,7 +87,6 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.transforms.windowing.AfterPane;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
@@ -148,7 +146,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 
 import org.joda.time.DateTimeUtils;
 import org.joda.time.DateTimeZone;
-import org.joda.time.Duration;
 import org.joda.time.format.DateTimeFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -329,7 +326,6 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
     if (options.isStreaming()) {
       builder.put(Combine.GloballyAsSingletonView.class,
                   StreamingCombineGloballyAsSingletonView.class);
-      builder.put(Create.Values.class, StreamingCreate.class);
       builder.put(View.AsMap.class, StreamingViewAsMap.class);
       builder.put(View.AsMultimap.class, StreamingViewAsMultimap.class);
       builder.put(View.AsSingleton.class, StreamingViewAsSingleton.class);
@@ -2377,93 +2373,6 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
   }
 
   /**
-   * Specialized implementation for
-   * {@link org.apache.beam.sdk.transforms.Create.Values Create.Values} for the
-   * Dataflow runner in streaming mode.
-   */
-  private static class StreamingCreate<T> extends PTransform<PInput, 
PCollection<T>> {
-    private final Create.Values<T> transform;
-
-    /**
-     * Builds an instance of this class from the overridden transform.
-     */
-    @SuppressWarnings("unused") // used via reflection in 
DataflowRunner#apply()
-    public StreamingCreate(DataflowRunner runner, Create.Values<T> transform) {
-      this.transform = transform;
-    }
-
-    /**
-     * {@link DoFn} that outputs a single KV.of(null, null) kick off the 
{@link GroupByKey}
-     * in the streaming create implementation.
-     */
-    private static class OutputNullKv extends DoFn<String, KV<Void, Void>> {
-      @Override
-      public void processElement(DoFn<String, KV<Void, Void>>.ProcessContext 
c) throws Exception {
-        c.output(KV.of((Void) null, (Void) null));
-      }
-    }
-
-    /**
-     * A {@link DoFn} which outputs the specified elements by first encoding 
them to bytes using
-     * the specified {@link Coder} so that they are serialized as part of the 
{@link DoFn} but
-     * need not implement {@code Serializable}.
-     */
-    private static class OutputElements<T> extends DoFn<Object, T> {
-      private final Coder<T> coder;
-      private final List<byte[]> encodedElements;
-
-      public OutputElements(Iterable<T> elems, Coder<T> coder) {
-        this.coder = coder;
-        this.encodedElements = new ArrayList<>();
-        for (T t : elems) {
-          try {
-            encodedElements.add(CoderUtils.encodeToByteArray(coder, t));
-          } catch (CoderException e) {
-            throw new IllegalArgumentException("Unable to encode value " + t
-                + " with coder " + coder, e);
-          }
-        }
-      }
-
-      @Override
-      public void processElement(ProcessContext c) throws IOException {
-        for (byte[] encodedElement : encodedElements) {
-          c.output(CoderUtils.decodeFromByteArray(coder, encodedElement));
-        }
-      }
-    }
-
-    @Override
-    public PCollection<T> apply(PInput input) {
-      try {
-        Coder<T> coder = transform.getDefaultOutputCoder(input);
-        return Pipeline.applyTransform(
-            "StartingSignal", input, 
PubsubIO.Read.subscription("_starting_signal/"))
-            .apply(ParDo.of(new OutputNullKv()))
-            .apply("GlobalSingleton", Window.<KV<Void, Void>>into(new 
GlobalWindows())
-                .triggering(AfterPane.elementCountAtLeast(1))
-                .withAllowedLateness(Duration.ZERO)
-                .discardingFiredPanes())
-            .apply(GroupByKey.<Void, Void>create())
-            // Go back to the default windowing strategy, so that our setting 
allowed lateness
-            // doesn't count as the user having set it.
-            .setWindowingStrategyInternal(WindowingStrategy.globalDefault())
-            .apply(Window.<KV<Void, Iterable<Void>>>into(new GlobalWindows()))
-            .apply(ParDo.of(new OutputElements<>(transform.getElements(), 
coder)))
-            .setCoder(coder).setIsBoundedInternal(IsBounded.BOUNDED);
-      } catch (CannotProvideCoderException e) {
-        throw new IllegalArgumentException("Unable to infer a coder and no 
Coder was specified. "
-            + "Please set a coder by invoking Create.withCoder() explicitly.", 
e);
-      }
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingCreate";
-    }
-  }
-
-  /**
    * A specialized {@link DoFn} for writing the contents of a {@link 
PCollection}
    * to a streaming {@link PCollectionView} backend implementation.
    */

Reply via email to