This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7fb8f7b  Replaces BoundedSource with a composite transform when using Fn API
7fb8f7b is described below

commit 7fb8f7b6b3280ff991bd90e39006e72e4126697e
Author: Eugene Kirpichov <[email protected]>
AuthorDate: Mon Oct 9 11:30:27 2017 -0700

    Replaces BoundedSource with a composite transform when using Fn API
---
 .../beam/runners/dataflow/DataflowRunner.java      | 100 +++++++++++++++++++--
 .../beam/runners/dataflow/util/PropertyNames.java  |   1 +
 2 files changed, 92 insertions(+), 9 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index a650092..ddad43f 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -21,6 +21,7 @@ import static com.google.common.base.MoreObjects.firstNonNull;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
 import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
 import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
 
@@ -91,6 +92,7 @@ import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.extensions.gcp.storage.PathValidator;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -132,6 +134,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.sdk.util.MimeTypes;
@@ -420,6 +423,13 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
                   new ReflectiveViewOverrideFactory(
                       BatchViewOverrides.BatchViewAsIterable.class, this)));
     }
+    // Expands into Reshuffle and single-output ParDo, so has to be before the 
overrides below.
+    if (hasExperiment(options, "beam_fn_api")) {
+      overridesBuilder.add(
+          PTransformOverride.of(
+              PTransformMatchers.classEqualTo(Read.Bounded.class),
+              new FnApiBoundedReadOverrideFactory()));
+    }
     overridesBuilder
         .add(
             PTransformOverride.of(
@@ -1185,7 +1195,7 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
     public final PCollection<T> expand(PBegin input) {
       try {
         PCollection<T> pc = Pipeline
-            .applyTransform(input, new Impulse(IsBounded.BOUNDED))
+            .applyTransform(input, new Impulse())
             .apply(ParDo.of(DecodeAndEmitDoFn
                 .fromIterable(transform.getElements(), 
originalOutput.getCoder())));
         pc.setCoder(originalOutput.getCoder());
@@ -1206,7 +1216,7 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
           throws IOException {
         ImmutableList.Builder<byte[]> allElementsBytes = 
ImmutableList.builder();
         for (T element : elements) {
-          byte[] bytes = CoderUtils.encodeToByteArray(elemCoder, element);
+          byte[] bytes = encodeToByteArray(elemCoder, element);
           allElementsBytes.add(bytes);
         }
         return new DecodeAndEmitDoFn<>(allElementsBytes.build(), elemCoder);
@@ -1244,16 +1254,16 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
 
   /** The Dataflow specific override for the impulse primitive. */
   private static class Impulse extends PTransform<PBegin, PCollection<byte[]>> 
{
-    private final IsBounded isBounded;
-
-    private Impulse(IsBounded isBounded) {
-      this.isBounded = isBounded;
+    private Impulse() {
     }
 
     @Override
     public PCollection<byte[]> expand(PBegin input) {
       return PCollection.createPrimitiveOutputInternal(
-          input.getPipeline(), WindowingStrategy.globalDefault(), isBounded, 
ByteArrayCoder.of());
+          input.getPipeline(),
+          WindowingStrategy.globalDefault(),
+          IsBounded.BOUNDED,
+          ByteArrayCoder.of());
     }
 
     private static class Translator implements TransformTranslator<Impulse> {
@@ -1265,8 +1275,21 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
           stepContext.addInput(PropertyNames.PUBSUB_SUBSCRIPTION, 
"_starting_signal/");
           stepContext.addOutput(context.getOutput(transform));
         } else {
-          throw new UnsupportedOperationException(
-              "Impulse source for batch pipelines has not been defined.");
+          StepTranslationContext stepContext = context.addStep(transform, 
"ParallelRead");
+          stepContext.addInput(PropertyNames.FORMAT, "impulse");
+          WindowedValue.FullWindowedValueCoder<byte[]> coder =
+              WindowedValue.getFullCoder(
+                  context.getOutput(transform).getCoder(), 
GlobalWindow.Coder.INSTANCE);
+          byte[] encodedImpulse;
+          try {
+            encodedImpulse =
+                encodeToByteArray(coder, WindowedValue.valueInGlobalWindow(new 
byte[0]));
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+          stepContext.addInput(
+              PropertyNames.IMPULSE_ELEMENT, 
byteArrayToJsonString(encodedImpulse));
+          stepContext.addOutput(context.getOutput(transform));
         }
       }
     }
@@ -1438,6 +1461,65 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
     }
   }
 
+  private static class FnApiBoundedReadOverrideFactory<T>
+      implements PTransformOverrideFactory<PBegin, PCollection<T>, 
Read.Bounded<T>> {
+    @Override
+    public PTransformReplacement<PBegin, PCollection<T>> 
getReplacementTransform(
+        AppliedPTransform<PBegin, PCollection<T>, Read.Bounded<T>> transform) {
+      return PTransformReplacement.of(
+          transform.getPipeline().begin(), new 
FnApiBoundedRead<>(transform.getTransform()));
+    }
+
+    @Override
+    public Map<PValue, ReplacementOutput> mapOutputs(
+        Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) {
+      return ReplacementOutputs.singleton(outputs, newOutput);
+    }
+  }
+
+  /**
+   * Specialized implementation for {@link org.apache.beam.sdk.io.Read.Bounded 
Read.Bounded} for the
+   * Dataflow runner in streaming mode.
+   */
+  private static class FnApiBoundedRead<T> extends PTransform<PBegin, 
PCollection<T>> {
+    private final BoundedSource<T> source;
+
+    public FnApiBoundedRead(Read.Bounded<T> transform) {
+      this.source = transform.getSource();
+    }
+
+    @Override
+    public final PCollection<T> expand(PBegin input) {
+      return input
+          .apply(new Impulse())
+          .apply(
+              ParDo.of(
+                  new DoFn<byte[], BoundedSource<T>>() {
+                    @ProcessElement
+                    public void process(ProcessContext c) throws Exception {
+                      for (BoundedSource<T> split :
+                          source.split(64L << 20, c.getPipelineOptions())) {
+                        c.output(split);
+                      }
+                    }
+                  }))
+          .setCoder((Coder<BoundedSource<T>>) SerializableCoder.of((Class) 
BoundedSource.class))
+          .apply(Reshuffle.<BoundedSource<T>>viaRandomKey())
+          .apply(
+              ParDo.of(
+                  new DoFn<BoundedSource<T>, T>() {
+                    @ProcessElement
+                    public void process(ProcessContext c) throws Exception {
+                      BoundedSource.BoundedReader<T> reader =
+                          c.element().createReader(c.getPipelineOptions());
+                      for (boolean more = reader.start(); more; more = 
reader.advance()) {
+                        c.outputWithTimestamp(reader.getCurrent(), 
reader.getCurrentTimestamp());
+                      }
+                    }
+                  }))
+          .setCoder(source.getOutputCoder());
+    }
+  }
   /**
    * A marker {@link DoFn} for writing the contents of a {@link PCollection} 
to a streaming
    * {@link PCollectionView} backend implementation.
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
index 55e0c4e..cdc87bf 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java
@@ -64,4 +64,5 @@ public class PropertyNames {
   public static final String VALUE = "value";
   public static final String DISPLAY_DATA = "display_data";
   public static final String RESTRICTION_CODER = "restriction_coder";
+  public static final String IMPULSE_ELEMENT = "impulse_element";
 }

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to