Repository: incubator-beam Updated Branches: refs/heads/master 9f796e22f -> 351fc3efa
Verify one element per window for DataflowPipelineRunner View.asSingleton This changes the expansion of the DataflowPipelineRunner override for View.asSingleton to provide a useful error message to users if their PCollection contains more than one element per window. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/09a1f600 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/09a1f600 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/09a1f600 Branch: refs/heads/master Commit: 09a1f6009671c16e328b822e1a57b5206aef9bfd Parents: 9f796e2 Author: Luke Cwik <[email protected]> Authored: Tue May 10 13:02:21 2016 -0700 Committer: Luke Cwik <[email protected]> Committed: Wed May 11 10:10:47 2016 -0700 ---------------------------------------------------------------------- .../dataflow/DataflowPipelineRunner.java | 28 +++++++++++++- .../dataflow/DataflowPipelineRunnerTest.java | 39 ++++++++++++++++++++ 2 files changed, 66 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09a1f600/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java index 4076802..3d3c0ec 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java @@ -848,14 +848,36 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob> extends 
DoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>, IsmRecord<WindowedValue<T>>> { + private final Coder<W> windowCoder; + IsmRecordForSingularValuePerWindowDoFn(Coder<W> windowCoder) { + this.windowCoder = windowCoder; + } + @Override public void processElement(ProcessContext c) throws Exception { + Optional<Object> previousWindowStructuralValue = Optional.absent(); + T previousValue = null; + Iterator<KV<W, WindowedValue<T>>> iterator = c.element().getValue().iterator(); while (iterator.hasNext()) { KV<W, WindowedValue<T>> next = iterator.next(); + Object currentWindowStructuralValue = windowCoder.structuralValue(next.getKey()); + + // Verify that the user isn't trying to have more than one element per window as + // a singleton. + checkState(!previousWindowStructuralValue.isPresent() + || !previousWindowStructuralValue.get().equals(currentWindowStructuralValue), + "Multiple values [%s, %s] found for singleton within window [%s].", + previousValue, + next.getValue().getValue(), + next.getKey()); + c.output( IsmRecord.of( ImmutableList.of(next.getKey()), next.getValue())); + + previousWindowStructuralValue = Optional.of(currentWindowStructuralValue); + previousValue = next.getValue().getValue(); } } } @@ -873,10 +895,14 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob> @Override public PCollectionView<T> apply(PCollection<T> input) { + @SuppressWarnings("unchecked") + Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) + input.getWindowingStrategy().getWindowFn().windowCoder(); + return BatchViewAsSingleton.<T, T, T, BoundedWindow>applyForSingleton( runner, input, - new IsmRecordForSingularValuePerWindowDoFn<T, BoundedWindow>(), + new IsmRecordForSingularValuePerWindowDoFn<T, BoundedWindow>(windowCoder), transform.hasDefaultValue(), transform.defaultValue(), input.getCoder()); 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09a1f600/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java index 2993c50..66c2feb 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineRunnerTest.java @@ -41,6 +41,7 @@ import static org.mockito.Mockito.when; import org.apache.beam.runners.dataflow.DataflowPipelineRunner.BatchViewAsList; import org.apache.beam.runners.dataflow.DataflowPipelineRunner.BatchViewAsMap; import org.apache.beam.runners.dataflow.DataflowPipelineRunner.BatchViewAsMultimap; +import org.apache.beam.runners.dataflow.DataflowPipelineRunner.BatchViewAsSingleton; import org.apache.beam.runners.dataflow.DataflowPipelineRunner.TransformedMap; import org.apache.beam.runners.dataflow.internal.IsmFormat; import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord; @@ -952,6 +953,44 @@ public class DataflowPipelineRunnerTest { } @Test + public void testBatchViewAsSingletonToIsmRecord() throws Exception { + DoFnTester<KV<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>, + IsmRecord<WindowedValue<String>>> doFnTester = + DoFnTester.of( + new BatchViewAsSingleton.IsmRecordForSingularValuePerWindowDoFn + <String, GlobalWindow>(GlobalWindow.Coder.INSTANCE)); + + assertThat( + doFnTester.processBatch( + ImmutableList.of(KV.<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>of( + 0, ImmutableList.of(KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("a")))))), + 
contains(IsmRecord.of(ImmutableList.of(GlobalWindow.INSTANCE), valueInGlobalWindow("a")))); + } + + @Test + public void testBatchViewAsSingletonToIsmRecordWithMultipleValuesThrowsException() + throws Exception { + DoFnTester<KV<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>, + IsmRecord<WindowedValue<String>>> doFnTester = + DoFnTester.of( + new BatchViewAsSingleton.IsmRecordForSingularValuePerWindowDoFn + <String, GlobalWindow>(GlobalWindow.Coder.INSTANCE)); + + try { + doFnTester.processBatch( + ImmutableList.of(KV.<Integer, Iterable<KV<GlobalWindow, WindowedValue<String>>>>of( + 0, ImmutableList.of( + KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("a")), + KV.of(GlobalWindow.INSTANCE, valueInGlobalWindow("b")))))); + fail("Expected UserCodeException"); + } catch (UserCodeException e) { + assertTrue(e.getCause() instanceof IllegalStateException); + IllegalStateException rootCause = (IllegalStateException) e.getCause(); + assertThat(rootCause.getMessage(), containsString("found for singleton within window")); + } + } + + @Test public void testBatchViewAsListToIsmRecordForGlobalWindow() throws Exception { DoFnTester<String, IsmRecord<WindowedValue<String>>> doFnTester = DoFnTester.of(new BatchViewAsList.ToIsmRecordForGlobalWindowDoFn<String>());
