Repository: beam
Updated Branches:
  refs/heads/master 8860cceb7 -> 5f8cfa741


DataflowRunner: Reject merging windowing for stateful ParDo


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cab4d896
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cab4d896
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cab4d896

Branch: refs/heads/master
Commit: cab4d8969e7f95b0ece59838ad2d578e75d38823
Parents: 7a9f762
Author: Kenneth Knowles <[email protected]>
Authored: Wed Jun 21 20:25:31 2017 -0700
Committer: Kenneth Knowles <[email protected]>
Committed: Thu Jun 22 19:47:47 2017 -0700

----------------------------------------------------------------------
 .../dataflow/BatchStatefulParDoOverrides.java   |  2 ++
 .../dataflow/DataflowPipelineTranslator.java    |  5 ++-
 .../beam/runners/dataflow/DataflowRunner.java   | 10 ++++++
 .../runners/dataflow/DataflowRunnerTest.java    | 38 ++++++++++++++++++++
 4 files changed, 54 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/cab4d896/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
index 41202db..7309f61 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java
@@ -146,6 +146,7 @@ public class BatchStatefulParDoOverrides {
       DoFn<KV<K, InputT>, OutputT> fn = originalParDo.getFn();
       verifyFnIsStateful(fn);
       DataflowRunner.verifyStateSupported(fn);
+      DataflowRunner.verifyStateSupportForWindowingStrategy(input.getWindowingStrategy());
 
       PTransform<
               PCollection<? extends KV<K, Iterable<KV<Instant, WindowedValue<KV<K, InputT>>>>>>,
@@ -171,6 +172,7 @@ public class BatchStatefulParDoOverrides {
       DoFn<KV<K, InputT>, OutputT> fn = originalParDo.getFn();
       verifyFnIsStateful(fn);
       DataflowRunner.verifyStateSupported(fn);
+      DataflowRunner.verifyStateSupportForWindowingStrategy(input.getWindowingStrategy());
 
       PTransform<
               PCollection<? extends KV<K, Iterable<KV<Instant, WindowedValue<KV<K, InputT>>>>>>,

http://git-wip-us.apache.org/repos/asf/beam/blob/cab4d896/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 6d30544..28fd1bb 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -972,7 +972,10 @@ public class DataflowPipelineTranslator {
               fn));
     }
 
-    DataflowRunner.verifyStateSupported(fn);
+    if (signature.usesState() || signature.usesTimers()) {
+      DataflowRunner.verifyStateSupported(fn);
+      DataflowRunner.verifyStateSupportForWindowingStrategy(windowingStrategy);
+    }
 
     stepContext.addInput(PropertyNames.USER_FN, fn.getClass().getName());
     stepContext.addInput(

http://git-wip-us.apache.org/repos/asf/beam/blob/cab4d896/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 4d7f6ac..5d9f0f3 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1542,4 +1542,14 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       }
     }
   }
+
+  static void verifyStateSupportForWindowingStrategy(WindowingStrategy strategy) {
+    // https://issues.apache.org/jira/browse/BEAM-2507
+    if (!strategy.getWindowFn().isNonMerging()) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "%s does not currently support state or timers with merging windows",
+              DataflowRunner.class.getSimpleName()));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/cab4d896/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index f57c0ee..bc1a042 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -93,12 +93,15 @@ import org.apache.beam.sdk.state.MapState;
 import org.apache.beam.sdk.state.SetState;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.GcsUtil;
 import org.apache.beam.sdk.util.ReleaseInfo;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
@@ -112,6 +115,7 @@ import org.apache.beam.sdk.values.WindowingStrategy;
 import org.hamcrest.Description;
 import org.hamcrest.Matchers;
 import org.hamcrest.TypeSafeMatcher;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.Rule;
@@ -127,6 +131,8 @@ import org.mockito.stubbing.Answer;
 
 /**
  * Tests for the {@link DataflowRunner}.
+ *
+ * <p>Implements {@link Serializable} because it is caught in closures.
  */
 @RunWith(JUnit4.class)
 public class DataflowRunnerTest implements Serializable {
@@ -1222,6 +1228,38 @@ public class DataflowRunnerTest implements Serializable {
     testStreamingWriteOverride(options, StreamingShardedWriteFactory.DEFAULT_NUM_SHARDS);
   }
 
+  private void verifyMergingStatefulParDoRejected(PipelineOptions options) throws Exception {
+    Pipeline p = Pipeline.create(options);
+
+    p.apply(Create.of(KV.of(13, 42)))
+        .apply(Window.<KV<Integer, Integer>>into(Sessions.withGapDuration(Duration.millis(1))))
+        .apply(ParDo.of(new DoFn<KV<Integer, Integer>, Void>() {
+          @StateId("fizzle")
+          private final StateSpec<ValueState<Void>> voidState = StateSpecs.value();
+
+          @ProcessElement
+          public void process() {}
+        }));
+
+    thrown.expectMessage("merging");
+    thrown.expect(UnsupportedOperationException.class);
+    p.run();
+  }
+
+  @Test
+  public void testMergingStatefulRejectedInStreaming() throws Exception {
+    PipelineOptions options = buildPipelineOptions();
+    options.as(StreamingOptions.class).setStreaming(true);
+    verifyMergingStatefulParDoRejected(options);
+  }
+
+  @Test
+  public void testMergingStatefulRejectedInBatch() throws Exception {
+    PipelineOptions options = buildPipelineOptions();
+    options.as(StreamingOptions.class).setStreaming(false);
+    verifyMergingStatefulParDoRejected(options);
+  }
+
   private void testStreamingWriteOverride(PipelineOptions options, int expectedNumShards) {
     TestPipeline p = TestPipeline.fromOptions(options);
 

Reply via email to