Reject stateful DoFn in DataflowRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/31a55f40 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/31a55f40 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/31a55f40 Branch: refs/heads/master Commit: 31a55f407473f23a61cf6dfe42c3f6f4c7880920 Parents: bdd3e08 Author: Kenneth Knowles <k...@google.com> Authored: Tue Nov 15 21:35:03 2016 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Tue Nov 29 09:24:55 2016 -0800 ---------------------------------------------------------------------- runners/google-cloud-dataflow-java/pom.xml | 1 + .../dataflow/DataflowPipelineTranslator.java | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/31a55f40/runners/google-cloud-dataflow-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index da3a4d6..59276e4 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -77,6 +77,7 @@ <execution> <id>runnable-on-service-tests</id> <configuration> + <excludedGroups>org.apache.beam.sdk.testing.UsesStatefulParDo</excludedGroups> <excludes> <exclude>org.apache.beam.sdk.transforms.FlattenTest</exclude> </excludes> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/31a55f40/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 9acf071..0549d5b 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -69,6 +69,7 @@ import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.runners.TransformTreeNode; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.OldDoFn; @@ -77,6 +78,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.AppliedCombineFn; @@ -959,6 +961,8 @@ public class DataflowPipelineTranslator { private <InputT, OutputT> void translateMultiHelper( ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) { + rejectStatefulDoFn(transform.getNewFn()); + context.addStep(transform, "ParallelDo"); translateInputs(context.getInput(transform), transform.getSideInputs(), context); BiMap<Long, TupleTag<?>> outputMap = @@ -987,6 +991,8 @@ public class DataflowPipelineTranslator { private <InputT, OutputT> void translateSingleHelper( ParDo.Bound<InputT, OutputT> transform, TranslationContext context) { + rejectStatefulDoFn(transform.getNewFn()); + context.addStep(transform, "ParallelDo"); translateInputs(context.getInput(transform), transform.getSideInputs(), context); long mainOutput = context.addOutput(context.getOutput(transform)); @@ -1033,6 +1039,18 @@ public class DataflowPipelineTranslator { registerTransformTranslator(Read.Bounded.class, new ReadTranslator()); } + private static void rejectStatefulDoFn(DoFn<?, ?> doFn) { + if (DoFnSignatures.getSignature(doFn.getClass()).isStateful()) { + throw new UnsupportedOperationException( + String.format( + "Found %s annotations on %s, but %s cannot yet be used with state in the %s.", + DoFn.StateId.class.getSimpleName(), + doFn.getClass().getName(), + DoFn.class.getSimpleName(), + DataflowRunner.class.getSimpleName())); + } + } + private static void translateInputs( PCollection<?> input, List<PCollectionView<?>> sideInputs,