Repository: incubator-beam
Updated Branches:
  refs/heads/master 1716bfc49 -> 2f86a6ad0
Transmit new DoFn, not OldDoFn, in Dataflow translator Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f52ac3ec Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f52ac3ec Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f52ac3ec Branch: refs/heads/master Commit: f52ac3ec75cfec025290f174f0f0529850c2bfd9 Parents: c21167c Author: Kenneth Knowles <k...@google.com> Authored: Tue Nov 15 22:27:35 2016 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Tue Nov 29 11:07:02 2016 -0800 ---------------------------------------------------------------------- .../beam/runners/dataflow/DataflowPipelineTranslator.java | 7 +++---- .../org/apache/beam/runners/dataflow/DataflowRunner.java | 8 +++----- 2 files changed, 6 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f52ac3ec/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 0549d5b..2af2cae 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -72,7 +72,6 @@ import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.OldDoFn; import 
org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; @@ -968,7 +967,7 @@ public class DataflowPipelineTranslator { BiMap<Long, TupleTag<?>> outputMap = translateOutputs(context.getOutput(transform), context); translateFn( - transform.getFn(), + transform.getNewFn(), context.getInput(transform).getWindowingStrategy(), transform.getSideInputs(), context.getInput(transform).getCoder(), @@ -997,7 +996,7 @@ public class DataflowPipelineTranslator { translateInputs(context.getInput(transform), transform.getSideInputs(), context); long mainOutput = context.addOutput(context.getOutput(transform)); translateFn( - transform.getFn(), + transform.getNewFn(), context.getInput(transform).getWindowingStrategy(), transform.getSideInputs(), context.getInput(transform).getCoder(), @@ -1075,7 +1074,7 @@ public class DataflowPipelineTranslator { } private static void translateFn( - OldDoFn fn, + DoFn fn, WindowingStrategy windowingStrategy, Iterable<PCollectionView<?>> sideInputs, Coder inputCoder, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f52ac3ec/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index b629d65..ca3f0ed 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -128,7 +128,6 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; -import 
org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -2364,8 +2363,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { * {@link PCollectionView} backend implementation. */ @Deprecated - public static class StreamingPCollectionViewWriterFn<T> - extends OldDoFn<Iterable<T>, T> implements OldDoFn.RequiresWindowAccess { + public static class StreamingPCollectionViewWriterFn<T> extends DoFn<Iterable<T>, T> { private final PCollectionView<?> view; private final Coder<T> dataCoder; @@ -2387,8 +2385,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { return dataCoder; } - @Override - public void processElement(ProcessContext c) throws Exception { + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow w) throws Exception { throw new UnsupportedOperationException( String.format( "%s is a marker class only and should never be executed.", getClass().getName()));