Repository: beam Updated Branches: refs/heads/master 9ed0af8f2 -> 5506be87d
DataflowRunner: Reject SetState and MapState Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c0576445 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c0576445 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c0576445 Branch: refs/heads/master Commit: c05764454e73ab93d0602f34b8b7622d46e1d892 Parents: 3785b5b Author: Kenneth Knowles <[email protected]> Authored: Wed Jun 21 20:58:35 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Thu Jun 22 13:51:03 2017 -0700 ---------------------------------------------------------------------- .../dataflow/BatchStatefulParDoOverrides.java | 2 + .../dataflow/DataflowPipelineTranslator.java | 2 + .../beam/runners/dataflow/DataflowRunner.java | 30 +++++++ .../runners/dataflow/DataflowRunnerTest.java | 89 ++++++++++++++++++-- 4 files changed, 114 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/c0576445/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java index 4d9a57f..41202db 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java @@ -145,6 +145,7 @@ public class BatchStatefulParDoOverrides { public PCollection<OutputT> expand(PCollection<KV<K, InputT>> input) { DoFn<KV<K, InputT>, OutputT> fn = originalParDo.getFn(); verifyFnIsStateful(fn); + DataflowRunner.verifyStateSupported(fn); PTransform< PCollection<? extends KV<K, Iterable<KV<Instant, WindowedValue<KV<K, InputT>>>>>>, @@ -169,6 +170,7 @@ public class BatchStatefulParDoOverrides { public PCollectionTuple expand(PCollection<KV<K, InputT>> input) { DoFn<KV<K, InputT>, OutputT> fn = originalParDo.getFn(); verifyFnIsStateful(fn); + DataflowRunner.verifyStateSupported(fn); PTransform< PCollection<? extends KV<K, Iterable<KV<Instant, WindowedValue<KV<K, InputT>>>>>>, http://git-wip-us.apache.org/repos/asf/beam/blob/c0576445/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index bfd9b64..6d30544 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -972,6 +972,8 @@ public class DataflowPipelineTranslator { fn)); } + DataflowRunner.verifyStateSupported(fn); + stepContext.addInput(PropertyNames.USER_FN, fn.getClass().getName()); stepContext.addInput( PropertyNames.SERIALIZED_FN, http://git-wip-us.apache.org/repos/asf/beam/blob/c0576445/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 1741287..4d7f6ac 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -107,6 +107,8 @@ import org.apache.beam.sdk.runners.PTransformOverride; import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.runners.TransformHierarchy.Node; +import org.apache.beam.sdk.state.MapState; +import org.apache.beam.sdk.state.SetState; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Combine.GroupedValues; import org.apache.beam.sdk.transforms.Create; @@ -119,6 +121,8 @@ import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.InstanceBuilder; @@ -136,6 +140,7 @@ import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.ValueWithRecordId; import org.apache.beam.sdk.values.WindowingStrategy; import org.joda.time.DateTimeUtils; @@ -1512,4 +1517,29 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { return workerHarnessContainerImage.replace("IMAGE", "beam-java-batch"); } } + + static void verifyStateSupported(DoFn<?, ?> fn) { + DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass()); + + for (DoFnSignature.StateDeclaration stateDecl : signature.stateDeclarations().values()) { + + // https://issues.apache.org/jira/browse/BEAM-1474 + if (stateDecl.stateType().isSubtypeOf(TypeDescriptor.of(MapState.class))) { + throw new UnsupportedOperationException(String.format( + "%s does not currently support %s", + DataflowRunner.class.getSimpleName(), + MapState.class.getSimpleName() + )); + } + + // https://issues.apache.org/jira/browse/BEAM-1479 + if (stateDecl.stateType().isSubtypeOf(TypeDescriptor.of(SetState.class))) { + throw new UnsupportedOperationException(String.format( + "%s does not currently support %s", + DataflowRunner.class.getSimpleName(), + SetState.class.getSimpleName() + )); + } + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/c0576445/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index aae21cf..f57c0ee 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -50,6 +50,7 @@ import com.google.common.collect.ImmutableList; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.Serializable; import java.net.URL; import java.net.URLClassLoader; import java.nio.channels.FileChannel; @@ -82,18 +83,26 @@ import org.apache.beam.sdk.io.WriteFiles; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.runners.TransformHierarchy.Node; +import org.apache.beam.sdk.state.MapState; +import org.apache.beam.sdk.state.SetState; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.GcsUtil; import org.apache.beam.sdk.util.ReleaseInfo; import org.apache.beam.sdk.util.gcsfs.GcsPath; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.PValue; @@ -120,7 +129,7 @@ import org.mockito.stubbing.Answer; * Tests for the {@link DataflowRunner}. */ @RunWith(JUnit4.class) -public class DataflowRunnerTest { +public class DataflowRunnerTest implements Serializable { private static final String VALID_STAGING_BUCKET = "gs://valid-bucket/staging"; private static final String VALID_TEMP_BUCKET = "gs://valid-bucket/temp"; @@ -130,15 +139,12 @@ public class DataflowRunnerTest { private static final String PROJECT_ID = "some-project"; private static final String REGION_ID = "some-region-1"; - @Rule - public TemporaryFolder tmpFolder = new TemporaryFolder(); - @Rule - public ExpectedException thrown = ExpectedException.none(); - @Rule - public ExpectedLogs expectedLogs = ExpectedLogs.none(DataflowRunner.class); + @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder(); + @Rule public transient ExpectedException thrown = ExpectedException.none(); + @Rule public transient ExpectedLogs expectedLogs = ExpectedLogs.none(DataflowRunner.class); - private Dataflow.Projects.Locations.Jobs mockJobs; - private GcsUtil mockGcsUtil; + private transient Dataflow.Projects.Locations.Jobs mockJobs; + private transient GcsUtil mockGcsUtil; // Asserts that the given Job has all expected fields set. private static void assertValidJob(Job job) { @@ -1001,6 +1007,71 @@ public class DataflowRunnerTest { assertTrue(transform.translated); } + private void verifyMapStateUnsupported(PipelineOptions options) throws Exception { + Pipeline p = Pipeline.create(options); + p.apply(Create.of(KV.of(13, 42))) + .apply( + ParDo.of( + new DoFn<KV<Integer, Integer>, Void>() { + @StateId("fizzle") + private final StateSpec<MapState<Void, Void>> voidState = StateSpecs.map(); + + @ProcessElement + public void process() {} + })); + + thrown.expectMessage("MapState"); + thrown.expect(UnsupportedOperationException.class); + p.run(); + } + + @Test + public void testMapStateUnsupportedInBatch() throws Exception { + PipelineOptions options = buildPipelineOptions(); + options.as(StreamingOptions.class).setStreaming(false); + verifyMapStateUnsupported(options); + } + + @Test + public void testMapStateUnsupportedInStreaming() throws Exception { + PipelineOptions options = buildPipelineOptions(); + options.as(StreamingOptions.class).setStreaming(true); + verifyMapStateUnsupported(options); + } + + private void verifySetStateUnsupported(PipelineOptions options) throws Exception { + Pipeline p = Pipeline.create(options); + p.apply(Create.of(KV.of(13, 42))) + .apply( + ParDo.of( + new DoFn<KV<Integer, Integer>, Void>() { + @StateId("fizzle") + private final StateSpec<SetState<Void>> voidState = StateSpecs.set(); + + @ProcessElement + public void process() {} + })); + + thrown.expectMessage("SetState"); + thrown.expect(UnsupportedOperationException.class); + p.run(); + } + + @Test + public void testSetStateUnsupportedInBatch() throws Exception { + PipelineOptions options = buildPipelineOptions(); + options.as(StreamingOptions.class).setStreaming(false); + Pipeline p = Pipeline.create(options); + verifySetStateUnsupported(options); + } + + @Test + public void testSetStateUnsupportedInStreaming() throws Exception { + PipelineOptions options = buildPipelineOptions(); + options.as(StreamingOptions.class).setStreaming(true); + verifySetStateUnsupported(options); + } + /** Records all the composite transforms visited within the Pipeline. */ private static class CompositeTransformRecorder extends PipelineVisitor.Defaults { private List<PTransform<?, ?>> transforms = new ArrayList<>();
