Switch Spark streaming tests to custom assertions

The current use of PAssert in the streaming tests for
the Spark runner works by coincidence. PAssert does not
truly support non-global windowing. The switch from
side inputs to GBK, with no change in semantics but hopefully
an easier on-ramp for new runners, incidentally broke
these tests. Soon, PAssert will support windowing, triggers,
and unbounded PCollections. Until then, this change
writes a slightly custom assertion transform for these tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f222df10
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f222df10
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f222df10

Branch: refs/heads/master
Commit: f222df109e773f23e56f9e830454356893989a15
Parents: 77aa093
Author: Kenneth Knowles <[email protected]>
Authored: Wed Jun 8 18:11:07 2016 -0700
Committer: Kenneth Knowles <[email protected]>
Committed: Thu Jun 9 14:41:09 2016 -0700

----------------------------------------------------------------------
 .../streaming/FlattenStreamingTest.java         |  7 +---
 .../streaming/KafkaStreamingTest.java           | 13 ++-----
 .../streaming/SimpleStreamingWordCountTest.java | 18 +++-------
 .../streaming/utils/PAssertStreaming.java       | 36 +++++++++++++++++++-
 4 files changed, 43 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f222df10/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
index 15b2f39..976c7c2 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
@@ -25,9 +25,7 @@ import 
org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreamin
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PCollection;
@@ -77,13 +75,10 @@ public class FlattenStreamingTest {
     PCollectionList<String> list = 
PCollectionList.of(windowedW1).and(windowedW2);
     PCollection<String> union = list.apply(Flatten.<String>pCollections());
 
-    PAssert.thatIterable(union.apply(View.<String>asIterable()))
-            .containsInAnyOrder(EXPECTED_UNION);
+    PAssertStreaming.assertContents(union, EXPECTED_UNION);
 
     EvaluationResult res = SparkPipelineRunner.create(options).run(p);
     res.close();
-
-    PAssertStreaming.assertNoFailures(res);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f222df10/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index fd75e74..53293fb 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -27,17 +27,14 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -52,7 +49,6 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 
 import kafka.serializer.StringDecoder;
 /**
@@ -68,9 +64,7 @@ public class KafkaStreamingTest {
   private static final Map<String, String> KAFKA_MESSAGES = ImmutableMap.of(
       "k1", "v1", "k2", "v2", "k3", "v3", "k4", "v4"
   );
-  private static final Set<String> EXPECTED = ImmutableSet.of(
-      "k1,v1", "k2,v2", "k3,v3", "k4,v4"
-  );
+  private static final String[] EXPECTED = {"k1,v1", "k2,v2", "k3,v3", 
"k4,v4"};
   private static final long TEST_TIMEOUT_MSEC = 1000L;
 
   @BeforeClass
@@ -116,13 +110,10 @@ public class KafkaStreamingTest {
 
     PCollection<String> formattedKV = windowedWords.apply(ParDo.of(new 
FormatKVFn()));
 
-    PAssert.thatIterable(formattedKV.apply(View.<String>asIterable()))
-        .containsInAnyOrder(EXPECTED);
+    PAssertStreaming.assertContents(formattedKV, EXPECTED);
 
     EvaluationResult res = SparkPipelineRunner.create(options).run(p);
     res.close();
-
-    PAssertStreaming.assertNoFailures(res);
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f222df10/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
index 28133ca..6dc9a08 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.spark.translation.streaming;
 
+
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SimpleWordCountTest;
 import org.apache.beam.runners.spark.SparkPipelineRunner;
@@ -26,33 +27,28 @@ import 
org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreamin
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PCollection;
 
-import com.google.common.collect.ImmutableSet;
-
 import org.joda.time.Duration;
 import org.junit.Test;
 
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Set;
 
 /**
  * Simple word count streaming test.
  */
-public class SimpleStreamingWordCountTest {
+public class SimpleStreamingWordCountTest implements Serializable {
 
   private static final String[] WORDS_ARRAY = {
       "hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"};
   private static final List<Iterable<String>> WORDS_QUEUE =
       Collections.<Iterable<String>>singletonList(Arrays.asList(WORDS_ARRAY));
-  private static final Set<String> EXPECTED_COUNT_SET =
-      ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
+  private static final String[] EXPECTED_COUNTS = {"hi: 5", "there: 1", "sue: 
2", "bob: 2"};
   private static final long TEST_TIMEOUT_MSEC = 1000L;
 
   @Test
@@ -71,12 +67,8 @@ public class SimpleStreamingWordCountTest {
 
     PCollection<String> output = windowedWords.apply(new 
SimpleWordCountTest.CountWords());
 
-    PAssert.thatIterable(output.apply(View.<String>asIterable()))
-        .containsInAnyOrder(EXPECTED_COUNT_SET);
-
+    PAssertStreaming.assertContents(output, EXPECTED_COUNTS);
     EvaluationResult res = SparkPipelineRunner.create(options).run(p);
     res.close();
-
-    PAssertStreaming.assertNoFailures(res);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f222df10/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
index 3d8fc32..f85c440 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
@@ -17,15 +17,26 @@
  */
 package org.apache.beam.runners.spark.translation.streaming.utils;
 
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
 import org.apache.beam.runners.spark.EvaluationResult;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.values.PCollection;
 
 import org.junit.Assert;
 
+import java.io.Serializable;
+
 /**
  * Since PAssert doesn't propagate assert exceptions, use Aggregators to 
assert streaming
  * success/failure counters.
  */
-public final class PAssertStreaming {
+public final class PAssertStreaming implements Serializable {
 
   /**
    * Copied aggregator names from {@link org.apache.beam.sdk.testing.PAssert}.
@@ -40,4 +51,27 @@ public final class PAssertStreaming {
     int failures = res.getAggregatorValue(FAILURE_COUNTER, Integer.class);
     Assert.assertEquals("Found " + failures + " failures, see the log for 
details", 0, failures);
   }
+
+  /**
+   * Adds a pipeline run-time assertion that the contents of {@code actual} 
are {@code expected}.
+   * Note that it is oblivious to windowing, so the assertion will apply 
indiscriminately to all
+   * windows.
+   */
+  public static <T> void assertContents(PCollection<T> actual, final T[] 
expected) {
+    // Because PAssert does not support non-global windowing, but all our data 
is in one window,
+    // we set up the assertion directly.
+    actual
+        .apply(WithKeys.<String, T>of("dummy"))
+        .apply(GroupByKey.<String, T>create())
+        .apply(Values.<Iterable<T>>create())
+        .apply(
+            MapElements.via(
+                new SimpleFunction<Iterable<T>, Void>() {
+                  @Override
+                  public Void apply(Iterable<T> input) {
+                    assertThat(input, containsInAnyOrder(expected));
+                    return null;
+                  }
+                }));
+  }
 }

Reply via email to