Reject stateful DoFn in SparkRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b0d07d74
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b0d07d74
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b0d07d74

Branch: refs/heads/gearpump-runner
Commit: b0d07d74f7805ee1d30fdedf54c089790d63d898
Parents: 8d71568
Author: Kenneth Knowles <[email protected]>
Authored: Tue Nov 15 21:33:13 2016 -0800
Committer: Kenneth Knowles <[email protected]>
Committed: Mon Nov 21 21:34:20 2016 -0800

----------------------------------------------------------------------
 runners/spark/pom.xml                           |  1 +
 .../spark/translation/TransformTranslator.java  | 23 ++++++++++++++++++++
 2 files changed, 24 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0d07d74/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 4c5b3f5..88223e2 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -72,6 +72,7 @@
               </goals>
               <configuration>
                 <groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
+                <excludedGroups>org.apache.beam.sdk.testing.UsesStatefulParDo</excludedGroups>
                 <forkCount>1</forkCount>
                 <reuseForks>false</reuseForks>
                 <failIfNoTests>true</failIfNoTests>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0d07d74/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index c902ee3..60d668e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -31,6 +31,7 @@ import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapreduce.AvroJob;
 import org.apache.avro.mapreduce.AvroKeyInputFormat;
 import org.apache.beam.runners.core.AssignWindowsDoFn;
+import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.io.SourceRDD;
@@ -47,12 +48,14 @@ import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -225,6 +228,16 @@ public final class TransformTranslator {
     return new TransformEvaluator<ParDo.Bound<InputT, OutputT>>() {
       @Override
       public void evaluate(ParDo.Bound<InputT, OutputT> transform, EvaluationContext context) {
+        DoFn<InputT, OutputT> doFn = transform.getNewFn();
+        if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+          throw new UnsupportedOperationException(
+              String.format(
+                  "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+                  DoFn.StateId.class.getSimpleName(),
+                  doFn.getClass().getName(),
+                  DoFn.class.getSimpleName(),
+                  SparkRunner.class.getSimpleName()));
+        }
         @SuppressWarnings("unchecked")
         JavaRDD<WindowedValue<InputT>> inRDD =
             ((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD();
@@ -247,6 +260,16 @@ public final class TransformTranslator {
     return new TransformEvaluator<ParDo.BoundMulti<InputT, OutputT>>() {
       @Override
       public void evaluate(ParDo.BoundMulti<InputT, OutputT> transform, EvaluationContext context) {
+        DoFn<InputT, OutputT> doFn = transform.getNewFn();
+        if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+          throw new UnsupportedOperationException(
+              String.format(
+                  "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+                  DoFn.StateId.class.getSimpleName(),
+                  doFn.getClass().getName(),
+                  DoFn.class.getSimpleName(),
+                  SparkRunner.class.getSimpleName()));
+        }
         @SuppressWarnings("unchecked")
         JavaRDD<WindowedValue<InputT>> inRDD =
             ((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD();
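For context, the kind of DoFn this change rejects is one that declares per-key state via @StateId. Below is a minimal, hypothetical sketch of such a DoFn, written against the later stable Beam state API (the incubating SDK at the time of this commit kept the state classes under different packages, so package names and the exact StateSpec shape are assumptions here). With the check above in place, translating a ParDo over a DoFn like this on the SparkRunner fails up front with the UnsupportedOperationException that names the offending class, rather than failing in a less obvious way at execution time.

```java
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

/**
 * Hypothetical stateful DoFn: keeps a running per-key sum in a @StateId-declared
 * ValueState. DoFnSignatures.getSignature(PerKeySumFn.class).stateDeclarations()
 * is non-empty for this class, so the SparkRunner translation above rejects it.
 */
class PerKeySumFn extends DoFn<KV<String, Integer>, KV<String, Integer>> {

  // State declaration that the new check detects.
  @StateId("sum")
  private final StateSpec<ValueState<Integer>> sumSpec = StateSpecs.value(VarIntCoder.of());

  @ProcessElement
  public void processElement(ProcessContext c, @StateId("sum") ValueState<Integer> sum) {
    Integer current = sum.read(); // null if no value has been written for this key yet
    int next = (current == null ? 0 : current) + c.element().getValue();
    sum.write(next);
    c.output(KV.of(c.element().getKey(), next));
  }
}
```

The pom.xml change mirrors this at test time: RunnableOnService tests tagged with org.apache.beam.sdk.testing.UsesStatefulParDo are excluded from the Spark runner's suite until the runner actually supports state.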
