Reject stateful DoFn in SparkRunner

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b0d07d74
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b0d07d74
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b0d07d74

Branch: refs/heads/gearpump-runner
Commit: b0d07d74f7805ee1d30fdedf54c089790d63d898
Parents: 8d71568
Author: Kenneth Knowles <[email protected]>
Authored: Tue Nov 15 21:33:13 2016 -0800
Committer: Kenneth Knowles <[email protected]>
Committed: Mon Nov 21 21:34:20 2016 -0800

----------------------------------------------------------------------
 runners/spark/pom.xml                           |  1 +
 .../spark/translation/TransformTranslator.java  | 23 ++++++++++++++++++++
 2 files changed, 24 insertions(+)
----------------------------------------------------------------------
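
For context, the "stateful DoFn" being rejected here is any DoFn that declares state cells with DoFn.StateId. The sketch below is illustrative only, written against Beam's Java state API (the class name CountingFn is invented, and package locations and the exact StateSpec signature may differ in the SDK at the time of this commit). For a class like this, DoFnSignatures.getSignature(CountingFn.class).stateDeclarations() is non-empty, which is exactly the condition the translator now checks.

    import org.apache.beam.sdk.coders.VarIntCoder;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.ValueState;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    // Illustrative stateful DoFn: keeps a running sum per key and window.
    class CountingFn extends DoFn<KV<String, Integer>, Integer> {

      // The @StateId declaration is what DoFnSignatures reports as a state
      // declaration, and what the Spark translator now rejects.
      @StateId("sum")
      private final StateSpec<ValueState<Integer>> sumSpec =
          StateSpecs.value(VarIntCoder.of());

      @ProcessElement
      public void processElement(
          ProcessContext c, @StateId("sum") ValueState<Integer> sum) {
        Integer current = sum.read();
        int updated = (current == null ? 0 : current) + c.element().getValue();
        sum.write(updated);
        c.output(updated);
      }
    }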


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0d07d74/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 4c5b3f5..88223e2 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -72,6 +72,7 @@
                 </goals>
                  <configuration>
                    <groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
+                   <excludedGroups>org.apache.beam.sdk.testing.UsesStatefulParDo</excludedGroups>
                   <forkCount>1</forkCount>
                   <reuseForks>false</reuseForks>
                   <failIfNoTests>true</failIfNoTests>
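
The new <excludedGroups> entry keeps the Spark RunnableOnService suite from running SDK tests that exercise stateful ParDo. Those tests are selected by JUnit category; a hedged sketch of how such a test is tagged (the test class name and body are invented for illustration, not taken from the SDK):

    import org.apache.beam.sdk.testing.RunnableOnService;
    import org.apache.beam.sdk.testing.UsesStatefulParDo;
    import org.junit.Test;
    import org.junit.experimental.categories.Category;

    public class StatefulParDoExampleTest {

      // Surefire runs the RunnableOnService group but now skips anything also
      // tagged UsesStatefulParDo, so this test is excluded on the Spark runner.
      @Test
      @Category({RunnableOnService.class, UsesStatefulParDo.class})
      public void testValueStateSimple() {
        // ... build and run a pipeline containing a DoFn with @StateId state ...
      }
    }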

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0d07d74/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index c902ee3..60d668e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -31,6 +31,7 @@ import org.apache.avro.mapred.AvroKey;
 import org.apache.avro.mapreduce.AvroJob;
 import org.apache.avro.mapreduce.AvroKeyInputFormat;
 import org.apache.beam.runners.core.AssignWindowsDoFn;
+import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.io.SourceRDD;
@@ -47,12 +48,14 @@ import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -225,6 +228,16 @@ public final class TransformTranslator {
     return new TransformEvaluator<ParDo.Bound<InputT, OutputT>>() {
       @Override
       public void evaluate(ParDo.Bound<InputT, OutputT> transform, EvaluationContext context) {
+        DoFn<InputT, OutputT> doFn = transform.getNewFn();
+        if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+          throw new UnsupportedOperationException(
+              String.format(
+                  "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+                  DoFn.StateId.class.getSimpleName(),
+                  doFn.getClass().getName(),
+                  DoFn.class.getSimpleName(),
+                  SparkRunner.class.getSimpleName()));
+        }
         @SuppressWarnings("unchecked")
         JavaRDD<WindowedValue<InputT>> inRDD =
             ((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD();
@@ -247,6 +260,16 @@ public final class TransformTranslator {
     return new TransformEvaluator<ParDo.BoundMulti<InputT, OutputT>>() {
       @Override
       public void evaluate(ParDo.BoundMulti<InputT, OutputT> transform, EvaluationContext context) {
+        DoFn<InputT, OutputT> doFn = transform.getNewFn();
+        if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+          throw new UnsupportedOperationException(
+              String.format(
+                  "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+                  DoFn.StateId.class.getSimpleName(),
+                  doFn.getClass().getName(),
+                  DoFn.class.getSimpleName(),
+                  SparkRunner.class.getSimpleName()));
+        }
         @SuppressWarnings("unchecked")
         JavaRDD<WindowedValue<InputT>> inRDD =
             ((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD();
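
From a pipeline author's point of view the new guard surfaces at translation time, when SparkRunner walks the pipeline graph. A usage sketch reusing the illustrative CountingFn above (options handling is simplified, and the class name RejectStatefulExample is invented):

    import org.apache.beam.runners.spark.SparkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.KV;

    public class RejectStatefulExample {
      public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        options.setRunner(SparkRunner.class);

        Pipeline p = Pipeline.create(options);
        p.apply(Create.of(KV.of("k", 1), KV.of("k", 2)))
         .apply(ParDo.of(new CountingFn()));

        // Translating the stateful ParDo now fails with
        // UnsupportedOperationException instead of silently misbehaving.
        p.run();
      }
    }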
