Switch Spark streaming tests to custom assertions The current use of PAssert in the streaming tests for the Spark runner works by coincidence. PAssert does not truly support non-global windowing. The switch from side inputs to GBK, with no change in semantics but hopefully providing an easier on-ramp for new runners, incidentally broke these tests. Soon, PAssert will support windowing, triggers, and unbounded PCollections. Until then, this change introduces a lightweight custom assertion transform for these tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f222df10 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f222df10 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f222df10 Branch: refs/heads/master Commit: f222df109e773f23e56f9e830454356893989a15 Parents: 77aa093 Author: Kenneth Knowles <[email protected]> Authored: Wed Jun 8 18:11:07 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Thu Jun 9 14:41:09 2016 -0700 ---------------------------------------------------------------------- .../streaming/FlattenStreamingTest.java | 7 +--- .../streaming/KafkaStreamingTest.java | 13 ++----- .../streaming/SimpleStreamingWordCountTest.java | 18 +++------- .../streaming/utils/PAssertStreaming.java | 36 +++++++++++++++++++- 4 files changed, 43 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f222df10/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java index 15b2f39..976c7c2 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java @@ -25,9 +25,7 @@ import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreamin import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.PAssert; 
import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.PCollection; @@ -77,13 +75,10 @@ public class FlattenStreamingTest { PCollectionList<String> list = PCollectionList.of(windowedW1).and(windowedW2); PCollection<String> union = list.apply(Flatten.<String>pCollections()); - PAssert.thatIterable(union.apply(View.<String>asIterable())) - .containsInAnyOrder(EXPECTED_UNION); + PAssertStreaming.assertContents(union, EXPECTED_UNION); EvaluationResult res = SparkPipelineRunner.create(options).run(p); res.close(); - - PAssertStreaming.assertNoFailures(res); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f222df10/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java index fd75e74..53293fb 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java @@ -27,17 +27,14 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import 
org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -52,7 +49,6 @@ import java.io.IOException; import java.util.Collections; import java.util.Map; import java.util.Properties; -import java.util.Set; import kafka.serializer.StringDecoder; /** @@ -68,9 +64,7 @@ public class KafkaStreamingTest { private static final Map<String, String> KAFKA_MESSAGES = ImmutableMap.of( "k1", "v1", "k2", "v2", "k3", "v3", "k4", "v4" ); - private static final Set<String> EXPECTED = ImmutableSet.of( - "k1,v1", "k2,v2", "k3,v3", "k4,v4" - ); + private static final String[] EXPECTED = {"k1,v1", "k2,v2", "k3,v3", "k4,v4"}; private static final long TEST_TIMEOUT_MSEC = 1000L; @BeforeClass @@ -116,13 +110,10 @@ public class KafkaStreamingTest { PCollection<String> formattedKV = windowedWords.apply(ParDo.of(new FormatKVFn())); - PAssert.thatIterable(formattedKV.apply(View.<String>asIterable())) - .containsInAnyOrder(EXPECTED); + PAssertStreaming.assertContents(formattedKV, EXPECTED); EvaluationResult res = SparkPipelineRunner.create(options).run(p); res.close(); - - PAssertStreaming.assertNoFailures(res); } @AfterClass http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f222df10/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java index 28133ca..6dc9a08 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java +++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.spark.translation.streaming; + import org.apache.beam.runners.spark.EvaluationResult; import org.apache.beam.runners.spark.SimpleWordCountTest; import org.apache.beam.runners.spark.SparkPipelineRunner; @@ -26,33 +27,28 @@ import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreamin import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.PCollection; -import com.google.common.collect.ImmutableSet; - import org.joda.time.Duration; import org.junit.Test; +import java.io.Serializable; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Set; /** * Simple word count streaming test. 
*/ -public class SimpleStreamingWordCountTest { +public class SimpleStreamingWordCountTest implements Serializable { private static final String[] WORDS_ARRAY = { "hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"}; private static final List<Iterable<String>> WORDS_QUEUE = Collections.<Iterable<String>>singletonList(Arrays.asList(WORDS_ARRAY)); - private static final Set<String> EXPECTED_COUNT_SET = - ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2"); + private static final String[] EXPECTED_COUNTS = {"hi: 5", "there: 1", "sue: 2", "bob: 2"}; private static final long TEST_TIMEOUT_MSEC = 1000L; @Test @@ -71,12 +67,8 @@ public class SimpleStreamingWordCountTest { PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords()); - PAssert.thatIterable(output.apply(View.<String>asIterable())) - .containsInAnyOrder(EXPECTED_COUNT_SET); - + PAssertStreaming.assertContents(output, EXPECTED_COUNTS); EvaluationResult res = SparkPipelineRunner.create(options).run(p); res.close(); - - PAssertStreaming.assertNoFailures(res); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f222df10/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java index 3d8fc32..f85c440 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java @@ -17,15 +17,26 @@ */ package org.apache.beam.runners.spark.translation.streaming.utils; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; + 
import org.apache.beam.runners.spark.EvaluationResult; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.values.PCollection; import org.junit.Assert; +import java.io.Serializable; + /** * Since PAssert doesn't propagate assert exceptions, use Aggregators to assert streaming * success/failure counters. */ -public final class PAssertStreaming { +public final class PAssertStreaming implements Serializable { /** * Copied aggregator names from {@link org.apache.beam.sdk.testing.PAssert}. @@ -40,4 +51,27 @@ public final class PAssertStreaming { int failures = res.getAggregatorValue(FAILURE_COUNTER, Integer.class); Assert.assertEquals("Found " + failures + " failures, see the log for details", 0, failures); } + + /** + * Adds a pipeline run-time assertion that the contents of {@code actual} are {@code expected}. + * Note that it is oblivious to windowing, so the assertion will apply indiscriminately to all + * windows. + */ + public static <T> void assertContents(PCollection<T> actual, final T[] expected) { + // Because PAssert does not support non-global windowing, but all our data is in one window, + // we set up the assertion directly. + actual + .apply(WithKeys.<String, T>of("dummy")) + .apply(GroupByKey.<String, T>create()) + .apply(Values.<Iterable<T>>create()) + .apply( + MapElements.via( + new SimpleFunction<Iterable<T>, Void>() { + @Override + public Void apply(Iterable<T> input) { + assertThat(input, containsInAnyOrder(expected)); + return null; + } + })); + } }
