Repository: incubator-beam
Updated Branches:
  refs/heads/master 9f796e22f -> 351fc3efa


Verify one element per window for DataflowPipelineRunner View.asSingleton

This changes the expansion of the DataflowPipelineRunner override for
View.asSingleton to provide a useful error message to users if their
PCollection contains more than one element per window.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/09a1f600
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/09a1f600
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/09a1f600

Branch: refs/heads/master
Commit: 09a1f6009671c16e328b822e1a57b5206aef9bfd
Parents: 9f796e2
Author: Luke Cwik <[email protected]>
Authored: Tue May 10 13:02:21 2016 -0700
Committer: Luke Cwik <[email protected]>
Committed: Wed May 11 10:10:47 2016 -0700

----------------------------------------------------------------------
 .../dataflow/DataflowPipelineRunner.java        | 28 +++++++++++++-
 .../dataflow/DataflowPipelineRunnerTest.java    | 39 ++++++++++++++++++++
 2 files changed, 66 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09a1f600/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java
index 4076802..3d3c0ec 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java
@@ -848,14 +848,36 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
         extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>,
                      IsmRecord<WindowedValue<T>>> {
 
+      private final Coder<W> windowCoder;
+      IsmRecordForSingularValuePerWindowDoFn(Coder<W> windowCoder) {
+        this.windowCoder = windowCoder;
+      }
+
       @Override
       public void processElement(ProcessContext c) throws Exception {
+        Optional<Object> previousWindowStructuralValue = Optional.absent();
+        T previousValue = null;
+
         Iterator<KV<W, WindowedValue<T>>> iterator = 
c.element().getValue().iterator();
         while (iterator.hasNext()) {
           KV<W, WindowedValue<T>> next = iterator.next();
+          Object currentWindowStructuralValue = 
windowCoder.structuralValue(next.getKey());
+
+          // Verify that the user isn't trying to have more than one element 
per window as
+          // a singleton.
+          checkState(!previousWindowStructuralValue.isPresent()
+              || 
!previousWindowStructuralValue.get().equals(currentWindowStructuralValue),
+              "Multiple values [%s, %s] found for singleton within window 
[%s].",
+              previousValue,
+              next.getValue().getValue(),
+              next.getKey());
+
           c.output(
               IsmRecord.of(
                   ImmutableList.of(next.getKey()), next.getValue()));
+
+          previousWindowStructuralValue = 
Optional.of(currentWindowStructuralValue);
+          previousValue = next.getValue().getValue();
         }
       }
     }
@@ -873,10 +895,14 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
 
     @Override
     public PCollectionView<T> apply(PCollection<T> input) {
+      @SuppressWarnings("unchecked")
+      Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>)
+          input.getWindowingStrategy().getWindowFn().windowCoder();
+
       return BatchViewAsSingleton.<T, T, T, BoundedWindow>applyForSingleton(
           runner,
           input,
-          new IsmRecordForSingularValuePerWindowDoFn<T, BoundedWindow>(),
+          new IsmRecordForSingularValuePerWindowDoFn<T, 
BoundedWindow>(windowCoder),
           transform.hasDefaultValue(),
           transform.defaultValue(),
           input.getCoder());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09a1f600/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java
index 2993c50..66c2feb 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java
@@ -41,6 +41,7 @@ import static org.mockito.Mockito.when;
 import org.apache.beam.runners.dataflow.DataflowPipelineRunner.BatchViewAsList;
 import org.apache.beam.runners.dataflow.DataflowPipelineRunner.BatchViewAsMap;
 import 
org.apache.beam.runners.dataflow.DataflowPipelineRunner.BatchViewAsMultimap;
+import 
org.apache.beam.runners.dataflow.DataflowPipelineRunner.BatchViewAsSingleton;
 import org.apache.beam.runners.dataflow.DataflowPipelineRunner.TransformedMap;
 import org.apache.beam.runners.dataflow.internal.IsmFormat;
 import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord;
@@ -952,6 +953,44 @@ public class DataflowPipelineRunnerTest {
   }
 
   @Test
+  public void testBatchViewAsSingletonToIsmRecord() throws Exception {
+    DoFnTester<KV<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>,
+               IsmRecord<WindowedValue<String>>> doFnTester =
+               DoFnTester.of(
+                   new 
BatchViewAsSingleton.IsmRecordForSingularValuePerWindowDoFn
+                   <String, GlobalWindow>(GlobalWindow.Coder.INSTANCE));
+
+    assertThat(
+        doFnTester.processBatch(
+            ImmutableList.of(KV.<Integer, Iterable<KV<GlobalWindow, 
WindowedValue<String>>>>of(
+                0, ImmutableList.of(KV.of(GlobalWindow.INSTANCE, 
valueInGlobalWindow("a")))))),
+        contains(IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE), 
valueInGlobalWindow("a"))));
+  }
+
+  @Test
+  public void 
testBatchViewAsSingletonToIsmRecordWithMultipleValuesThrowsException()
+      throws Exception {
+    DoFnTester<KV<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>,
+    IsmRecord<WindowedValue<String>>> doFnTester =
+    DoFnTester.of(
+        new BatchViewAsSingleton.IsmRecordForSingularValuePerWindowDoFn
+        <String, GlobalWindow>(GlobalWindow.Coder.INSTANCE));
+
+    try {
+      doFnTester.processBatch(
+          ImmutableList.of(KV.<Integer, Iterable<KV<GlobalWindow, 
WindowedValue<String>>>>of(
+              0, ImmutableList.of(
+                  KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("a")),
+                  KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("b"))))));
+      fail("Expected UserCodeException");
+    } catch (UserCodeException e) {
+      assertTrue(e.getCause() instanceof IllegalStateException);
+      IllegalStateException rootCause = (IllegalStateException) e.getCause();
+      assertThat(rootCause.getMessage(), containsString("found for singleton 
within window"));
+    }
+  }
+
+  @Test
   public void testBatchViewAsListToIsmRecordForGlobalWindow() throws Exception 
{
     DoFnTester<String, IsmRecord<WindowedValue<String>>> doFnTester =
         DoFnTester.of(new 
BatchViewAsList.ToIsmRecordForGlobalWindowDoFn<String>());

Reply via email to