Repository: beam
Updated Branches:
  refs/heads/jstorm-runner 0a05de365 -> e00e0e841


http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/CoGroupByKeyTest.java
----------------------------------------------------------------------
diff --git 
a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/CoGroupByKeyTest.java
 
b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/CoGroupByKeyTest.java
new file mode 100644
index 0000000..548fb20
--- /dev/null
+++ 
b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/CoGroupByKeyTest.java
@@ -0,0 +1,302 @@
+package org.apache.beam.runners.jstorm.translation.translator;
+
+import org.apache.beam.runners.jstorm.StormPipelineOptions;
+import org.apache.beam.runners.jstorm.TestJStormRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.transforms.join.CoGroupByKey;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Duration;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+@RunWith(JUnit4.class)
+public class CoGroupByKeyTest implements Serializable {
+    /**
+     * Converts the given list into a PCollection belonging to the provided
+     * Pipeline in such a way that coder inference needs to be performed.
+     */
+    private PCollection<KV<Integer, String>> createInput(String name,
+                                                         Pipeline p, 
List<KV<Integer, String>> list) {
+        return createInput(name, p, list,  new ArrayList<Long>());
+    }
+
+    /**
+     * Converts the given list with timestamps into a PCollection.
+     */
+    private PCollection<KV<Integer, String>> createInput(String name,
+                                                         Pipeline p, 
List<KV<Integer, String>> list, List<Long> timestamps) {
+        PCollection<KV<Integer, String>> input;
+        if (timestamps.isEmpty()) {
+            input = p.apply("Create" + name, Create.of(list)
+                    .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), 
StringUtf8Coder.of())));
+        } else {
+            input = p.apply("Create" + name, Create.timestamped(list, 
timestamps)
+                    .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), 
StringUtf8Coder.of())));
+        }
+        return input.apply(
+                "Identity" + name,
+                ParDo.of(
+                        new DoFn<KV<Integer, String>, KV<Integer, String>>() {
+                            @ProcessElement
+                            public void processElement(ProcessContext c) {
+                                c.output(c.element());
+                            }
+                        }));
+    }
+
+    /**
+     * Returns a {@code PCollection<KV<Integer, CoGbkResult>>} containing the 
result
+     * of a {@link CoGroupByKey} over 2 {@code PCollection<KV<Integer, 
String>>},
+     * where each {@link PCollection} has no duplicate keys and the key sets of
+     * each {@link PCollection} are intersecting but neither is a subset of 
the other.
+     */
+    private PCollection<KV<Integer, CoGbkResult>> buildGetOnlyGbk(
+            Pipeline p,
+            TupleTag<String> tag1,
+            TupleTag<String> tag2) {
+        List<KV<Integer, String>> list1 =
+                Arrays.asList(
+                        KV.of(1, "collection1-1"),
+                        KV.of(2, "collection1-2"));
+        List<KV<Integer, String>> list2 =
+                Arrays.asList(
+                        KV.of(2, "collection2-2"),
+                        KV.of(3, "collection2-3"));
+        PCollection<KV<Integer, String>> collection1 = 
createInput("CreateList1", p, list1);
+        PCollection<KV<Integer, String>> collection2 = 
createInput("CreateList2", p, list2);
+        PCollection<KV<Integer, CoGbkResult>> coGbkResults =
+                KeyedPCollectionTuple.of(tag1, collection1)
+                        .and(tag2, collection2)
+                        .apply(CoGroupByKey.<Integer>create());
+        return coGbkResults;
+    }
+
+    @Test
+    @Category(ValidatesRunner.class)
+    public void testCoGroupByKeyGetOnly() {
+        StormPipelineOptions options = 
PipelineOptionsFactory.as(StormPipelineOptions.class);
+        options.setRunner(TestJStormRunner.class);
+        options.setLocalMode(true);
+
+        Pipeline p = Pipeline.create(options);
+
+        final TupleTag<String> tag1 = new TupleTag<>();
+        final TupleTag<String> tag2 = new TupleTag<>();
+
+        PCollection<KV<Integer, CoGbkResult>> coGbkResults =
+                buildGetOnlyGbk(p, tag1, tag2);
+
+        PAssert.thatMap(coGbkResults).satisfies(
+                new SerializableFunction<Map<Integer, CoGbkResult>, Void>() {
+                    @Override
+                    public Void apply(Map<Integer, CoGbkResult> results) {
+                        assertEquals("collection1-1", 
results.get(1).getOnly(tag1));
+                        assertEquals("collection1-2", 
results.get(2).getOnly(tag1));
+                        assertEquals("collection2-2", 
results.get(2).getOnly(tag2));
+                        assertEquals("collection2-3", 
results.get(3).getOnly(tag2));
+                        return null;
+                    }
+                });
+
+        p.run();
+    }
+
+    /**
+     * Returns a {@code PCollection<KV<Integer, CoGbkResult>>} containing the
+     * results of the {@code CoGroupByKey} over three
+     * {@code PCollection<KV<Integer, String>>}, each of which correlates
+     * a customer id to purchases, addresses, or names, respectively.
+     */
+    private PCollection<KV<Integer, CoGbkResult>> buildPurchasesCoGbk(
+            Pipeline p,
+            TupleTag<String> purchasesTag,
+            TupleTag<String> addressesTag,
+            TupleTag<String> namesTag) {
+        List<KV<Integer, String>> idToPurchases =
+                Arrays.asList(
+                        KV.of(2, "Boat"),
+                        KV.of(1, "Shoes"),
+                        KV.of(3, "Car"),
+                        KV.of(1, "Book"),
+                        KV.of(10, "Pens"),
+                        KV.of(8, "House"),
+                        KV.of(4, "Suit"),
+                        KV.of(11, "House"),
+                        KV.of(14, "Shoes"),
+                        KV.of(2, "Suit"),
+                        KV.of(8, "Suit Case"),
+                        KV.of(3, "House"));
+
+        List<KV<Integer, String>> idToAddress =
+                Arrays.asList(
+                        KV.of(2, "53 S. 3rd"),
+                        KV.of(10, "383 Jackson Street"),
+                        KV.of(20, "3 W. Arizona"),
+                        KV.of(3, "29 School Rd"),
+                        KV.of(8, "6 Watling Rd"));
+
+        List<KV<Integer, String>> idToName =
+                Arrays.asList(
+                        KV.of(1, "John Smith"),
+                        KV.of(2, "Sally James"),
+                        KV.of(8, "Jeffery Spalding"),
+                        KV.of(20, "Joan Lichtfield"));
+
+        PCollection<KV<Integer, String>> purchasesTable =
+                createInput("CreateIdToPurchases", p, idToPurchases);
+
+        PCollection<KV<Integer, String>> addressTable =
+                createInput("CreateIdToAddress", p, idToAddress);
+
+        PCollection<KV<Integer, String>> nameTable =
+                createInput("CreateIdToName", p, idToName);
+
+        PCollection<KV<Integer, CoGbkResult>> coGbkResults =
+                KeyedPCollectionTuple.of(namesTag, nameTable)
+                        .and(addressesTag, addressTable)
+                        .and(purchasesTag, purchasesTable)
+                        .apply(CoGroupByKey.<Integer>create());
+        return coGbkResults;
+    }
+
+    /**
+     * Returns a {@code PCollection<KV<Integer, CoGbkResult>>} containing the
+     * results of the {@code CoGroupByKey} over 2 {@code 
PCollection<KV<Integer, String>>},
+     * each of which correlates a customer id to clicks, purchases, 
respectively.
+     */
+    private PCollection<KV<Integer, CoGbkResult>> 
buildPurchasesCoGbkWithWindowing(
+            Pipeline p,
+            TupleTag<String> clicksTag,
+            TupleTag<String> purchasesTag) {
+        List<KV<Integer, String>> idToClick =
+                Arrays.asList(
+                        KV.of(1, "Click t0"),
+                        KV.of(2, "Click t2"),
+                        KV.of(1, "Click t4"),
+                        KV.of(1, "Click t6"),
+                        KV.of(2, "Click t8"));
+
+        List<KV<Integer, String>> idToPurchases =
+                Arrays.asList(
+                        KV.of(1, "Boat t1"),
+                        KV.of(1, "Shoesi t2"),
+                        KV.of(1, "Pens t3"),
+                        KV.of(2, "House t4"),
+                        KV.of(2, "Suit t5"),
+                        KV.of(1, "Car t6"),
+                        KV.of(1, "Book t7"),
+                        KV.of(2, "House t8"),
+                        KV.of(2, "Shoes t9"),
+                        KV.of(2, "House t10"));
+
+        PCollection<KV<Integer, String>> clicksTable =
+                createInput("CreateClicks",
+                        p,
+                        idToClick,
+                        Arrays.asList(0L, 2L, 4L, 6L, 8L))
+                        .apply("WindowClicks", Window.<KV<Integer, 
String>>into(
+                                FixedWindows.of(new Duration(4)))
+                                
.withTimestampCombiner(TimestampCombiner.EARLIEST));
+
+        PCollection<KV<Integer, String>> purchasesTable =
+                createInput("CreatePurchases",
+                        p,
+                        idToPurchases,
+                        Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L))
+                        .apply("WindowPurchases", Window.<KV<Integer, 
String>>into(
+                                FixedWindows.of(new Duration(4)))
+                                
.withTimestampCombiner(TimestampCombiner.EARLIEST));
+
+        PCollection<KV<Integer, CoGbkResult>> coGbkResults =
+                KeyedPCollectionTuple.of(clicksTag, clicksTable)
+                        .and(purchasesTag, purchasesTable)
+                        .apply(CoGroupByKey.<Integer>create());
+        return coGbkResults;
+    }
+
+    @Test
+    @Category(ValidatesRunner.class)
+    public void testCoGroupByKey() {
+        StormPipelineOptions options = 
PipelineOptionsFactory.as(StormPipelineOptions.class);
+        options.setRunner(TestJStormRunner.class);
+        options.setLocalMode(true);
+
+        Pipeline p = Pipeline.create(options);
+
+        final TupleTag<String> namesTag = new TupleTag<>();
+        final TupleTag<String> addressesTag = new TupleTag<>();
+        final TupleTag<String> purchasesTag = new TupleTag<>();
+
+
+        PCollection<KV<Integer, CoGbkResult>> coGbkResults =
+                buildPurchasesCoGbk(p, purchasesTag, addressesTag, namesTag);
+
+        PAssert.thatMap(coGbkResults).satisfies(
+                new SerializableFunction<Map<Integer, CoGbkResult>, Void>() {
+                    @Override
+                    public Void apply(Map<Integer, CoGbkResult> results) {
+                        CoGbkResult result1 = results.get(1);
+                        assertEquals("John Smith", result1.getOnly(namesTag));
+                        assertThat(result1.getAll(purchasesTag), 
containsInAnyOrder("Shoes", "Book"));
+
+                        CoGbkResult result2 = results.get(2);
+                        assertEquals("Sally James", result2.getOnly(namesTag));
+                        assertEquals("53 S. 3rd", 
result2.getOnly(addressesTag));
+                        assertThat(result2.getAll(purchasesTag), 
containsInAnyOrder("Suit", "Boat"));
+
+                        CoGbkResult result3 = results.get(3);
+                        assertEquals("29 School Rd", 
result3.getOnly(addressesTag), "29 School Rd");
+                        assertThat(result3.getAll(purchasesTag), 
containsInAnyOrder("Car", "House"));
+
+                        CoGbkResult result8 = results.get(8);
+                        assertEquals("Jeffery Spalding", 
result8.getOnly(namesTag));
+                        assertEquals("6 Watling Rd", 
result8.getOnly(addressesTag));
+                        assertThat(result8.getAll(purchasesTag), 
containsInAnyOrder("House", "Suit Case"));
+
+                        CoGbkResult result20 = results.get(20);
+                        assertEquals("Joan Lichtfield", 
result20.getOnly(namesTag));
+                        assertEquals("3 W. Arizona", 
result20.getOnly(addressesTag));
+
+                        assertEquals("383 Jackson Street", 
results.get(10).getOnly(addressesTag));
+
+                        assertThat(results.get(4).getAll(purchasesTag), 
containsInAnyOrder("Suit"));
+                        assertThat(results.get(10).getAll(purchasesTag), 
containsInAnyOrder("Pens"));
+                        assertThat(results.get(11).getAll(purchasesTag), 
containsInAnyOrder("House"));
+                        assertThat(results.get(14).getAll(purchasesTag), 
containsInAnyOrder("Shoes"));
+
+                        return null;
+                    }
+                });
+
+        p.run();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git 
a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTest.java
 
b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTest.java
new file mode 100644
index 0000000..3e21a89
--- /dev/null
+++ 
b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTest.java
@@ -0,0 +1,158 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.translator;
+
+import org.apache.beam.runners.jstorm.StormPipelineOptions;
+
+import org.apache.beam.runners.jstorm.StormRunner;
+import org.apache.beam.runners.jstorm.TestJStormRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static 
org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link GroupByKey} with {@link StormRunner}.
+ */
+@RunWith(JUnit4.class)
+public class GroupByKeyTest {
+
    // Fixture for testCountGloballyBasic: 13 entries in total, with repeats
    // ("hi" x4, "sue" x2, "bob" x2) and three empty strings on purpose.
    static final String[] WORDS_ARRAY = new String[] {
            "hi", "there", "hi", "hi", "sue", "bob",
            "hi", "sue", "", "", "ZOW", "bob", "" };

    // The same words exposed as a fixed-size List view over WORDS_ARRAY.
    static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
+
+    @Test
+    public void testGroupByKey() {
+        StormPipelineOptions options = 
PipelineOptionsFactory.as(StormPipelineOptions.class);
+        options.setRunner(TestJStormRunner.class);
+        options.setLocalMode(true);
+
+        Pipeline p = Pipeline.create(options);
+
+        List<KV<String, Integer>> ungroupedPairs = Arrays.asList(
+                KV.of("k1", 3),
+                KV.of("k5", Integer.MAX_VALUE),
+                KV.of("k5", Integer.MIN_VALUE),
+                KV.of("k2", 66),
+                KV.of("k1", 4),
+                KV.of("k2", -33),
+                KV.of("k3", 0));
+
+        PCollection<KV<String, Integer>> input =
+                p.apply(Create.of(ungroupedPairs)
+                        .withCoder(KvCoder.of(StringUtf8Coder.of(), 
BigEndianIntegerCoder.of())));
+
+        PCollection<KV<String, Iterable<Integer>>> output =
+                input.apply(GroupByKey.<String, Integer>create());
+
+        PAssert.that(output)
+                .satisfies(new 
AssertThatHasExpectedContentsForTestGroupByKey());
+
+        p.run();
+    }
+
+    @Test
+    public void testCountGloballyBasic() {
+        StormPipelineOptions options = 
PipelineOptionsFactory.as(StormPipelineOptions.class);
+        options.setRunner(TestJStormRunner.class);
+        options.setLocalMode(true);
+
+        Pipeline p = Pipeline.create(options);
+        PCollection<String> input = p.apply(Create.of(WORDS));
+
+        PCollection<Long> output =
+                input.apply(Count.<String>globally());
+
+        PAssert.that(output)
+                .containsInAnyOrder(13L);
+        p.run();
+    }
+
+    static class AssertThatHasExpectedContentsForTestGroupByKey
+            implements SerializableFunction<Iterable<KV<String, 
Iterable<Integer>>>,
+            Void> {
+        @Override
+        public Void apply(Iterable<KV<String, Iterable<Integer>>> actual) {
+            assertThat(actual, containsInAnyOrder(
+                    KvMatcher.isKv(is("k1"), containsInAnyOrder(3, 4)),
+                    KvMatcher.isKv(is("k5"), 
containsInAnyOrder(Integer.MAX_VALUE,
+                            Integer.MIN_VALUE)),
+                    KvMatcher.isKv(is("k2"), containsInAnyOrder(66, -33)),
+                    KvMatcher.isKv(is("k3"), containsInAnyOrder(0))));
+            return null;
+        }
+    }
+
+    /**
+     * Matcher for KVs.
+     */
+    public static class KvMatcher<K, V>
+            extends TypeSafeMatcher<KV<? extends K, ? extends V>> {
+        final Matcher<? super K> keyMatcher;
+        final Matcher<? super V> valueMatcher;
+
+        public static <K, V> KvMatcher<K, V> isKv(Matcher<K> keyMatcher,
+                                                  Matcher<V> valueMatcher) {
+            return new KvMatcher<>(keyMatcher, valueMatcher);
+        }
+
+        public KvMatcher(Matcher<? super K> keyMatcher,
+                         Matcher<? super V> valueMatcher) {
+            this.keyMatcher = keyMatcher;
+            this.valueMatcher = valueMatcher;
+        }
+
+        @Override
+        public boolean matchesSafely(KV<? extends K, ? extends V> kv) {
+            return keyMatcher.matches(kv.getKey())
+                    && valueMatcher.matches(kv.getValue());
+        }
+
+        @Override
+        public void describeTo(Description description) {
+            description
+                    .appendText("a KV(").appendValue(keyMatcher)
+                    .appendText(", ").appendValue(valueMatcher)
+                    .appendText(")");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/ParDoTest.java
----------------------------------------------------------------------
diff --git 
a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/ParDoTest.java
 
b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/ParDoTest.java
new file mode 100644
index 0000000..f2d1896
--- /dev/null
+++ 
b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/ParDoTest.java
@@ -0,0 +1,627 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.translator;
+
+import org.apache.beam.runners.jstorm.StormPipelineOptions;
+
+import org.apache.beam.runners.jstorm.StormRunner;
+import org.apache.beam.runners.jstorm.TestJStormRunner;
+import com.google.common.base.MoreObjects;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.*;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.state.*;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.UsesMapState;
+import org.apache.beam.sdk.testing.UsesStatefulParDo;
+import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.transforms.windowing.*;
+import org.apache.beam.sdk.values.*;
+import org.joda.time.Duration;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.*;
+
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.equalTo;
+import static 
org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
+import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link ParDo} with {@link StormRunner}.
+ */
+@RunWith(JUnit4.class)
+public class ParDoTest implements Serializable {
+
+    @Test
+    public void testParDo() throws IOException {
+        StormPipelineOptions options = 
PipelineOptionsFactory.as(StormPipelineOptions.class);
+        options.setRunner(TestJStormRunner.class);
+        Pipeline pipeline = Pipeline.create(options);
+
+        List<Integer> inputs = Arrays.asList(3, -42, 666);
+
+        PCollection<String> output = pipeline
+                .apply(Create.of(inputs))
+                .apply(ParDo.of(new TestDoFn()));
+
+        PAssert.that(output)
+                .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs));
+
+        pipeline.run();
+    }
+
+    @Test
+    public void testParDoWithSideInputs() throws IOException {
+        StormPipelineOptions options = 
PipelineOptionsFactory.as(StormPipelineOptions.class);
+        options.setRunner(TestJStormRunner.class);
+        Pipeline pipeline = Pipeline.create(options);
+
+        List<Integer> inputs = Arrays.asList(3, -42, 666);
+
+        PCollectionView<Integer> sideInput1 = pipeline
+                .apply("CreateSideInput1", Create.of(11))
+                .apply("ViewSideInput1", View.<Integer>asSingleton());
+        PCollectionView<Integer> sideInputUnread = pipeline
+                .apply("CreateSideInputUnread", Create.of(-3333))
+                .apply("ViewSideInputUnread", View.<Integer>asSingleton());
+
+        PCollectionView<Integer> sideInput2 = pipeline
+                .apply("CreateSideInput2", Create.of(222))
+                .apply("ViewSideInput2", View.<Integer>asSingleton());
+        PCollection<String> output = pipeline
+                .apply(Create.of(inputs))
+                .apply(ParDo.of(new TestDoFn(
+                                Arrays.asList(sideInput1, sideInput2),
+                                Arrays.<TupleTag<String>>asList()))
+                        .withSideInputs(sideInput1, sideInputUnread, 
sideInput2));
+
+        PAssert.that(output)
+                .satisfies(ParDoTest.HasExpectedOutput
+                        .forInput(inputs)
+                        .andSideInputs(11, 222));
+
+        pipeline.run();
+    }
+
+    @Test
+    public void testParDoWithTaggedOutput() {
+        List<Integer> inputs = Arrays.asList(3, -42, 666);
+
+        TupleTag<String> mainOutputTag = new TupleTag<String>("main"){};
+        TupleTag<String> additionalOutputTag1 = new 
TupleTag<String>("additional1"){};
+        TupleTag<String> additionalOutputTag2 = new 
TupleTag<String>("additional2"){};
+        TupleTag<String> additionalOutputTag3 = new 
TupleTag<String>("additional3"){};
+        TupleTag<String> additionalOutputTagUnwritten = new 
TupleTag<String>("unwrittenOutput"){};
+
+        StormPipelineOptions options = 
PipelineOptionsFactory.as(StormPipelineOptions.class);
+        options.setRunner(TestJStormRunner.class);
+        Pipeline pipeline = Pipeline.create(options);
+
+        PCollectionTuple outputs = pipeline
+            .apply(Create.of(inputs))
+            .apply(ParDo
+                .of(new TestDoFn(
+                    Arrays.<PCollectionView<Integer>>asList(),
+                    Arrays.asList(additionalOutputTag1, additionalOutputTag2, 
additionalOutputTag3)))
+                .withOutputTags(
+                    mainOutputTag,
+                    TupleTagList.of(additionalOutputTag3)
+                        .and(additionalOutputTag1)
+                        .and(additionalOutputTagUnwritten)
+                        .and(additionalOutputTag2)));
+
+        PAssert.that(outputs.get(mainOutputTag))
+            .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs));
+
+        PAssert.that(outputs.get(additionalOutputTag1))
+            .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)
+                .fromOutput(additionalOutputTag1));
+        PAssert.that(outputs.get(additionalOutputTag2))
+            .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)
+                .fromOutput(additionalOutputTag2));
+        PAssert.that(outputs.get(additionalOutputTag3))
+            .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)
+                .fromOutput(additionalOutputTag3));
+        PAssert.that(outputs.get(additionalOutputTagUnwritten)).empty();
+
+        pipeline.run();
+    }
+
+    @Test
+    public void testNoWindowFnDoesNotReassignWindows() {
+        StormPipelineOptions options = 
PipelineOptionsFactory.as(StormPipelineOptions.class);
+        options.setRunner(TestJStormRunner.class);
+        Pipeline pipeline = Pipeline.create(options);
+
+        final PCollection<Long> initialWindows =
+                pipeline
+                    .apply(GenerateSequence.from(0).to(10))
+                    .apply("AssignWindows", Window.into(new 
WindowOddEvenBuckets()));
+
+        // Sanity check the window assignment to demonstrate the baseline
+        PAssert.that(initialWindows)
+                .inWindow(WindowOddEvenBuckets.EVEN_WINDOW)
+                .containsInAnyOrder(0L, 2L, 4L, 6L, 8L);
+        PAssert.that(initialWindows)
+                .inWindow(WindowOddEvenBuckets.ODD_WINDOW)
+                .containsInAnyOrder(1L, 3L, 5L, 7L, 9L);
+
+        PCollection<Boolean> upOne =
+                initialWindows.apply(
+                        "ModifyTypes",
+                        MapElements.<Long, Boolean>via(
+                                new SimpleFunction<Long, Boolean>() {
+                                    @Override
+                                    public Boolean apply(Long input) {
+                                        return input % 2 == 0;
+                                    }
+                                }));
+        PAssert.that(upOne)
+                .inWindow(WindowOddEvenBuckets.EVEN_WINDOW)
+                .containsInAnyOrder(true, true, true, true, true);
+        PAssert.that(upOne)
+                .inWindow(WindowOddEvenBuckets.ODD_WINDOW)
+                .containsInAnyOrder(false, false, false, false, false);
+
+        // The elements should be in the same windows, even though they would 
not be assigned to the
+        // same windows with the updated timestamps. If we try to apply the 
original WindowFn, the type
+        // will not be appropriate and the runner should crash, as a Boolean 
cannot be converted into
+        // a long.
+        PCollection<Boolean> updatedTrigger =
+                upOne.apply(
+                        "UpdateWindowingStrategy",
+                        Window.<Boolean>configure().triggering(Never.ever())
+                                .withAllowedLateness(Duration.ZERO)
+                                .accumulatingFiredPanes());
+        pipeline.run();
+    }
+
    // Verifies that two stateful ParDo transforms may declare state cells
    // under the same @StateId string and still receive independent state:
    // fn's counter runs 0,1,2 while fn2's runs 13,26,39 (asserted below).
    // NOTE(review): the sibling test testValueStateTaggedOutput carries
    // @Category({ValidatesRunner.class, UsesStatefulParDo.class}); confirm the
    // missing category annotation here is intentional.
    @Test
    public void testValueStateSameId() {
        StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
        options.setRunner(TestJStormRunner.class);
        Pipeline pipeline = Pipeline.create(options);

        // Both DoFns below declare their state cell under this same id.
        final String stateId = "foo";

        // Emits ("sizzle", n) where n starts at 0 and the stored counter is
        // bumped by 1 for each element processed.
        DoFn<KV<String, Integer>, KV<String, Integer>> fn =
                new DoFn<KV<String, Integer>, KV<String, Integer>>() {

                    @StateId(stateId)
                    private final StateSpec<ValueState<Integer>> intState =
                            StateSpecs.value(VarIntCoder.of());

                    @ProcessElement
                    public void processElement(
                            ProcessContext c, @StateId(stateId) ValueState<Integer> state) {
                        Integer currentValue = MoreObjects.firstNonNull(state.read(), 0);
                        c.output(KV.of("sizzle", currentValue));
                        state.write(currentValue + 1);
                    }
                };

        // Re-uses the same state id but counts in steps of 13 starting at 13.
        // If state were shared across transforms these outputs would differ.
        DoFn<KV<String, Integer>, Integer> fn2 =
                new DoFn<KV<String, Integer>, Integer>() {

                    @StateId(stateId)
                    private final StateSpec<ValueState<Integer>> intState =
                            StateSpecs.value(VarIntCoder.of());

                    @ProcessElement
                    public void processElement(
                            ProcessContext c, @StateId(stateId) ValueState<Integer> state) {
                        Integer currentValue = MoreObjects.firstNonNull(state.read(), 13);
                        c.output(currentValue);
                        state.write(currentValue + 13);
                    }
                };

        // All three elements share the key "hello", so each stateful DoFn sees
        // them against a single state cell.
        PCollection<KV<String, Integer>> intermediate =
                pipeline.apply(Create.of(KV.of("hello", 42), KV.of("hello", 97), KV.of("hello", 84)))
                        .apply("First stateful ParDo", ParDo.of(fn));

        PCollection<Integer> output =
                intermediate.apply("Second stateful ParDo", ParDo.of(fn2));

        // fn counted 0,1,2; fn2 counted 13,26,39 — independent state despite
        // the identical state id.
        PAssert.that(intermediate)
                .containsInAnyOrder(KV.of("sizzle", 0), KV.of("sizzle", 1), KV.of("sizzle", 2));
        PAssert.that(output).containsInAnyOrder(13, 26, 39);
        pipeline.run();
    }
+
+    @Test
+    @Category({ValidatesRunner.class, UsesStatefulParDo.class})
+    public void testValueStateTaggedOutput() {
+        StormPipelineOptions options = 
PipelineOptionsFactory.as(StormPipelineOptions.class);
+        options.setRunner(TestJStormRunner.class);
+        Pipeline pipeline = Pipeline.create(options);
+
+        final String stateId = "foo";
+
+        final TupleTag<Integer> evenTag = new TupleTag<Integer>() {};
+        final TupleTag<Integer> oddTag = new TupleTag<Integer>() {};
+
+        DoFn<KV<String, Integer>, Integer> fn =
+                new DoFn<KV<String, Integer>, Integer>() {
+
+                    @StateId(stateId)
+                    private final StateSpec<ValueState<Integer>> intState =
+                            StateSpecs.value(VarIntCoder.of());
+
+                    @ProcessElement
+                    public void processElement(
+                            ProcessContext c, @StateId(stateId) 
ValueState<Integer> state) {
+                        Integer currentValue = 
MoreObjects.firstNonNull(state.read(), 0);
+                        if (currentValue % 2 == 0) {
+                            c.output(currentValue);
+                        } else {
+                            c.output(oddTag, currentValue);
+                        }
+                        state.write(currentValue + 1);
+                    }
+                };
+
+        PCollectionTuple output =
+                pipeline.apply(
+                        Create.of(
+                                KV.of("hello", 42),
+                                KV.of("hello", 97),
+                                KV.of("hello", 84),
+                                KV.of("goodbye", 33),
+                                KV.of("hello", 859),
+                                KV.of("goodbye", 83945)))
+                        .apply(ParDo.of(fn).withOutputTags(evenTag, 
TupleTagList.of(oddTag)));
+
+        PCollection<Integer> evens = output.get(evenTag);
+        PCollection<Integer> odds = output.get(oddTag);
+
+        // There are 0 and 2 from "hello" and just 0 from "goodbye"
+        PAssert.that(evens).containsInAnyOrder(0, 2, 0);
+
+        // There are 1 and 3 from "hello" and just "1" from "goodbye"
+        PAssert.that(odds).containsInAnyOrder(1, 3, 1);
+        pipeline.run();
+    }
+
    @Test
    @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesMapState.class})
    public void testMapStateCoderInference() {
        StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
        options.setRunner(TestJStormRunner.class);
        Pipeline pipeline = Pipeline.create(options);

        final String stateId = "foo";
        final String countStateId = "count";
        // Register the custom coder so MapState.map() (declared below without an
        // explicit coder) must infer it from the registry.
        Coder<MyInteger> myIntegerCoder = MyIntegerCoder.of();
        pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, myIntegerCoder);

        DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>> fn =
                new DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>>() {

                    // No coders given here: both key and value coders must be inferred.
                    @StateId(stateId)
                    private final StateSpec<MapState<String, MyInteger>> mapState = StateSpecs.map();

                    // Counts processed elements so we can flush once all 4 arrive.
                    @StateId(countStateId)
                    private final StateSpec<CombiningState<Integer, int[], Integer>>
                            countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
                            Sum.ofIntegers());

                    @ProcessElement
                    public void processElement(
                            ProcessContext c, @StateId(stateId) MapState<String, MyInteger> state,
                            @StateId(countStateId) CombiningState<Integer, int[], Integer>
                                    count) {
                        KV<String, Integer> value = c.element().getValue();
                        // Duplicate keys overwrite, so ("b", 42) twice yields one entry.
                        state.put(value.getKey(), new MyInteger(value.getValue()));
                        count.add(1);
                        // Emit the whole map only after the last (4th) element.
                        if (count.read() >= 4) {
                            Iterable<Map.Entry<String, MyInteger>> iterate = state.entries().read();
                            for (Map.Entry<String, MyInteger> entry : iterate) {
                                c.output(KV.of(entry.getKey(), entry.getValue()));
                            }
                        }
                    }
                };

        PCollection<KV<String, MyInteger>> output =
                pipeline.apply(
                        Create.of(
                                KV.of("hello", KV.of("a", 97)), KV.of("hello", KV.of("b", 42)),
                                KV.of("hello", KV.of("b", 42)), KV.of("hello", KV.of("c", 12))))
                        .apply(ParDo.of(fn)).setCoder(KvCoder.of(StringUtf8Coder.of(), myIntegerCoder));

        // One entry per distinct key; the duplicate "b" collapses to a single KV.
        PAssert.that(output).containsInAnyOrder(KV.of("a", new MyInteger(97)),
                KV.of("b", new MyInteger(42)), KV.of("c", new MyInteger(12)));
        pipeline.run();
    }
+
+
+    private static class WindowOddEvenBuckets extends NonMergingWindowFn<Long, 
IntervalWindow> {
+        private static final IntervalWindow EVEN_WINDOW =
+                new IntervalWindow(
+                        BoundedWindow.TIMESTAMP_MIN_VALUE, 
GlobalWindow.INSTANCE.maxTimestamp());
+        private static final IntervalWindow ODD_WINDOW =
+                new IntervalWindow(
+                        BoundedWindow.TIMESTAMP_MIN_VALUE, 
GlobalWindow.INSTANCE.maxTimestamp().minus(1));
+
+        @Override
+        public Collection<IntervalWindow> assignWindows(AssignContext c) 
throws Exception {
+            if (c.element() % 2 == 0) {
+                return Collections.singleton(EVEN_WINDOW);
+            }
+            return Collections.singleton(ODD_WINDOW);
+        }
+
+        @Override
+        public boolean isCompatible(WindowFn<?, ?> other) {
+            return other instanceof WindowOddEvenBuckets;
+        }
+
+        @Override
+        public Coder<IntervalWindow> windowCoder() {
+            return new IntervalWindow.IntervalWindowCoder();
+        }
+
+        @Override
+        public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
+            throw new UnsupportedOperationException(
+                    String.format("Can't use %s for side inputs", 
getClass().getSimpleName()));
+        }
+    }
+
+
+    static class TestDoFn extends DoFn<Integer, String> {
+        enum State {NOT_SET_UP, UNSTARTED, STARTED, PROCESSING, FINISHED}
+
+        State state = State.NOT_SET_UP;
+
+        final List<PCollectionView<Integer>> sideInputViews = new 
ArrayList<>();
+        final List<TupleTag<String>> additionalOutputTupleTags = new 
ArrayList<>();
+
+        public TestDoFn() {
+        }
+
+        public TestDoFn(List<PCollectionView<Integer>> sideInputViews,
+                        List<TupleTag<String>> additionalOutputTupleTags) {
+            this.sideInputViews.addAll(sideInputViews);
+            this.additionalOutputTupleTags.addAll(additionalOutputTupleTags);
+        }
+
+        @Setup
+        public void prepare() {
+            assertEquals(State.NOT_SET_UP, state);
+            state = State.UNSTARTED;
+        }
+
+        @StartBundle
+        public void startBundle() {
+            assertThat(state,
+                anyOf(equalTo(State.UNSTARTED), equalTo(State.FINISHED)));
+
+            state = State.STARTED;
+        }
+
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+            System.out.println("Recv elem: " + c.element());
+            assertThat(state,
+                anyOf(equalTo(State.STARTED), equalTo(State.PROCESSING)));
+            state = State.PROCESSING;
+            outputToAllWithSideInputs(c, "processing: " + c.element());
+        }
+
+        @FinishBundle
+        public void finishBundle(FinishBundleContext c) {
+            assertThat(state,
+                anyOf(equalTo(State.STARTED), equalTo(State.PROCESSING)));
+            state = State.FINISHED;
+            c.output("finished", BoundedWindow.TIMESTAMP_MIN_VALUE, 
GlobalWindow.INSTANCE);
+            for (TupleTag<String> additionalOutputTupleTag : 
additionalOutputTupleTags) {
+                c.output(
+                    additionalOutputTupleTag,
+                    additionalOutputTupleTag.getId() + ": " + "finished",
+                    BoundedWindow.TIMESTAMP_MIN_VALUE,
+                    GlobalWindow.INSTANCE);
+            }
+        }
+
+        private void outputToAllWithSideInputs(ProcessContext c, String value) 
{
+            if (!sideInputViews.isEmpty()) {
+                List<Integer> sideInputValues = new ArrayList<>();
+                for (PCollectionView<Integer> sideInputView : sideInputViews) {
+                    sideInputValues.add(c.sideInput(sideInputView));
+                }
+                value += ": " + sideInputValues;
+            }
+            c.output(value);
+            for (TupleTag<String> additionalOutputTupleTag : 
additionalOutputTupleTags) {
+                c.output(additionalOutputTupleTag,
+                    additionalOutputTupleTag.getId() + ": " + value);
+            }
+        }
+    }
+
+    private static class MyInteger implements Comparable<MyInteger> {
+        private final int value;
+
+        MyInteger(int value) {
+            this.value = value;
+        }
+
+        public int getValue() {
+            return value;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+
+            if (!(o instanceof MyInteger)) {
+                return false;
+            }
+
+            MyInteger myInteger = (MyInteger) o;
+
+            return value == myInteger.value;
+
+        }
+
+        @Override
+        public int hashCode() {
+            return value;
+        }
+
+        @Override
+        public int compareTo(MyInteger o) {
+            return Integer.compare(this.getValue(), o.getValue());
+        }
+
+        @Override
+        public String toString() {
+            return "MyInteger{" + "value=" + value + '}';
+        }
+    }
+
+    private static class MyIntegerCoder extends AtomicCoder<MyInteger> {
+        private static final MyIntegerCoder INSTANCE = new MyIntegerCoder();
+
+        private final VarIntCoder delegate = VarIntCoder.of();
+
+        public static MyIntegerCoder of() {
+            return INSTANCE;
+        }
+
+        @Override
+        public void encode(MyInteger value, OutputStream outStream)
+                throws CoderException, IOException {
+            delegate.encode(value.getValue(), outStream);
+        }
+
+        @Override
+        public MyInteger decode(InputStream inStream) throws CoderException,
+                IOException {
+            return new MyInteger(delegate.decode(inStream));
+        }
+    }
+
+    /** PAssert "matcher" for expected output. */
+    static class HasExpectedOutput
+        implements SerializableFunction<Iterable<String>, Void>, Serializable {
+        private final List<Integer> inputs;
+        private final List<Integer> sideInputs;
+        private final String additionalOutput;
+        private final boolean ordered;
+
+        public static HasExpectedOutput forInput(List<Integer> inputs) {
+            return new HasExpectedOutput(
+                new ArrayList<Integer>(inputs),
+                new ArrayList<Integer>(),
+                "",
+                false);
+        }
+
+        private HasExpectedOutput(List<Integer> inputs,
+                                  List<Integer> sideInputs,
+                                  String additionalOutput,
+                                  boolean ordered) {
+            this.inputs = inputs;
+            this.sideInputs = sideInputs;
+            this.additionalOutput = additionalOutput;
+            this.ordered = ordered;
+        }
+
+        public HasExpectedOutput andSideInputs(Integer... sideInputValues) {
+            List<Integer> sideInputs = new ArrayList<>();
+            for (Integer sideInputValue : sideInputValues) {
+                sideInputs.add(sideInputValue);
+            }
+            return new HasExpectedOutput(inputs, sideInputs, additionalOutput, 
ordered);
+        }
+
+        public HasExpectedOutput fromOutput(TupleTag<String> outputTag) {
+            return fromOutput(outputTag.getId());
+        }
+        public HasExpectedOutput fromOutput(String outputId) {
+            return new HasExpectedOutput(inputs, sideInputs, outputId, 
ordered);
+        }
+
+        public HasExpectedOutput inOrder() {
+            return new HasExpectedOutput(inputs, sideInputs, additionalOutput, 
true);
+        }
+
+        @Override
+        public Void apply(Iterable<String> outputs) {
+            List<String> processeds = new ArrayList<>();
+            List<String> finisheds = new ArrayList<>();
+            for (String output : outputs) {
+                if (output.contains("finished")) {
+                    finisheds.add(output);
+                } else {
+                    processeds.add(output);
+                }
+            }
+
+            String sideInputsSuffix;
+            if (sideInputs.isEmpty()) {
+                sideInputsSuffix = "";
+            } else {
+                sideInputsSuffix = ": " + sideInputs;
+            }
+
+            String additionalOutputPrefix;
+            if (additionalOutput.isEmpty()) {
+                additionalOutputPrefix = "";
+            } else {
+                additionalOutputPrefix = additionalOutput + ": ";
+            }
+
+            List<String> expectedProcesseds = new ArrayList<>();
+            for (Integer input : inputs) {
+                expectedProcesseds.add(
+                    additionalOutputPrefix + "processing: " + input + 
sideInputsSuffix);
+            }
+            String[] expectedProcessedsArray =
+                expectedProcesseds.toArray(new 
String[expectedProcesseds.size()]);
+            if (!ordered || expectedProcesseds.isEmpty()) {
+                assertThat(processeds, 
containsInAnyOrder(expectedProcessedsArray));
+            } else {
+                assertThat(processeds, contains(expectedProcessedsArray));
+            }
+
+            for (String finished : finisheds) {
+                assertEquals(additionalOutputPrefix + "finished", finished);
+            }
+
+            return null;
+        }
+    }
+}

Reply via email to