Repository: beam Updated Branches: refs/heads/master 8860cceb7 -> 5f8cfa741
DataflowRunner: Reject merging windowing for stateful ParDo Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cab4d896 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cab4d896 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cab4d896 Branch: refs/heads/master Commit: cab4d8969e7f95b0ece59838ad2d578e75d38823 Parents: 7a9f762 Author: Kenneth Knowles <[email protected]> Authored: Wed Jun 21 20:25:31 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Thu Jun 22 19:47:47 2017 -0700 ---------------------------------------------------------------------- .../dataflow/BatchStatefulParDoOverrides.java | 2 ++ .../dataflow/DataflowPipelineTranslator.java | 5 ++- .../beam/runners/dataflow/DataflowRunner.java | 10 ++++++ .../runners/dataflow/DataflowRunnerTest.java | 38 ++++++++++++++++++++ 4 files changed, 54 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/cab4d896/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java index 41202db..7309f61 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java @@ -146,6 +146,7 @@ public class BatchStatefulParDoOverrides { DoFn<KV<K, InputT>, OutputT> fn = originalParDo.getFn(); verifyFnIsStateful(fn); DataflowRunner.verifyStateSupported(fn); + DataflowRunner.verifyStateSupportForWindowingStrategy(input.getWindowingStrategy()); PTransform< PCollection<? extends KV<K, Iterable<KV<Instant, WindowedValue<KV<K, InputT>>>>>>, @@ -171,6 +172,7 @@ public class BatchStatefulParDoOverrides { DoFn<KV<K, InputT>, OutputT> fn = originalParDo.getFn(); verifyFnIsStateful(fn); DataflowRunner.verifyStateSupported(fn); + DataflowRunner.verifyStateSupportForWindowingStrategy(input.getWindowingStrategy()); PTransform< PCollection<? extends KV<K, Iterable<KV<Instant, WindowedValue<KV<K, InputT>>>>>>, http://git-wip-us.apache.org/repos/asf/beam/blob/cab4d896/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 6d30544..28fd1bb 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -972,7 +972,10 @@ public class DataflowPipelineTranslator { fn)); } - DataflowRunner.verifyStateSupported(fn); + if (signature.usesState() || signature.usesTimers()) { + DataflowRunner.verifyStateSupported(fn); + DataflowRunner.verifyStateSupportForWindowingStrategy(windowingStrategy); + } stepContext.addInput(PropertyNames.USER_FN, fn.getClass().getName()); stepContext.addInput( http://git-wip-us.apache.org/repos/asf/beam/blob/cab4d896/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 4d7f6ac..5d9f0f3 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1542,4 +1542,14 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { } } } + + static void verifyStateSupportForWindowingStrategy(WindowingStrategy strategy) { + // https://issues.apache.org/jira/browse/BEAM-2507 + if (!strategy.getWindowFn().isNonMerging()) { + throw new UnsupportedOperationException( + String.format( + "%s does not currently support state or timers with merging windows", + DataflowRunner.class.getSimpleName())); + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/cab4d896/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index f57c0ee..bc1a042 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -93,12 +93,15 @@ import org.apache.beam.sdk.state.MapState; import org.apache.beam.sdk.state.SetState; import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.ValueState; import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.Sessions; +import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.GcsUtil; import org.apache.beam.sdk.util.ReleaseInfo; import org.apache.beam.sdk.util.gcsfs.GcsPath; @@ -112,6 +115,7 @@ import org.apache.beam.sdk.values.WindowingStrategy; import org.hamcrest.Description; import org.hamcrest.Matchers; import org.hamcrest.TypeSafeMatcher; +import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Before; import org.junit.Rule; @@ -127,6 +131,8 @@ import org.mockito.stubbing.Answer; /** * Tests for the {@link DataflowRunner}. + * + * <p>Implements {@link Serializable} because it is caught in closures. */ @RunWith(JUnit4.class) public class DataflowRunnerTest implements Serializable { @@ -1222,6 +1228,38 @@ public class DataflowRunnerTest implements Serializable { testStreamingWriteOverride(options, StreamingShardedWriteFactory.DEFAULT_NUM_SHARDS); } + private void verifyMergingStatefulParDoRejected(PipelineOptions options) throws Exception { + Pipeline p = Pipeline.create(options); + + p.apply(Create.of(KV.of(13, 42))) + .apply(Window.<KV<Integer, Integer>>into(Sessions.withGapDuration(Duration.millis(1)))) + .apply(ParDo.of(new DoFn<KV<Integer, Integer>, Void>() { + @StateId("fizzle") + private final StateSpec<ValueState<Void>> voidState = StateSpecs.value(); + + @ProcessElement + public void process() {} + })); + + thrown.expectMessage("merging"); + thrown.expect(UnsupportedOperationException.class); + p.run(); + } + + @Test + public void testMergingStatefulRejectedInStreaming() throws Exception { + PipelineOptions options = buildPipelineOptions(); + options.as(StreamingOptions.class).setStreaming(true); + verifyMergingStatefulParDoRejected(options); + } + + @Test + public void testMergingStatefulRejectedInBatch() throws Exception { + PipelineOptions options = buildPipelineOptions(); + options.as(StreamingOptions.class).setStreaming(false); + verifyMergingStatefulParDoRejected(options); + } + private void testStreamingWriteOverride(PipelineOptions options, int expectedNumShards) { TestPipeline p = TestPipeline.fromOptions(options);
