Repository: beam Updated Branches: refs/heads/jstorm-runner 0a05de365 -> e00e0e841
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/CoGroupByKeyTest.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/CoGroupByKeyTest.java b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/CoGroupByKeyTest.java new file mode 100644 index 0000000..548fb20 --- /dev/null +++ b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/CoGroupByKeyTest.java @@ -0,0 +1,326 @@
+/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
+package org.apache.beam.runners.jstorm.translation.translator; + +import org.apache.beam.runners.jstorm.StormPipelineOptions; +import org.apache.beam.runners.jstorm.TestJStormRunner; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.ValidatesRunner; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.transforms.join.CoGroupByKey; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Duration; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +
+/** + * Tests for {@link CoGroupByKey} on JStorm, executed through {@link TestJStormRunner}. + */ +@RunWith(JUnit4.class) +public class CoGroupByKeyTest implements Serializable {
+ /** + * Converts the given list into a PCollection belonging to the provided Pipeline. The + * trailing identity ParDo has no explicit output coder, so coder inference is exercised. + */ + private PCollection<KV<Integer, String>> createInput(String name, + Pipeline p, List<KV<Integer, String>> list) { + return createInput(name, p, list, new ArrayList<Long>()); + } +
+ /** + * Like the three-argument overload, but assigns the given element timestamps when non-empty. + */ + private PCollection<KV<Integer, String>> createInput(String name, + Pipeline p, List<KV<Integer, String>> list, List<Long> timestamps) { + PCollection<KV<Integer, String>> input; + if (timestamps.isEmpty()) { + input = p.apply("Create" + name, Create.of(list) + .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of()))); + } else { + input = p.apply("Create" + name, Create.timestamped(list, timestamps) + .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of()))); + } + return input.apply( + "Identity" + name, + ParDo.of( + new DoFn<KV<Integer, String>, KV<Integer, String>>() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(c.element()); + } + })); + } +
+ /** + * Returns a {@code PCollection<KV<Integer, CoGbkResult>>} containing the result + * of a {@link CoGroupByKey} over 2 {@code PCollection<KV<Integer, String>>}, + * where each {@link PCollection} has no duplicate keys and the key sets of + * each {@link PCollection} are intersecting but neither is a subset of the other. + */ + private PCollection<KV<Integer, CoGbkResult>> buildGetOnlyGbk( + Pipeline p, + TupleTag<String> tag1, + TupleTag<String> tag2) { + List<KV<Integer, String>> list1 = + Arrays.asList( + KV.of(1, "collection1-1"), + KV.of(2, "collection1-2")); + List<KV<Integer, String>> list2 = + Arrays.asList( + KV.of(2, "collection2-2"), + KV.of(3, "collection2-3")); + PCollection<KV<Integer, String>> collection1 = createInput("CreateList1", p, list1); + PCollection<KV<Integer, String>> collection2 = createInput("CreateList2", p, list2); + PCollection<KV<Integer, CoGbkResult>> coGbkResults = + KeyedPCollectionTuple.of(tag1, collection1) + .and(tag2, collection2) + .apply(CoGroupByKey.<Integer>create()); + return coGbkResults; + } +
+ /** Checks {@link CoGbkResult#getOnly} for keys present in one or both inputs. */ + @Test + @Category(ValidatesRunner.class) + public void testCoGroupByKeyGetOnly() { + StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class); + options.setRunner(TestJStormRunner.class); + options.setLocalMode(true); + + Pipeline p = Pipeline.create(options); + + final TupleTag<String> tag1 = new TupleTag<>(); + final TupleTag<String> tag2 = new TupleTag<>(); + + PCollection<KV<Integer, CoGbkResult>> coGbkResults = + buildGetOnlyGbk(p, tag1, tag2); + + PAssert.thatMap(coGbkResults).satisfies( + new SerializableFunction<Map<Integer, CoGbkResult>, Void>() { + @Override + public Void apply(Map<Integer, CoGbkResult> results) { + assertEquals("collection1-1", results.get(1).getOnly(tag1)); + assertEquals("collection1-2", results.get(2).getOnly(tag1)); + assertEquals("collection2-2", results.get(2).getOnly(tag2)); + assertEquals("collection2-3", results.get(3).getOnly(tag2)); + return null; + } + }); + + p.run(); + } +
+ /** + * Returns a {@code PCollection<KV<Integer, CoGbkResult>>} containing the + * results of the {@code CoGroupByKey} over three + * {@code PCollection<KV<Integer, String>>}, each of which correlates + * a customer id to purchases, addresses, or names, respectively. + */ + private PCollection<KV<Integer, CoGbkResult>> buildPurchasesCoGbk( + Pipeline p, + TupleTag<String> purchasesTag, + TupleTag<String> addressesTag, + TupleTag<String> namesTag) { + List<KV<Integer, String>> idToPurchases = + Arrays.asList( + KV.of(2, "Boat"), + KV.of(1, "Shoes"), + KV.of(3, "Car"), + KV.of(1, "Book"), + KV.of(10, "Pens"), + KV.of(8, "House"), + KV.of(4, "Suit"), + KV.of(11, "House"), + KV.of(14, "Shoes"), + KV.of(2, "Suit"), + KV.of(8, "Suit Case"), + KV.of(3, "House")); + + List<KV<Integer, String>> idToAddress = + Arrays.asList( + KV.of(2, "53 S. 3rd"), + KV.of(10, "383 Jackson Street"), + KV.of(20, "3 W. Arizona"), + KV.of(3, "29 School Rd"), + KV.of(8, "6 Watling Rd")); + + List<KV<Integer, String>> idToName = + Arrays.asList( + KV.of(1, "John Smith"), + KV.of(2, "Sally James"), + KV.of(8, "Jeffery Spalding"), + KV.of(20, "Joan Lichtfield")); + + PCollection<KV<Integer, String>> purchasesTable = + createInput("CreateIdToPurchases", p, idToPurchases); + + PCollection<KV<Integer, String>> addressTable = + createInput("CreateIdToAddress", p, idToAddress); + + PCollection<KV<Integer, String>> nameTable = + createInput("CreateIdToName", p, idToName); + + PCollection<KV<Integer, CoGbkResult>> coGbkResults = + KeyedPCollectionTuple.of(namesTag, nameTable) + .and(addressesTag, addressTable) + .and(purchasesTag, purchasesTable) + .apply(CoGroupByKey.<Integer>create()); + return coGbkResults; + } +
+ /** + * Returns a {@code PCollection<KV<Integer, CoGbkResult>>} containing the + * results of the {@code CoGroupByKey} over 2 {@code PCollection<KV<Integer, String>>}, + * each of which correlates a customer id to clicks and purchases, respectively. + * + * <p>Both inputs are timestamped and placed into 4 ms fixed windows; currently unused. + */ + private PCollection<KV<Integer, CoGbkResult>> buildPurchasesCoGbkWithWindowing( + Pipeline p, + TupleTag<String> clicksTag, + TupleTag<String> purchasesTag) { + List<KV<Integer, String>> idToClick = + Arrays.asList( + KV.of(1, "Click t0"), + KV.of(2, "Click t2"), + KV.of(1, "Click t4"), + KV.of(1, "Click t6"), + KV.of(2, "Click t8")); + + List<KV<Integer, String>> idToPurchases = + Arrays.asList( + KV.of(1, "Boat t1"), + KV.of(1, "Shoesi t2"), + KV.of(1, "Pens t3"), + KV.of(2, "House t4"), + KV.of(2, "Suit t5"), + KV.of(1, "Car t6"), + KV.of(1, "Book t7"), + KV.of(2, "House t8"), + KV.of(2, "Shoes t9"), + KV.of(2, "House t10")); + + PCollection<KV<Integer, String>> clicksTable = + createInput("CreateClicks", + p, + idToClick, + Arrays.asList(0L, 2L, 4L, 6L, 8L)) + .apply("WindowClicks", Window.<KV<Integer, String>>into( + FixedWindows.of(new Duration(4))) + .withTimestampCombiner(TimestampCombiner.EARLIEST)); + + PCollection<KV<Integer, String>> purchasesTable = + createInput("CreatePurchases", + p, + idToPurchases, + Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L)) + .apply("WindowPurchases", Window.<KV<Integer, String>>into( + FixedWindows.of(new Duration(4))) + .withTimestampCombiner(TimestampCombiner.EARLIEST)); + + PCollection<KV<Integer, CoGbkResult>> coGbkResults = + KeyedPCollectionTuple.of(clicksTag, clicksTable) + .and(purchasesTag, purchasesTable) + .apply(CoGroupByKey.<Integer>create()); + return coGbkResults; + } +
+ /** Joins names, addresses and purchases by customer id and checks each id's groups. */ + @Test + @Category(ValidatesRunner.class) + public void testCoGroupByKey() { + StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class); + options.setRunner(TestJStormRunner.class); + options.setLocalMode(true); + + Pipeline p = Pipeline.create(options); + + final TupleTag<String> namesTag = new TupleTag<>(); + final TupleTag<String> addressesTag = new TupleTag<>(); + final TupleTag<String> purchasesTag = new TupleTag<>(); + + + PCollection<KV<Integer, CoGbkResult>> coGbkResults = + buildPurchasesCoGbk(p, purchasesTag, addressesTag, namesTag); + + PAssert.thatMap(coGbkResults).satisfies( + new SerializableFunction<Map<Integer, CoGbkResult>, Void>() { + @Override + public Void apply(Map<Integer, CoGbkResult> results) { + CoGbkResult result1 = results.get(1); + assertEquals("John Smith", result1.getOnly(namesTag)); + assertThat(result1.getAll(purchasesTag), containsInAnyOrder("Shoes", "Book")); + + CoGbkResult result2 = results.get(2); + assertEquals("Sally James", result2.getOnly(namesTag)); + assertEquals("53 S. 3rd", result2.getOnly(addressesTag)); + assertThat(result2.getAll(purchasesTag), containsInAnyOrder("Suit", "Boat")); + + CoGbkResult result3 = results.get(3); + assertEquals("29 School Rd", result3.getOnly(addressesTag)); + assertThat(result3.getAll(purchasesTag), containsInAnyOrder("Car", "House")); + + CoGbkResult result8 = results.get(8); + assertEquals("Jeffery Spalding", result8.getOnly(namesTag)); + assertEquals("6 Watling Rd", result8.getOnly(addressesTag)); + assertThat(result8.getAll(purchasesTag), containsInAnyOrder("House", "Suit Case")); + + CoGbkResult result20 = results.get(20); + assertEquals("Joan Lichtfield", result20.getOnly(namesTag)); + assertEquals("3 W. Arizona", result20.getOnly(addressesTag)); + + assertEquals("383 Jackson Street", results.get(10).getOnly(addressesTag)); + + assertThat(results.get(4).getAll(purchasesTag), containsInAnyOrder("Suit")); + assertThat(results.get(10).getAll(purchasesTag), containsInAnyOrder("Pens")); + assertThat(results.get(11).getAll(purchasesTag), containsInAnyOrder("House")); + assertThat(results.get(14).getAll(purchasesTag), containsInAnyOrder("Shoes")); + + return null; + } + }); + + p.run(); + } +}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTest.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTest.java b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTest.java new file mode 100644 index 0000000..3e21a89 --- /dev/null +++ b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTest.java @@ -0,0 +1,158 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.runners.jstorm.translation.translator; + +import org.apache.beam.runners.jstorm.StormPipelineOptions; + +import org.apache.beam.runners.jstorm.StormRunner; +import org.apache.beam.runners.jstorm.TestJStormRunner; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.Arrays; +import java.util.List; + +import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link GroupByKey} and {@link Count} with {@link StormRunner}, in local mode.
+ */ +@RunWith(JUnit4.class) +public class GroupByKeyTest { + + static final String[] WORDS_ARRAY = new String[] { + "hi", "there", "hi", "hi", "sue", "bob", + "hi", "sue", "", "", "ZOW", "bob", "" }; + + static final List<String> WORDS = Arrays.asList(WORDS_ARRAY); + + @Test + public void testGroupByKey() { + StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class); + options.setRunner(TestJStormRunner.class); + options.setLocalMode(true); + + Pipeline p = Pipeline.create(options); + + List<KV<String, Integer>> ungroupedPairs = Arrays.asList( + KV.of("k1", 3), + KV.of("k5", Integer.MAX_VALUE), + KV.of("k5", Integer.MIN_VALUE), + KV.of("k2", 66), + KV.of("k1", 4), + KV.of("k2", -33), + KV.of("k3", 0)); + + PCollection<KV<String, Integer>> input = + p.apply(Create.of(ungroupedPairs) + .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))); + + PCollection<KV<String, Iterable<Integer>>> output = + input.apply(GroupByKey.<String, Integer>create()); + + PAssert.that(output) + .satisfies(new AssertThatHasExpectedContentsForTestGroupByKey()); + + p.run(); + } + + @Test + public void testCountGloballyBasic() { + StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class); + options.setRunner(TestJStormRunner.class); + options.setLocalMode(true); + + Pipeline p = Pipeline.create(options); + PCollection<String> input = p.apply(Create.of(WORDS)); + + PCollection<Long> output = + input.apply(Count.<String>globally()); + + PAssert.that(output) + .containsInAnyOrder(13L); + p.run(); + } + + static class AssertThatHasExpectedContentsForTestGroupByKey + implements SerializableFunction<Iterable<KV<String, Iterable<Integer>>>, + Void> { + @Override + public Void apply(Iterable<KV<String, Iterable<Integer>>> actual) { + assertThat(actual, containsInAnyOrder( + KvMatcher.isKv(is("k1"), containsInAnyOrder(3, 4)), + KvMatcher.isKv(is("k5"), containsInAnyOrder(Integer.MAX_VALUE, + Integer.MIN_VALUE)), +
KvMatcher.isKv(is("k2"), containsInAnyOrder(66, -33)), + KvMatcher.isKv(is("k3"), containsInAnyOrder(0)))); + return null; + } + } + + /** + * Matcher for {@link KV}s whose key and value satisfy the given matchers. + */ + public static class KvMatcher<K, V> + extends TypeSafeMatcher<KV<? extends K, ? extends V>> { + final Matcher<? super K> keyMatcher; + final Matcher<? super V> valueMatcher; + + public static <K, V> KvMatcher<K, V> isKv(Matcher<K> keyMatcher, + Matcher<V> valueMatcher) { + return new KvMatcher<>(keyMatcher, valueMatcher); + } + + public KvMatcher(Matcher<? super K> keyMatcher, + Matcher<? super V> valueMatcher) { + this.keyMatcher = keyMatcher; + this.valueMatcher = valueMatcher; + } + + @Override + public boolean matchesSafely(KV<? extends K, ? extends V> kv) { + return keyMatcher.matches(kv.getKey()) + && valueMatcher.matches(kv.getValue()); + } + + @Override + public void describeTo(Description description) { + description + .appendText("a KV(").appendValue(keyMatcher) + .appendText(", ").appendValue(valueMatcher) + .appendText(")"); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/ParDoTest.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/ParDoTest.java b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/ParDoTest.java new file mode 100644 index 0000000..f2d1896 --- /dev/null +++ b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/ParDoTest.java @@ -0,0 +1,627 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.jstorm.translation.translator; + +import org.apache.beam.runners.jstorm.StormPipelineOptions; + +import org.apache.beam.runners.jstorm.StormRunner; +import org.apache.beam.runners.jstorm.TestJStormRunner; +import com.google.common.base.MoreObjects; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.*; +import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.state.*; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.UsesMapState; +import org.apache.beam.sdk.testing.UsesStatefulParDo; +import org.apache.beam.sdk.testing.ValidatesRunner; +import org.apache.beam.sdk.transforms.*; +import org.apache.beam.sdk.transforms.windowing.*; +import org.apache.beam.sdk.values.*; +import org.joda.time.Duration; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.util.*; + +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; +import static 
org.hamcrest.collection.IsIterableContainingInOrder.contains; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link ParDo} with {@link StormRunner}. + */ +@RunWith(JUnit4.class) +public class ParDoTest implements Serializable { + + @Test + public void testParDo() throws IOException { + StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class); + options.setRunner(TestJStormRunner.class); + Pipeline pipeline = Pipeline.create(options); + + List<Integer> inputs = Arrays.asList(3, -42, 666); + + PCollection<String> output = pipeline + .apply(Create.of(inputs)) + .apply(ParDo.of(new TestDoFn())); + + PAssert.that(output) + .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)); + + pipeline.run(); + } + + @Test + public void testParDoWithSideInputs() throws IOException { + StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class); + options.setRunner(TestJStormRunner.class); + Pipeline pipeline = Pipeline.create(options); + + List<Integer> inputs = Arrays.asList(3, -42, 666); + + PCollectionView<Integer> sideInput1 = pipeline + .apply("CreateSideInput1", Create.of(11)) + .apply("ViewSideInput1", View.<Integer>asSingleton()); + PCollectionView<Integer> sideInputUnread = pipeline + .apply("CreateSideInputUnread", Create.of(-3333)) + .apply("ViewSideInputUnread", View.<Integer>asSingleton()); + + PCollectionView<Integer> sideInput2 = pipeline + .apply("CreateSideInput2", Create.of(222)) + .apply("ViewSideInput2", View.<Integer>asSingleton()); + PCollection<String> output = pipeline + .apply(Create.of(inputs)) + .apply(ParDo.of(new TestDoFn( + Arrays.asList(sideInput1, sideInput2), + Arrays.<TupleTag<String>>asList())) + .withSideInputs(sideInput1, sideInputUnread, sideInput2)); + + PAssert.that(output) + .satisfies(ParDoTest.HasExpectedOutput + .forInput(inputs) + .andSideInputs(11, 222)); + + pipeline.run(); + } + + @Test + public void 
testParDoWithTaggedOutput() { + List<Integer> inputs = Arrays.asList(3, -42, 666); + + TupleTag<String> mainOutputTag = new TupleTag<String>("main"){}; + TupleTag<String> additionalOutputTag1 = new TupleTag<String>("additional1"){}; + TupleTag<String> additionalOutputTag2 = new TupleTag<String>("additional2"){}; + TupleTag<String> additionalOutputTag3 = new TupleTag<String>("additional3"){}; + TupleTag<String> additionalOutputTagUnwritten = new TupleTag<String>("unwrittenOutput"){}; + + StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class); + options.setRunner(TestJStormRunner.class); + Pipeline pipeline = Pipeline.create(options); + + PCollectionTuple outputs = pipeline + .apply(Create.of(inputs)) + .apply(ParDo + .of(new TestDoFn( + Arrays.<PCollectionView<Integer>>asList(), + Arrays.asList(additionalOutputTag1, additionalOutputTag2, additionalOutputTag3))) + .withOutputTags( + mainOutputTag, + TupleTagList.of(additionalOutputTag3) + .and(additionalOutputTag1) + .and(additionalOutputTagUnwritten) + .and(additionalOutputTag2))); + + PAssert.that(outputs.get(mainOutputTag)) + .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)); + + PAssert.that(outputs.get(additionalOutputTag1)) + .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs) + .fromOutput(additionalOutputTag1)); + PAssert.that(outputs.get(additionalOutputTag2)) + .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs) + .fromOutput(additionalOutputTag2)); + PAssert.that(outputs.get(additionalOutputTag3)) + .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs) + .fromOutput(additionalOutputTag3)); + PAssert.that(outputs.get(additionalOutputTagUnwritten)).empty(); + + pipeline.run(); + } + + @Test + public void testNoWindowFnDoesNotReassignWindows() { + StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class); + options.setRunner(TestJStormRunner.class); + Pipeline pipeline = Pipeline.create(options); + + final PCollection<Long> 
initialWindows = + pipeline + .apply(GenerateSequence.from(0).to(10)) + .apply("AssignWindows", Window.into(new WindowOddEvenBuckets())); + + // Sanity check the window assignment to demonstrate the baseline + PAssert.that(initialWindows) + .inWindow(WindowOddEvenBuckets.EVEN_WINDOW) + .containsInAnyOrder(0L, 2L, 4L, 6L, 8L); + PAssert.that(initialWindows) + .inWindow(WindowOddEvenBuckets.ODD_WINDOW) + .containsInAnyOrder(1L, 3L, 5L, 7L, 9L); + + PCollection<Boolean> upOne = + initialWindows.apply( + "ModifyTypes", + MapElements.<Long, Boolean>via( + new SimpleFunction<Long, Boolean>() { + @Override + public Boolean apply(Long input) { + return input % 2 == 0; + } + })); + PAssert.that(upOne) + .inWindow(WindowOddEvenBuckets.EVEN_WINDOW) + .containsInAnyOrder(true, true, true, true, true); + PAssert.that(upOne) + .inWindow(WindowOddEvenBuckets.ODD_WINDOW) + .containsInAnyOrder(false, false, false, false, false); + + // The elements should be in the same windows, even though they would not be assigned to the + // same windows with the updated timestamps. If we try to apply the original WindowFn, the type + // will not be appropriate and the runner should crash, as a Boolean cannot be converted into + // a long. 
+ PCollection<Boolean> updatedTrigger = + upOne.apply( + "UpdateWindowingStrategy", + Window.<Boolean>configure().triggering(Never.ever()) + .withAllowedLateness(Duration.ZERO) + .accumulatingFiredPanes()); + pipeline.run(); + } + + @Test + public void testValueStateSameId() { + StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class); + options.setRunner(TestJStormRunner.class); + Pipeline pipeline = Pipeline.create(options); + + final String stateId = "foo"; + + DoFn<KV<String, Integer>, KV<String, Integer>> fn = + new DoFn<KV<String, Integer>, KV<String, Integer>>() { + + @StateId(stateId) + private final StateSpec<ValueState<Integer>> intState = + StateSpecs.value(VarIntCoder.of()); + + @ProcessElement + public void processElement( + ProcessContext c, @StateId(stateId) ValueState<Integer> state) { + Integer currentValue = MoreObjects.firstNonNull(state.read(), 0); + c.output(KV.of("sizzle", currentValue)); + state.write(currentValue + 1); + } + }; + + DoFn<KV<String, Integer>, Integer> fn2 = + new DoFn<KV<String, Integer>, Integer>() { + + @StateId(stateId) + private final StateSpec<ValueState<Integer>> intState = + StateSpecs.value(VarIntCoder.of()); + + @ProcessElement + public void processElement( + ProcessContext c, @StateId(stateId) ValueState<Integer> state) { + Integer currentValue = MoreObjects.firstNonNull(state.read(), 13); + c.output(currentValue); + state.write(currentValue + 13); + } + }; + + PCollection<KV<String, Integer>> intermediate = + pipeline.apply(Create.of(KV.of("hello", 42), KV.of("hello", 97), KV.of("hello", 84))) + .apply("First stateful ParDo", ParDo.of(fn)); + + PCollection<Integer> output = + intermediate.apply("Second stateful ParDo", ParDo.of(fn2)); + + PAssert.that(intermediate) + .containsInAnyOrder(KV.of("sizzle", 0), KV.of("sizzle", 1), KV.of("sizzle", 2)); + PAssert.that(output).containsInAnyOrder(13, 26, 39); + pipeline.run(); + } + + @Test + @Category({ValidatesRunner.class, 
UsesStatefulParDo.class}) + public void testValueStateTaggedOutput() { + StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class); + options.setRunner(TestJStormRunner.class); + Pipeline pipeline = Pipeline.create(options); + + final String stateId = "foo"; + + final TupleTag<Integer> evenTag = new TupleTag<Integer>() {}; + final TupleTag<Integer> oddTag = new TupleTag<Integer>() {}; + + DoFn<KV<String, Integer>, Integer> fn = + new DoFn<KV<String, Integer>, Integer>() { + + @StateId(stateId) + private final StateSpec<ValueState<Integer>> intState = + StateSpecs.value(VarIntCoder.of()); + + @ProcessElement + public void processElement( + ProcessContext c, @StateId(stateId) ValueState<Integer> state) { + Integer currentValue = MoreObjects.firstNonNull(state.read(), 0); + if (currentValue % 2 == 0) { + c.output(currentValue); + } else { + c.output(oddTag, currentValue); + } + state.write(currentValue + 1); + } + }; + + PCollectionTuple output = + pipeline.apply( + Create.of( + KV.of("hello", 42), + KV.of("hello", 97), + KV.of("hello", 84), + KV.of("goodbye", 33), + KV.of("hello", 859), + KV.of("goodbye", 83945))) + .apply(ParDo.of(fn).withOutputTags(evenTag, TupleTagList.of(oddTag))); + + PCollection<Integer> evens = output.get(evenTag); + PCollection<Integer> odds = output.get(oddTag); + + // There are 0 and 2 from "hello" and just 0 from "goodbye" + PAssert.that(evens).containsInAnyOrder(0, 2, 0); + + // There are 1 and 3 from "hello" and just "1" from "goodbye" + PAssert.that(odds).containsInAnyOrder(1, 3, 1); + pipeline.run(); + } + + @Test + @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesMapState.class}) + public void testMapStateCoderInference() { + StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class); + options.setRunner(TestJStormRunner.class); + Pipeline pipeline = Pipeline.create(options); + + final String stateId = "foo"; + final String countStateId = "count"; + Coder<MyInteger> 
myIntegerCoder = MyIntegerCoder.of(); + pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, myIntegerCoder); + + DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>> fn = + new DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>>() { + + @StateId(stateId) + private final StateSpec<MapState<String, MyInteger>> mapState = StateSpecs.map(); + + @StateId(countStateId) + private final StateSpec<CombiningState<Integer, int[], Integer>> + countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(), + Sum.ofIntegers()); + + @ProcessElement + public void processElement( + ProcessContext c, @StateId(stateId) MapState<String, MyInteger> state, + @StateId(countStateId) CombiningState<Integer, int[], Integer> + count) { + KV<String, Integer> value = c.element().getValue(); + state.put(value.getKey(), new MyInteger(value.getValue())); + count.add(1); + if (count.read() >= 4) { + Iterable<Map.Entry<String, MyInteger>> iterate = state.entries().read(); + for (Map.Entry<String, MyInteger> entry : iterate) { + c.output(KV.of(entry.getKey(), entry.getValue())); + } + } + } + }; + + PCollection<KV<String, MyInteger>> output = + pipeline.apply( + Create.of( + KV.of("hello", KV.of("a", 97)), KV.of("hello", KV.of("b", 42)), + KV.of("hello", KV.of("b", 42)), KV.of("hello", KV.of("c", 12)))) + .apply(ParDo.of(fn)).setCoder(KvCoder.of(StringUtf8Coder.of(), myIntegerCoder)); + + PAssert.that(output).containsInAnyOrder(KV.of("a", new MyInteger(97)), + KV.of("b", new MyInteger(42)), KV.of("c", new MyInteger(12))); + pipeline.run(); + } + + + private static class WindowOddEvenBuckets extends NonMergingWindowFn<Long, IntervalWindow> { + private static final IntervalWindow EVEN_WINDOW = + new IntervalWindow( + BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE.maxTimestamp()); + private static final IntervalWindow ODD_WINDOW = + new IntervalWindow( + BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE.maxTimestamp().minus(1)); + + @Override + 
public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception { + if (c.element() % 2 == 0) { + return Collections.singleton(EVEN_WINDOW); + } + return Collections.singleton(ODD_WINDOW); + } + + @Override + public boolean isCompatible(WindowFn<?, ?> other) { + return other instanceof WindowOddEvenBuckets; + } + + @Override + public Coder<IntervalWindow> windowCoder() { + return new IntervalWindow.IntervalWindowCoder(); + } + + @Override + public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() { + throw new UnsupportedOperationException( + String.format("Can't use %s for side inputs", getClass().getSimpleName())); + } + } + + + static class TestDoFn extends DoFn<Integer, String> { + enum State {NOT_SET_UP, UNSTARTED, STARTED, PROCESSING, FINISHED} + + State state = State.NOT_SET_UP; + + final List<PCollectionView<Integer>> sideInputViews = new ArrayList<>(); + final List<TupleTag<String>> additionalOutputTupleTags = new ArrayList<>(); + + public TestDoFn() { + } + + public TestDoFn(List<PCollectionView<Integer>> sideInputViews, + List<TupleTag<String>> additionalOutputTupleTags) { + this.sideInputViews.addAll(sideInputViews); + this.additionalOutputTupleTags.addAll(additionalOutputTupleTags); + } + + @Setup + public void prepare() { + assertEquals(State.NOT_SET_UP, state); + state = State.UNSTARTED; + } + + @StartBundle + public void startBundle() { + assertThat(state, + anyOf(equalTo(State.UNSTARTED), equalTo(State.FINISHED))); + + state = State.STARTED; + } + + @ProcessElement + public void processElement(ProcessContext c) { + System.out.println("Recv elem: " + c.element()); + assertThat(state, + anyOf(equalTo(State.STARTED), equalTo(State.PROCESSING))); + state = State.PROCESSING; + outputToAllWithSideInputs(c, "processing: " + c.element()); + } + + @FinishBundle + public void finishBundle(FinishBundleContext c) { + assertThat(state, + anyOf(equalTo(State.STARTED), equalTo(State.PROCESSING))); + state = State.FINISHED; + 
c.output("finished", BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE); + for (TupleTag<String> additionalOutputTupleTag : additionalOutputTupleTags) { + c.output( + additionalOutputTupleTag, + additionalOutputTupleTag.getId() + ": " + "finished", + BoundedWindow.TIMESTAMP_MIN_VALUE, + GlobalWindow.INSTANCE); + } + } + + private void outputToAllWithSideInputs(ProcessContext c, String value) { + if (!sideInputViews.isEmpty()) { + List<Integer> sideInputValues = new ArrayList<>(); + for (PCollectionView<Integer> sideInputView : sideInputViews) { + sideInputValues.add(c.sideInput(sideInputView)); + } + value += ": " + sideInputValues; + } + c.output(value); + for (TupleTag<String> additionalOutputTupleTag : additionalOutputTupleTags) { + c.output(additionalOutputTupleTag, + additionalOutputTupleTag.getId() + ": " + value); + } + } + } + + private static class MyInteger implements Comparable<MyInteger> { + private final int value; + + MyInteger(int value) { + this.value = value; + } + + public int getValue() { + return value; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (!(o instanceof MyInteger)) { + return false; + } + + MyInteger myInteger = (MyInteger) o; + + return value == myInteger.value; + + } + + @Override + public int hashCode() { + return value; + } + + @Override + public int compareTo(MyInteger o) { + return Integer.compare(this.getValue(), o.getValue()); + } + + @Override + public String toString() { + return "MyInteger{" + "value=" + value + '}'; + } + } + + private static class MyIntegerCoder extends AtomicCoder<MyInteger> { + private static final MyIntegerCoder INSTANCE = new MyIntegerCoder(); + + private final VarIntCoder delegate = VarIntCoder.of(); + + public static MyIntegerCoder of() { + return INSTANCE; + } + + @Override + public void encode(MyInteger value, OutputStream outStream) + throws CoderException, IOException { + delegate.encode(value.getValue(), outStream); + } + + @Override + 
public MyInteger decode(InputStream inStream) throws CoderException, + IOException { + return new MyInteger(delegate.decode(inStream)); + } + } + + /** PAssert "matcher" for expected output. */ + static class HasExpectedOutput + implements SerializableFunction<Iterable<String>, Void>, Serializable { + private final List<Integer> inputs; + private final List<Integer> sideInputs; + private final String additionalOutput; + private final boolean ordered; + + public static HasExpectedOutput forInput(List<Integer> inputs) { + return new HasExpectedOutput( + new ArrayList<Integer>(inputs), + new ArrayList<Integer>(), + "", + false); + } + + private HasExpectedOutput(List<Integer> inputs, + List<Integer> sideInputs, + String additionalOutput, + boolean ordered) { + this.inputs = inputs; + this.sideInputs = sideInputs; + this.additionalOutput = additionalOutput; + this.ordered = ordered; + } + + public HasExpectedOutput andSideInputs(Integer... sideInputValues) { + List<Integer> sideInputs = new ArrayList<>(); + for (Integer sideInputValue : sideInputValues) { + sideInputs.add(sideInputValue); + } + return new HasExpectedOutput(inputs, sideInputs, additionalOutput, ordered); + } + + public HasExpectedOutput fromOutput(TupleTag<String> outputTag) { + return fromOutput(outputTag.getId()); + } + public HasExpectedOutput fromOutput(String outputId) { + return new HasExpectedOutput(inputs, sideInputs, outputId, ordered); + } + + public HasExpectedOutput inOrder() { + return new HasExpectedOutput(inputs, sideInputs, additionalOutput, true); + } + + @Override + public Void apply(Iterable<String> outputs) { + List<String> processeds = new ArrayList<>(); + List<String> finisheds = new ArrayList<>(); + for (String output : outputs) { + if (output.contains("finished")) { + finisheds.add(output); + } else { + processeds.add(output); + } + } + + String sideInputsSuffix; + if (sideInputs.isEmpty()) { + sideInputsSuffix = ""; + } else { + sideInputsSuffix = ": " + sideInputs; + } + + 
String additionalOutputPrefix; + if (additionalOutput.isEmpty()) { + additionalOutputPrefix = ""; + } else { + additionalOutputPrefix = additionalOutput + ": "; + } + + List<String> expectedProcesseds = new ArrayList<>(); + for (Integer input : inputs) { + expectedProcesseds.add( + additionalOutputPrefix + "processing: " + input + sideInputsSuffix); + } + String[] expectedProcessedsArray = + expectedProcesseds.toArray(new String[expectedProcesseds.size()]); + if (!ordered || expectedProcesseds.isEmpty()) { + assertThat(processeds, containsInAnyOrder(expectedProcessedsArray)); + } else { + assertThat(processeds, contains(expectedProcessedsArray)); + } + + for (String finished : finisheds) { + assertEquals(additionalOutputPrefix + "finished", finished); + } + + return null; + } + } +}
