Repository: incubator-beam
Updated Branches:
  refs/heads/master 1716bfc49 -> 2f86a6ad0


Transmit new DoFn, not OldDoFn, in Dataflow translator


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f52ac3ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f52ac3ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f52ac3ec

Branch: refs/heads/master
Commit: f52ac3ec75cfec025290f174f0f0529850c2bfd9
Parents: c21167c
Author: Kenneth Knowles <k...@google.com>
Authored: Tue Nov 15 22:27:35 2016 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Tue Nov 29 11:07:02 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowPipelineTranslator.java    | 7 +++----
 .../org/apache/beam/runners/dataflow/DataflowRunner.java     | 8 +++-----
 2 files changed, 6 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f52ac3ec/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 0549d5b..2af2cae 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -72,7 +72,6 @@ import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
@@ -968,7 +967,7 @@ public class DataflowPipelineTranslator {
             BiMap<Long, TupleTag<?>> outputMap =
                 translateOutputs(context.getOutput(transform), context);
             translateFn(
-                transform.getFn(),
+                transform.getNewFn(),
                 context.getInput(transform).getWindowingStrategy(),
                 transform.getSideInputs(),
                 context.getInput(transform).getCoder(),
@@ -997,7 +996,7 @@ public class DataflowPipelineTranslator {
             translateInputs(context.getInput(transform), 
transform.getSideInputs(), context);
             long mainOutput = context.addOutput(context.getOutput(transform));
             translateFn(
-                transform.getFn(),
+                transform.getNewFn(),
                 context.getInput(transform).getWindowingStrategy(),
                 transform.getSideInputs(),
                 context.getInput(transform).getCoder(),
@@ -1075,7 +1074,7 @@ public class DataflowPipelineTranslator {
   }
 
   private static void translateFn(
-      OldDoFn fn,
+      DoFn fn,
       WindowingStrategy windowingStrategy,
       Iterable<PCollectionView<?>> sideInputs,
       Coder inputCoder,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f52ac3ec/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index b629d65..ca3f0ed 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -128,7 +128,6 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -2364,8 +2363,7 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
    * {@link PCollectionView} backend implementation.
    */
   @Deprecated
-  public static class StreamingPCollectionViewWriterFn<T>
-  extends OldDoFn<Iterable<T>, T> implements OldDoFn.RequiresWindowAccess {
+  public static class StreamingPCollectionViewWriterFn<T> extends 
DoFn<Iterable<T>, T> {
     private final PCollectionView<?> view;
     private final Coder<T> dataCoder;
 
@@ -2387,8 +2385,8 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
       return dataCoder;
     }
 
-    @Override
-    public void processElement(ProcessContext c) throws Exception {
+    @ProcessElement
+    public void processElement(ProcessContext c, BoundedWindow w) throws 
Exception {
       throw new UnsupportedOperationException(
           String.format(
               "%s is a marker class only and should never be executed.", 
getClass().getName()));

Reply via email to