Repository: incubator-beam Updated Branches: refs/heads/master 49d82baf1 -> 706fc5376
[BEAM-109] fix support for FixedWindows and SlidingWindows in batch [BEAM-109] Better testing for FixedWindows and SlidingWindows [BEAM-109] lower counts is unordered so better to compare entire result and not just iterator head Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/18242651 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/18242651 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/18242651 Branch: refs/heads/master Commit: 182426516c5184e126888d3b465dc41a1bb312aa Parents: 49d82ba Author: Sela <[email protected]> Authored: Sat Mar 19 12:25:02 2016 +0200 Committer: Sela <[email protected]> Committed: Sat Mar 26 18:58:33 2016 -0700 ---------------------------------------------------------------------- .../spark/translation/TransformTranslator.java | 22 ++++--- .../translation/MultiOutputWordCountTest.java | 19 +++++- .../translation/WindowedWordCountTest.java | 63 ++++++++++++++++---- 3 files changed, 82 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/18242651/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index 0bd047c..7f72235 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -30,6 +30,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import com.google.api.client.util.Lists; import com.google.api.client.util.Maps; import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException; import com.google.cloud.dataflow.sdk.coders.Coder; @@ -79,6 +80,7 @@ import org.apache.spark.api.java.JavaRDDLike; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; @@ -267,15 +269,21 @@ public final class TransformTranslator { // Key has to bw windowed in order to group by window as well JavaPairRDD<WindowedValue<K>, WindowedValue<KV<K, VI>>> inRddDuplicatedKeyPair = - inRdd.mapToPair( - new PairFunction<WindowedValue<KV<K, VI>>, WindowedValue<K>, + inRdd.flatMapToPair( + new PairFlatMapFunction<WindowedValue<KV<K, VI>>, WindowedValue<K>, WindowedValue<KV<K, VI>>>() { @Override - public Tuple2<WindowedValue<K>, - WindowedValue<KV<K, VI>>> call(WindowedValue<KV<K, VI>> kv) { - WindowedValue<K> wk = WindowedValue.of(kv.getValue().getKey(), - kv.getTimestamp(), kv.getWindows(), kv.getPane()); - return new Tuple2<>(wk, kv); + public Iterable<Tuple2<WindowedValue<K>, + WindowedValue<KV<K, VI>>>> call(WindowedValue<KV<K, VI>> kv) { + List<Tuple2<WindowedValue<K>, + WindowedValue<KV<K, VI>>>> tuple2s = + Lists.newArrayListWithCapacity(kv.getWindows().size()); + for (BoundedWindow boundedWindow: kv.getWindows()) { + WindowedValue<K> wk = WindowedValue.of(kv.getValue().getKey(), + boundedWindow.maxTimestamp(), boundedWindow, kv.getPane()); + tuple2s.add(new Tuple2<>(wk, kv)); + } + return tuple2s; } }); //-- windowed coders http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/18242651/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java index 8ab3798..974467f 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java @@ -22,14 +22,18 @@ import com.google.cloud.dataflow.sdk.Pipeline; import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; import com.google.cloud.dataflow.sdk.runners.AggregatorValues; +import com.google.cloud.dataflow.sdk.testing.DataflowAssert; import com.google.cloud.dataflow.sdk.transforms.*; import com.google.cloud.dataflow.sdk.values.*; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import org.apache.beam.runners.spark.EvaluationResult; import org.apache.beam.runners.spark.SparkPipelineRunner; import org.junit.Assert; import org.junit.Test; +import java.util.Set; + public class MultiOutputWordCountTest { private static final TupleTag<String> upper = new TupleTag<>(); @@ -37,6 +41,10 @@ public class MultiOutputWordCountTest { private static final TupleTag<KV<String, Long>> lowerCnts = new TupleTag<>(); private static final TupleTag<KV<String, Long>> upperCnts = new TupleTag<>(); + private static final Set<String> EXPECTED_LOWER_COUNTS = + ImmutableSet.of("are: 2", "some: 3", "words: 3", "more: 2", "to: 1", "count: 1", "and: 2", + "even: 1", "others: 1"); + @Test public void testRun() throws Exception { Pipeline p = Pipeline.create(PipelineOptionsFactory.create()); @@ -53,8 +61,8 @@ public class MultiOutputWordCountTest { ApproximateUnique.<KV<String, Long>>globally(16)); EvaluationResult res = SparkPipelineRunner.create().run(p); - Iterable<KV<String, Long>> actualLower = res.get(luc.get(lowerCnts)); - Assert.assertEquals("are", actualLower.iterator().next().getKey()); + DataflowAssert.that(luc.get(lowerCnts).apply(ParDo.of(new FormatCountsFn()))) + .containsInAnyOrder(EXPECTED_LOWER_COUNTS); Iterable<KV<String, Long>> actualUpper = res.get(luc.get(upperCnts)); Assert.assertEquals("Here", actualUpper.iterator().next().getKey()); Iterable<Long> actualUniqCount = res.get(unique); @@ -134,4 +142,11 @@ public class MultiOutputWordCountTest { return extractWordsFn.totalWords; } } + + private static class FormatCountsFn extends DoFn<KV<String, Long>, String> { + @Override + public void processElement(ProcessContext c) { + c.output(c.element().getKey() + ": " + c.element().getValue()); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/18242651/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java index 9f29a37..9fac9c6 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java @@ -24,6 +24,7 @@ import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; import com.google.cloud.dataflow.sdk.testing.DataflowAssert; import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows; import com.google.cloud.dataflow.sdk.transforms.windowing.Window; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.common.collect.ImmutableList; @@ -39,30 +40,66 @@ import org.junit.Test; public class WindowedWordCountTest { private static final String[] WORDS_ARRAY = { - "hi there", "hi", "hi sue bob", - "hi sue", "", "bob hi"}; + "hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"}; private static final Long[] TIMESTAMPS_ARRAY = { - 60000L, 60000L, 60000L, - 120000L, 120000L, 120000L}; + 60000L, 60000L, 60000L, 179000L, 179000L, 179000L}; private static final List<String> WORDS = Arrays.asList(WORDS_ARRAY); private static final List<Long> TIMESTAMPS = Arrays.asList(TIMESTAMPS_ARRAY); - private static final List<String> EXPECTED_COUNT_SET = - ImmutableList.of("hi: 3", "there: 1", "sue: 1", "bob: 1", - "hi: 2", "sue: 1", "bob: 1"); + + private static final List<String> EXPECTED_FIXED_SEPARATE_COUNT_SET = + ImmutableList.of("hi: 3", "there: 1", "sue: 1", "bob: 1", "hi: 2", "sue: 1", "bob: 1"); + + @Test + public void testFixed() throws Exception { + Pipeline p = Pipeline.create(PipelineOptionsFactory.create()); + PCollection<String> inputWords = + p.apply(Create.timestamped(WORDS, TIMESTAMPS)).setCoder(StringUtf8Coder.of()); + PCollection<String> windowedWords = + inputWords.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))); + + PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords()); + + DataflowAssert.that(output).containsInAnyOrder(EXPECTED_FIXED_SEPARATE_COUNT_SET); + + EvaluationResult res = SparkPipelineRunner.create().run(p); + res.close(); + } + + private static final List<String> EXPECTED_FIXED_SAME_COUNT_SET = + ImmutableList.of("hi: 5", "there: 1", "sue: 2", "bob: 2"); + + @Test + public void testFixed2() throws Exception { + Pipeline p = Pipeline.create(PipelineOptionsFactory.create()); + PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS)) + .setCoder(StringUtf8Coder.of()); + PCollection<String> windowedWords = inputWords + .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(5)))); + + PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords()); + + DataflowAssert.that(output).containsInAnyOrder(EXPECTED_FIXED_SAME_COUNT_SET); + + EvaluationResult res = SparkPipelineRunner.create().run(p); + res.close(); + } + + private static final List<String> EXPECTED_SLIDING_COUNT_SET = + ImmutableList.of("hi: 3", "there: 1", "sue: 1", "bob: 1", "hi: 5", "there: 1", "sue: 2", + "bob: 2", "hi: 2", "sue: 1", "bob: 1"); @Test - public void testRun() throws Exception { - SparkPipelineOptions options = SparkPipelineOptionsFactory.create(); - options.setRunner(SparkPipelineRunner.class); + public void testSliding() throws Exception { Pipeline p = Pipeline.create(PipelineOptionsFactory.create()); PCollection<String> inputWords = p.apply(Create.timestamped(WORDS, TIMESTAMPS)) - .setCoder(StringUtf8Coder.of()); + .setCoder(StringUtf8Coder.of()); PCollection<String> windowedWords = inputWords - .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))); + .apply(Window.<String>into(SlidingWindows.of(Duration.standardMinutes(2)) + .every(Duration.standardMinutes(1)))); PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords()); - DataflowAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET); + DataflowAssert.that(output).containsInAnyOrder(EXPECTED_SLIDING_COUNT_SET); EvaluationResult res = SparkPipelineRunner.create().run(p); res.close();
