jstorm-runner: remove ValidatesRunner tests and dead code from jstorm module.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4ff42cbc Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4ff42cbc Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4ff42cbc Branch: refs/heads/jstorm-runner Commit: 4ff42cbc65452ae6259d90f07f2f80423eeb69df Parents: aa251a4 Author: Pei He <[email protected]> Authored: Thu Jul 13 18:38:49 2017 +0800 Committer: Pei He <[email protected]> Committed: Sat Aug 19 12:02:56 2017 +0800 ---------------------------------------------------------------------- .../jstorm/translation/TranslatorRegistry.java | 18 - .../translator/CombineGloballyTranslator.java | 25 - .../translator/CombinePerKeyTranslator.java | 25 - .../translator/ReshuffleTranslator.java | 24 - .../translator/WindowBoundTranslator.java | 47 -- .../util/DefaultSideInputReader.java | 45 -- .../translator/CoGroupByKeyTest.java | 301 --------- .../translation/translator/GroupByKeyTest.java | 155 ----- .../translation/translator/ParDoTest.java | 624 ------------------- 9 files changed, 1264 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/4ff42cbc/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java index bce5b3e..316186e 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java @@ -49,30 +49,12 @@ public class TranslatorRegistry { static { TRANSLATORS.put(Read.Bounded.class, new BoundedSourceTranslator()); TRANSLATORS.put(Read.Unbounded.class, new UnboundedSourceTranslator()); - // TRANSLATORS.put(Write.Bound.class, new WriteSinkStreamingTranslator()); - // TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator()); - TRANSLATORS.put(ParDo.SingleOutput.class, new ParDoBoundTranslator()); TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoBoundMultiTranslator()); - - //TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator<>()); TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator<>()); - TRANSLATORS.put(Flatten.PCollections.class, new FlattenTranslator()); - TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator()); - TRANSLATORS.put(ViewTranslator.CreateJStormPCollectionView.class, new ViewTranslator()); - - /** - * Currently, empty translation is required for combine and reshuffle. - * Because, the transforms will be mapped to GroupByKey and Pardo finally. - * So we only need to translator the finally transforms. - * If any improvement is required, the composite transforms will be translated in the future. - */ - // TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator()); - // TRANSLATORS.put(Globally.class, new CombineGloballyTranslator()); - // TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslator()); } public static TransformTranslator<?> getTranslator(PTransform<?, ?> transform) { http://git-wip-us.apache.org/repos/asf/beam/blob/4ff42cbc/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java deleted file mode 100644 index fe5fca9..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.translator; - -import org.apache.beam.sdk.transforms.Combine; - -public class CombineGloballyTranslator<InputT, OutputT> - extends TransformTranslator.Default<Combine.Globally<InputT, OutputT>> { - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/4ff42cbc/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java deleted file mode 100644 index c382fb7..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.translator; - -import org.apache.beam.sdk.transforms.Combine; - -public class CombinePerKeyTranslator<K, InputT, OutputT> - extends TransformTranslator.Default<Combine.PerKey<K, InputT, OutputT>> { - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/4ff42cbc/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java deleted file mode 100644 index c450a22..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.translator; - -import org.apache.beam.sdk.transforms.Reshuffle; - -public class ReshuffleTranslator<K, V> extends TransformTranslator.Default<Reshuffle<K, V>> { - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/4ff42cbc/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java deleted file mode 100644 index c863c9e..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.translator; - -import org.apache.beam.runners.jstorm.translation.TranslationContext; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.SlidingWindows; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Translates a Window.Bound node into a Storm WindowedBolt - * - * @param <T> - */ -public class WindowBoundTranslator<T> extends TransformTranslator.Default<Window.Assign<T>> { - private static final Logger LOG = LoggerFactory.getLogger(WindowBoundTranslator.class); - - // Do nothing here currently. The assign of window strategy is included in AssignTranslator. - @Override - public void translateNode(Window.Assign<T> transform, TranslationContext context) { - if (transform.getWindowFn() instanceof FixedWindows) { - context.getUserGraphContext().setWindowed(); - } else if (transform.getWindowFn() instanceof SlidingWindows) { - context.getUserGraphContext().setWindowed(); - } else { - throw new UnsupportedOperationException( - "Not supported window type currently: " + transform.getWindowFn()); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/4ff42cbc/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java deleted file mode 100644 index 750095e..0000000 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.util; - -import java.io.Serializable; -import javax.annotation.Nullable; -import org.apache.beam.runners.core.SideInputReader; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.values.PCollectionView; - -/** - * No-op SideInputReader implementation. - */ -public class DefaultSideInputReader implements SideInputReader, Serializable { - @Nullable - @Override - public <T> T get(PCollectionView<T> pCollectionView, BoundedWindow boundedWindow) { - return null; - } - - @Override - public <T> boolean contains(PCollectionView<T> pCollectionView) { - return false; - } - - @Override - public boolean isEmpty() { - return true; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/4ff42cbc/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/CoGroupByKeyTest.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/CoGroupByKeyTest.java b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/CoGroupByKeyTest.java deleted file mode 100644 index 809436e..0000000 --- a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/CoGroupByKeyTest.java +++ /dev/null @@ -1,301 +0,0 @@ -package org.apache.beam.runners.jstorm.translation.translator; - -import org.apache.beam.runners.jstorm.TestJStormRunner; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.BigEndianIntegerCoder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.ValidatesRunner; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.join.CoGbkResult; -import org.apache.beam.sdk.transforms.join.CoGroupByKey; -import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TupleTag; -import org.joda.time.Duration; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; - -import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; - -@RunWith(JUnit4.class) -public class CoGroupByKeyTest implements Serializable { - /** - * Converts the given list into a PCollection belonging to the provided - * Pipeline in such a way that coder inference needs to be performed. - */ - private PCollection<KV<Integer, String>> createInput(String name, - Pipeline p, List<KV<Integer, String>> list) { - return createInput(name, p, list, new ArrayList<Long>()); - } - - /** - * Converts the given list with timestamps into a PCollection. - */ - private PCollection<KV<Integer, String>> createInput(String name, - Pipeline p, List<KV<Integer, String>> list, List<Long> timestamps) { - PCollection<KV<Integer, String>> input; - if (timestamps.isEmpty()) { - input = p.apply("Create" + name, Create.of(list) - .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of()))); - } else { - input = p.apply("Create" + name, Create.timestamped(list, timestamps) - .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of()))); - } - return input.apply( - "Identity" + name, - ParDo.of( - new DoFn<KV<Integer, String>, KV<Integer, String>>() { - @ProcessElement - public void processElement(ProcessContext c) { - c.output(c.element()); - } - })); - } - - /** - * Returns a {@code PCollection<KV<Integer, CoGbkResult>>} containing the result - * of a {@link CoGroupByKey} over 2 {@code PCollection<KV<Integer, String>>}, - * where each {@link PCollection} has no duplicate keys and the key sets of - * each {@link PCollection} are intersecting but neither is a subset of the other. - */ - private PCollection<KV<Integer, CoGbkResult>> buildGetOnlyGbk( - Pipeline p, - TupleTag<String> tag1, - TupleTag<String> tag2) { - List<KV<Integer, String>> list1 = - Arrays.asList( - KV.of(1, "collection1-1"), - KV.of(2, "collection1-2")); - List<KV<Integer, String>> list2 = - Arrays.asList( - KV.of(2, "collection2-2"), - KV.of(3, "collection2-3")); - PCollection<KV<Integer, String>> collection1 = createInput("CreateList1", p, list1); - PCollection<KV<Integer, String>> collection2 = createInput("CreateList2", p, list2); - PCollection<KV<Integer, CoGbkResult>> coGbkResults = - KeyedPCollectionTuple.of(tag1, collection1) - .and(tag2, collection2) - .apply(CoGroupByKey.<Integer>create()); - return coGbkResults; - } - - @Test - @Category(ValidatesRunner.class) - public void testCoGroupByKeyGetOnly() { - StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class); - options.setRunner(TestJStormRunner.class); - options.setLocalMode(true); - - Pipeline p = Pipeline.create(options); - - final TupleTag<String> tag1 = new TupleTag<>(); - final TupleTag<String> tag2 = new TupleTag<>(); - - PCollection<KV<Integer, CoGbkResult>> coGbkResults = - buildGetOnlyGbk(p, tag1, tag2); - - PAssert.thatMap(coGbkResults).satisfies( - new SerializableFunction<Map<Integer, CoGbkResult>, Void>() { - @Override - public Void apply(Map<Integer, CoGbkResult> results) { - assertEquals("collection1-1", results.get(1).getOnly(tag1)); - assertEquals("collection1-2", results.get(2).getOnly(tag1)); - assertEquals("collection2-2", results.get(2).getOnly(tag2)); - assertEquals("collection2-3", results.get(3).getOnly(tag2)); - return null; - } - }); - - p.run(); - } - - /** - * Returns a {@code PCollection<KV<Integer, CoGbkResult>>} containing the - * results of the {@code CoGroupByKey} over three - * {@code PCollection<KV<Integer, String>>}, each of which correlates - * a customer id to purchases, addresses, or names, respectively. - */ - private PCollection<KV<Integer, CoGbkResult>> buildPurchasesCoGbk( - Pipeline p, - TupleTag<String> purchasesTag, - TupleTag<String> addressesTag, - TupleTag<String> namesTag) { - List<KV<Integer, String>> idToPurchases = - Arrays.asList( - KV.of(2, "Boat"), - KV.of(1, "Shoes"), - KV.of(3, "Car"), - KV.of(1, "Book"), - KV.of(10, "Pens"), - KV.of(8, "House"), - KV.of(4, "Suit"), - KV.of(11, "House"), - KV.of(14, "Shoes"), - KV.of(2, "Suit"), - KV.of(8, "Suit Case"), - KV.of(3, "House")); - - List<KV<Integer, String>> idToAddress = - Arrays.asList( - KV.of(2, "53 S. 3rd"), - KV.of(10, "383 Jackson Street"), - KV.of(20, "3 W. Arizona"), - KV.of(3, "29 School Rd"), - KV.of(8, "6 Watling Rd")); - - List<KV<Integer, String>> idToName = - Arrays.asList( - KV.of(1, "John Smith"), - KV.of(2, "Sally James"), - KV.of(8, "Jeffery Spalding"), - KV.of(20, "Joan Lichtfield")); - - PCollection<KV<Integer, String>> purchasesTable = - createInput("CreateIdToPurchases", p, idToPurchases); - - PCollection<KV<Integer, String>> addressTable = - createInput("CreateIdToAddress", p, idToAddress); - - PCollection<KV<Integer, String>> nameTable = - createInput("CreateIdToName", p, idToName); - - PCollection<KV<Integer, CoGbkResult>> coGbkResults = - KeyedPCollectionTuple.of(namesTag, nameTable) - .and(addressesTag, addressTable) - .and(purchasesTag, purchasesTable) - .apply(CoGroupByKey.<Integer>create()); - return coGbkResults; - } - - /** - * Returns a {@code PCollection<KV<Integer, CoGbkResult>>} containing the - * results of the {@code CoGroupByKey} over 2 {@code PCollection<KV<Integer, String>>}, - * each of which correlates a customer id to clicks, purchases, respectively. - */ - private PCollection<KV<Integer, CoGbkResult>> buildPurchasesCoGbkWithWindowing( - Pipeline p, - TupleTag<String> clicksTag, - TupleTag<String> purchasesTag) { - List<KV<Integer, String>> idToClick = - Arrays.asList( - KV.of(1, "Click t0"), - KV.of(2, "Click t2"), - KV.of(1, "Click t4"), - KV.of(1, "Click t6"), - KV.of(2, "Click t8")); - - List<KV<Integer, String>> idToPurchases = - Arrays.asList( - KV.of(1, "Boat t1"), - KV.of(1, "Shoesi t2"), - KV.of(1, "Pens t3"), - KV.of(2, "House t4"), - KV.of(2, "Suit t5"), - KV.of(1, "Car t6"), - KV.of(1, "Book t7"), - KV.of(2, "House t8"), - KV.of(2, "Shoes t9"), - KV.of(2, "House t10")); - - PCollection<KV<Integer, String>> clicksTable = - createInput("CreateClicks", - p, - idToClick, - Arrays.asList(0L, 2L, 4L, 6L, 8L)) - .apply("WindowClicks", Window.<KV<Integer, String>>into( - FixedWindows.of(new Duration(4))) - .withTimestampCombiner(TimestampCombiner.EARLIEST)); - - PCollection<KV<Integer, String>> purchasesTable = - createInput("CreatePurchases", - p, - idToPurchases, - Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L)) - .apply("WindowPurchases", Window.<KV<Integer, String>>into( - FixedWindows.of(new Duration(4))) - .withTimestampCombiner(TimestampCombiner.EARLIEST)); - - PCollection<KV<Integer, CoGbkResult>> coGbkResults = - KeyedPCollectionTuple.of(clicksTag, clicksTable) - .and(purchasesTag, purchasesTable) - .apply(CoGroupByKey.<Integer>create()); - return coGbkResults; - } - - @Test - @Category(ValidatesRunner.class) - public void testCoGroupByKey() { - StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class); - options.setRunner(TestJStormRunner.class); - options.setLocalMode(true); - - Pipeline p = Pipeline.create(options); - - final TupleTag<String> namesTag = new TupleTag<>(); - final TupleTag<String> addressesTag = new TupleTag<>(); - final TupleTag<String> purchasesTag = new TupleTag<>(); - - - PCollection<KV<Integer, CoGbkResult>> coGbkResults = - buildPurchasesCoGbk(p, purchasesTag, addressesTag, namesTag); - - PAssert.thatMap(coGbkResults).satisfies( - new SerializableFunction<Map<Integer, CoGbkResult>, Void>() { - @Override - public Void apply(Map<Integer, CoGbkResult> results) { - CoGbkResult result1 = results.get(1); - assertEquals("John Smith", result1.getOnly(namesTag)); - assertThat(result1.getAll(purchasesTag), containsInAnyOrder("Shoes", "Book")); - - CoGbkResult result2 = results.get(2); - assertEquals("Sally James", result2.getOnly(namesTag)); - assertEquals("53 S. 3rd", result2.getOnly(addressesTag)); - assertThat(result2.getAll(purchasesTag), containsInAnyOrder("Suit", "Boat")); - - CoGbkResult result3 = results.get(3); - assertEquals("29 School Rd", result3.getOnly(addressesTag), "29 School Rd"); - assertThat(result3.getAll(purchasesTag), containsInAnyOrder("Car", "House")); - - CoGbkResult result8 = results.get(8); - assertEquals("Jeffery Spalding", result8.getOnly(namesTag)); - assertEquals("6 Watling Rd", result8.getOnly(addressesTag)); - assertThat(result8.getAll(purchasesTag), containsInAnyOrder("House", "Suit Case")); - - CoGbkResult result20 = results.get(20); - assertEquals("Joan Lichtfield", result20.getOnly(namesTag)); - assertEquals("3 W. Arizona", result20.getOnly(addressesTag)); - - assertEquals("383 Jackson Street", results.get(10).getOnly(addressesTag)); - - assertThat(results.get(4).getAll(purchasesTag), containsInAnyOrder("Suit")); - assertThat(results.get(10).getAll(purchasesTag), containsInAnyOrder("Pens")); - assertThat(results.get(11).getAll(purchasesTag), containsInAnyOrder("House")); - assertThat(results.get(14).getAll(purchasesTag), containsInAnyOrder("Shoes")); - - return null; - } - }); - - p.run(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/beam/blob/4ff42cbc/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTest.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTest.java b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTest.java deleted file mode 100644 index 9a8b43a..0000000 --- a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTest.java +++ /dev/null @@ -1,155 +0,0 @@ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.translator; - -import org.apache.beam.runners.jstorm.TestJStormRunner; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.BigEndianIntegerCoder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.hamcrest.Description; -import org.hamcrest.Matcher; -import org.hamcrest.TypeSafeMatcher; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.util.Arrays; -import java.util.List; - -import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; -import static org.hamcrest.core.Is.is; -import static org.junit.Assert.assertThat; - -/** - * Tests for {@link GroupByKey} with {@link StormRunner}. - */ -@RunWith(JUnit4.class) -public class GroupByKeyTest { - - static final String[] WORDS_ARRAY = new String[] { - "hi", "there", "hi", "hi", "sue", "bob", - "hi", "sue", "", "", "ZOW", "bob", "" }; - - static final List<String> WORDS = Arrays.asList(WORDS_ARRAY); - - @Test - public void testGroupByKey() { - StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class); - options.setRunner(TestJStormRunner.class); - options.setLocalMode(true); - - Pipeline p = Pipeline.create(options); - - List<KV<String, Integer>> ungroupedPairs = Arrays.asList( - KV.of("k1", 3), - KV.of("k5", Integer.MAX_VALUE), - KV.of("k5", Integer.MIN_VALUE), - KV.of("k2", 66), - KV.of("k1", 4), - KV.of("k2", -33), - KV.of("k3", 0)); - - PCollection<KV<String, Integer>> input = - p.apply(Create.of(ungroupedPairs) - .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))); - - PCollection<KV<String, Iterable<Integer>>> output = - input.apply(GroupByKey.<String, Integer>create()); - - PAssert.that(output) - .satisfies(new AssertThatHasExpectedContentsForTestGroupByKey()); - - p.run(); - } - - @Test - public void testCountGloballyBasic() { - StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class); - options.setRunner(TestJStormRunner.class); - options.setLocalMode(true); - - Pipeline p = Pipeline.create(options); - PCollection<String> input = p.apply(Create.of(WORDS)); - - PCollection<Long> output = - input.apply(Count.<String>globally()); - - PAssert.that(output) - .containsInAnyOrder(13L); - p.run(); - } - - static class AssertThatHasExpectedContentsForTestGroupByKey - implements SerializableFunction<Iterable<KV<String, Iterable<Integer>>>, - Void> { - @Override - public Void apply(Iterable<KV<String, Iterable<Integer>>> actual) { - assertThat(actual, containsInAnyOrder( - KvMatcher.isKv(is("k1"), containsInAnyOrder(3, 4)), - KvMatcher.isKv(is("k5"), containsInAnyOrder(Integer.MAX_VALUE, - Integer.MIN_VALUE)), - KvMatcher.isKv(is("k2"), containsInAnyOrder(66, -33)), - KvMatcher.isKv(is("k3"), containsInAnyOrder(0)))); - return null; - } - } - - /** - * Matcher for KVs. - */ - public static class KvMatcher<K, V> - extends TypeSafeMatcher<KV<? extends K, ? extends V>> { - final Matcher<? super K> keyMatcher; - final Matcher<? super V> valueMatcher; - - public static <K, V> KvMatcher<K, V> isKv(Matcher<K> keyMatcher, - Matcher<V> valueMatcher) { - return new KvMatcher<>(keyMatcher, valueMatcher); - } - - public KvMatcher(Matcher<? super K> keyMatcher, - Matcher<? super V> valueMatcher) { - this.keyMatcher = keyMatcher; - this.valueMatcher = valueMatcher; - } - - @Override - public boolean matchesSafely(KV<? extends K, ? extends V> kv) { - return keyMatcher.matches(kv.getKey()) - && valueMatcher.matches(kv.getValue()); - } - - @Override - public void describeTo(Description description) { - description - .appendText("a KV(").appendValue(keyMatcher) - .appendText(", ").appendValue(valueMatcher) - .appendText(")"); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/4ff42cbc/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/ParDoTest.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/ParDoTest.java b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/ParDoTest.java deleted file mode 100644 index c911364..0000000 --- a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/ParDoTest.java +++ /dev/null @@ -1,624 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.jstorm.translation.translator; - -import org.apache.beam.runners.jstorm.TestJStormRunner; -import com.google.common.base.MoreObjects; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.*; -import org.apache.beam.sdk.io.GenerateSequence; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.state.*; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.UsesMapState; -import org.apache.beam.sdk.testing.UsesStatefulParDo; -import org.apache.beam.sdk.testing.ValidatesRunner; -import org.apache.beam.sdk.transforms.*; -import org.apache.beam.sdk.transforms.windowing.*; -import org.apache.beam.sdk.values.*; -import org.joda.time.Duration; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; -import java.util.*; - -import static org.hamcrest.Matchers.anyOf; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; -import static org.hamcrest.collection.IsIterableContainingInOrder.contains; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; - -/** - * Tests for {@link ParDo} with {@link StormRunner}. - */ -@RunWith(JUnit4.class) -public class ParDoTest implements Serializable { - - @Test - public void testParDo() throws IOException { - StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class); - options.setRunner(TestJStormRunner.class); - Pipeline pipeline = Pipeline.create(options); - - List<Integer> inputs = Arrays.asList(3, -42, 666); - - PCollection<String> output = pipeline - .apply(Create.of(inputs)) - .apply(ParDo.of(new TestDoFn())); - - PAssert.that(output) - .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)); - - pipeline.run(); - } - - @Test - public void testParDoWithSideInputs() throws IOException { - StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class); - options.setRunner(TestJStormRunner.class); - Pipeline pipeline = Pipeline.create(options); - - List<Integer> inputs = Arrays.asList(3, -42, 666); - - PCollectionView<Integer> sideInput1 = pipeline - .apply("CreateSideInput1", Create.of(11)) - .apply("ViewSideInput1", View.<Integer>asSingleton()); - PCollectionView<Integer> sideInputUnread = pipeline - .apply("CreateSideInputUnread", Create.of(-3333)) - .apply("ViewSideInputUnread", View.<Integer>asSingleton()); - - PCollectionView<Integer> sideInput2 = pipeline - .apply("CreateSideInput2", Create.of(222)) - .apply("ViewSideInput2", View.<Integer>asSingleton()); - PCollection<String> output = pipeline - .apply(Create.of(inputs)) - .apply(ParDo.of(new TestDoFn( - Arrays.asList(sideInput1, sideInput2), - Arrays.<TupleTag<String>>asList())) - .withSideInputs(sideInput1, sideInputUnread, sideInput2)); - - PAssert.that(output) - .satisfies(ParDoTest.HasExpectedOutput - .forInput(inputs) - .andSideInputs(11, 222)); - - pipeline.run(); - } - - @Test - public void testParDoWithTaggedOutput() { - List<Integer> inputs = Arrays.asList(3, -42, 666); - - TupleTag<String> mainOutputTag = new TupleTag<String>("main"){}; - TupleTag<String> additionalOutputTag1 = new TupleTag<String>("additional1"){}; - TupleTag<String> additionalOutputTag2 = new TupleTag<String>("additional2"){}; - TupleTag<String> additionalOutputTag3 = new TupleTag<String>("additional3"){}; - TupleTag<String> additionalOutputTagUnwritten = new TupleTag<String>("unwrittenOutput"){}; - - StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class); - options.setRunner(TestJStormRunner.class); - Pipeline pipeline = Pipeline.create(options); - - PCollectionTuple outputs = pipeline - .apply(Create.of(inputs)) - .apply(ParDo - .of(new TestDoFn( - Arrays.<PCollectionView<Integer>>asList(), - Arrays.asList(additionalOutputTag1, additionalOutputTag2, additionalOutputTag3))) - .withOutputTags( - mainOutputTag, - TupleTagList.of(additionalOutputTag3) - .and(additionalOutputTag1) - .and(additionalOutputTagUnwritten) - .and(additionalOutputTag2))); - - PAssert.that(outputs.get(mainOutputTag)) - .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)); - - PAssert.that(outputs.get(additionalOutputTag1)) - .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs) - .fromOutput(additionalOutputTag1)); - PAssert.that(outputs.get(additionalOutputTag2)) - .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs) - .fromOutput(additionalOutputTag2)); - PAssert.that(outputs.get(additionalOutputTag3)) - .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs) - .fromOutput(additionalOutputTag3)); - PAssert.that(outputs.get(additionalOutputTagUnwritten)).empty(); - - pipeline.run(); - } - - @Test - public void testNoWindowFnDoesNotReassignWindows() { - StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class); - options.setRunner(TestJStormRunner.class); - Pipeline pipeline = Pipeline.create(options); - - final PCollection<Long> initialWindows = - pipeline - .apply(GenerateSequence.from(0).to(10)) - .apply("AssignWindows", Window.into(new WindowOddEvenBuckets())); - - // Sanity check the window assignment to demonstrate the baseline - PAssert.that(initialWindows) - .inWindow(WindowOddEvenBuckets.EVEN_WINDOW) - .containsInAnyOrder(0L, 2L, 4L, 6L, 8L); - PAssert.that(initialWindows) - .inWindow(WindowOddEvenBuckets.ODD_WINDOW) - .containsInAnyOrder(1L, 3L, 5L, 7L, 9L); - - PCollection<Boolean> upOne = - initialWindows.apply( - "ModifyTypes", - MapElements.<Long, Boolean>via( - new SimpleFunction<Long, Boolean>() { - @Override - public Boolean apply(Long input) { - return input % 2 == 0; - } - })); - PAssert.that(upOne) - .inWindow(WindowOddEvenBuckets.EVEN_WINDOW) - .containsInAnyOrder(true, true, true, true, true); - PAssert.that(upOne) - .inWindow(WindowOddEvenBuckets.ODD_WINDOW) - .containsInAnyOrder(false, false, false, false, false); - - // The elements should be in the same windows, even though they would not be assigned to the - // same windows with the updated timestamps. If we try to apply the original WindowFn, the type - // will not be appropriate and the runner should crash, as a Boolean cannot be converted into - // a long. - PCollection<Boolean> updatedTrigger = - upOne.apply( - "UpdateWindowingStrategy", - Window.<Boolean>configure().triggering(Never.ever()) - .withAllowedLateness(Duration.ZERO) - .accumulatingFiredPanes()); - pipeline.run(); - } - - @Test - public void testValueStateSameId() { - StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class); - options.setRunner(TestJStormRunner.class); - Pipeline pipeline = Pipeline.create(options); - - final String stateId = "foo"; - - DoFn<KV<String, Integer>, KV<String, Integer>> fn = - new DoFn<KV<String, Integer>, KV<String, Integer>>() { - - @StateId(stateId) - private final StateSpec<ValueState<Integer>> intState = - StateSpecs.value(VarIntCoder.of()); - - @ProcessElement - public void processElement( - ProcessContext c, @StateId(stateId) ValueState<Integer> state) { - Integer currentValue = MoreObjects.firstNonNull(state.read(), 0); - c.output(KV.of("sizzle", currentValue)); - state.write(currentValue + 1); - } - }; - - DoFn<KV<String, Integer>, Integer> fn2 = - new DoFn<KV<String, Integer>, Integer>() { - - @StateId(stateId) - private final StateSpec<ValueState<Integer>> intState = - StateSpecs.value(VarIntCoder.of()); - - @ProcessElement - public void processElement( - ProcessContext c, @StateId(stateId) ValueState<Integer> state) { - Integer currentValue = MoreObjects.firstNonNull(state.read(), 13); - c.output(currentValue); - state.write(currentValue + 13); - } - }; - - PCollection<KV<String, Integer>> intermediate = - pipeline.apply(Create.of(KV.of("hello", 42), KV.of("hello", 97), KV.of("hello", 84))) - .apply("First stateful ParDo", ParDo.of(fn)); - - PCollection<Integer> output = - intermediate.apply("Second stateful ParDo", ParDo.of(fn2)); - - PAssert.that(intermediate) - .containsInAnyOrder(KV.of("sizzle", 0), KV.of("sizzle", 1), KV.of("sizzle", 2)); - PAssert.that(output).containsInAnyOrder(13, 26, 39); - pipeline.run(); - } - - @Test - @Category({ValidatesRunner.class, UsesStatefulParDo.class}) - public void testValueStateTaggedOutput() { - StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class); - options.setRunner(TestJStormRunner.class); - Pipeline pipeline = Pipeline.create(options); - - final String stateId = "foo"; - - final TupleTag<Integer> evenTag = new TupleTag<Integer>() {}; - final TupleTag<Integer> oddTag = new TupleTag<Integer>() {}; - - DoFn<KV<String, Integer>, Integer> fn = - new DoFn<KV<String, Integer>, Integer>() { - - @StateId(stateId) - private final StateSpec<ValueState<Integer>> intState = - StateSpecs.value(VarIntCoder.of()); - - @ProcessElement - public void processElement( - ProcessContext c, @StateId(stateId) ValueState<Integer> state) { - Integer currentValue = MoreObjects.firstNonNull(state.read(), 0); - if (currentValue % 2 == 0) { - c.output(currentValue); - } else { - c.output(oddTag, currentValue); - } - state.write(currentValue + 1); - } - }; - - PCollectionTuple output = - pipeline.apply( - Create.of( - KV.of("hello", 42), - KV.of("hello", 97), - KV.of("hello", 84), - KV.of("goodbye", 33), - KV.of("hello", 859), - KV.of("goodbye", 83945))) - .apply(ParDo.of(fn).withOutputTags(evenTag, TupleTagList.of(oddTag))); - - PCollection<Integer> evens = output.get(evenTag); - PCollection<Integer> odds = output.get(oddTag); - - // There are 0 and 2 from "hello" and just 0 from "goodbye" - PAssert.that(evens).containsInAnyOrder(0, 2, 0); - - // There are 1 and 3 from "hello" and just "1" from "goodbye" - PAssert.that(odds).containsInAnyOrder(1, 3, 1); - pipeline.run(); - } - - @Test - @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesMapState.class}) - public void testMapStateCoderInference() { - StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class); - options.setRunner(TestJStormRunner.class); - Pipeline pipeline = Pipeline.create(options); - - final String stateId = "foo"; - final String countStateId = "count"; - Coder<MyInteger> myIntegerCoder = MyIntegerCoder.of(); - pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, myIntegerCoder); - - DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>> fn = - new DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>>() { - - @StateId(stateId) - private final StateSpec<MapState<String, MyInteger>> mapState = StateSpecs.map(); - - @StateId(countStateId) - private final StateSpec<CombiningState<Integer, int[], Integer>> - countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(), - Sum.ofIntegers()); - - @ProcessElement - public void processElement( - ProcessContext c, @StateId(stateId) MapState<String, MyInteger> state, - @StateId(countStateId) CombiningState<Integer, int[], Integer> - count) { - KV<String, Integer> value = c.element().getValue(); - state.put(value.getKey(), new MyInteger(value.getValue())); - count.add(1); - if (count.read() >= 4) { - Iterable<Map.Entry<String, MyInteger>> iterate = state.entries().read(); - for (Map.Entry<String, MyInteger> entry : iterate) { - c.output(KV.of(entry.getKey(), entry.getValue())); - } - } - } - }; - - PCollection<KV<String, MyInteger>> output = - pipeline.apply( - Create.of( - KV.of("hello", KV.of("a", 97)), KV.of("hello", KV.of("b", 42)), - KV.of("hello", KV.of("b", 42)), KV.of("hello", KV.of("c", 12)))) - .apply(ParDo.of(fn)).setCoder(KvCoder.of(StringUtf8Coder.of(), myIntegerCoder)); - - PAssert.that(output).containsInAnyOrder(KV.of("a", new MyInteger(97)), - KV.of("b", new MyInteger(42)), KV.of("c", new MyInteger(12))); - pipeline.run(); - } - - - private static class WindowOddEvenBuckets extends NonMergingWindowFn<Long, IntervalWindow> { - private static final IntervalWindow EVEN_WINDOW = - new IntervalWindow( - BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE.maxTimestamp()); - private static final IntervalWindow ODD_WINDOW = - new IntervalWindow( - BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE.maxTimestamp().minus(1)); - - @Override - public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception { - if (c.element() % 2 == 0) { - return Collections.singleton(EVEN_WINDOW); - } - return Collections.singleton(ODD_WINDOW); - } - - @Override - public boolean isCompatible(WindowFn<?, ?> other) { - return other instanceof WindowOddEvenBuckets; - } - - @Override - public Coder<IntervalWindow> windowCoder() { - return new IntervalWindow.IntervalWindowCoder(); - } - - @Override - public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() { - throw new UnsupportedOperationException( - String.format("Can't use %s for side inputs", getClass().getSimpleName())); - } - } - - - static class TestDoFn extends DoFn<Integer, String> { - enum State {NOT_SET_UP, UNSTARTED, STARTED, PROCESSING, FINISHED} - - State state = State.NOT_SET_UP; - - final List<PCollectionView<Integer>> sideInputViews = new ArrayList<>(); - final List<TupleTag<String>> additionalOutputTupleTags = new ArrayList<>(); - - public TestDoFn() { - } - - public TestDoFn(List<PCollectionView<Integer>> sideInputViews, - List<TupleTag<String>> additionalOutputTupleTags) { - this.sideInputViews.addAll(sideInputViews); - this.additionalOutputTupleTags.addAll(additionalOutputTupleTags); - } - - @Setup - public void prepare() { - assertEquals(State.NOT_SET_UP, state); - state = State.UNSTARTED; - } - - @StartBundle - public void startBundle() { - assertThat(state, - anyOf(equalTo(State.UNSTARTED), equalTo(State.FINISHED))); - - state = State.STARTED; - } - - @ProcessElement - public void processElement(ProcessContext c) { - System.out.println("Recv elem: " + c.element()); - assertThat(state, - anyOf(equalTo(State.STARTED), equalTo(State.PROCESSING))); - state = State.PROCESSING; - outputToAllWithSideInputs(c, "processing: " + c.element()); - } - - @FinishBundle - public void finishBundle(FinishBundleContext c) { - assertThat(state, - anyOf(equalTo(State.STARTED), equalTo(State.PROCESSING))); - state = State.FINISHED; - c.output("finished", BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE); - for (TupleTag<String> additionalOutputTupleTag : additionalOutputTupleTags) { - c.output( - additionalOutputTupleTag, - additionalOutputTupleTag.getId() + ": " + "finished", - BoundedWindow.TIMESTAMP_MIN_VALUE, - GlobalWindow.INSTANCE); - } - } - - private void outputToAllWithSideInputs(ProcessContext c, String value) { - if (!sideInputViews.isEmpty()) { - List<Integer> sideInputValues = new ArrayList<>(); - for (PCollectionView<Integer> sideInputView : sideInputViews) { - sideInputValues.add(c.sideInput(sideInputView)); - } - value += ": " + sideInputValues; - } - c.output(value); - for (TupleTag<String> additionalOutputTupleTag : additionalOutputTupleTags) { - c.output(additionalOutputTupleTag, - additionalOutputTupleTag.getId() + ": " + value); - } - } - } - - private static class MyInteger implements Comparable<MyInteger> { - private final int value; - - MyInteger(int value) { - this.value = value; - } - - public int getValue() { - return value; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - - if (!(o instanceof MyInteger)) { - return false; - } - - MyInteger myInteger = (MyInteger) o; - - return value == myInteger.value; - - } - - @Override - public int hashCode() { - return value; - } - - @Override - public int compareTo(MyInteger o) { - return Integer.compare(this.getValue(), o.getValue()); - } - - @Override - public String toString() { - return "MyInteger{" + "value=" + value + '}'; - } - } - - private static class MyIntegerCoder extends AtomicCoder<MyInteger> { - private static final MyIntegerCoder INSTANCE = new MyIntegerCoder(); - - private final VarIntCoder delegate = VarIntCoder.of(); - - public static MyIntegerCoder of() { - return INSTANCE; - } - - @Override - public void encode(MyInteger value, OutputStream outStream) - throws CoderException, IOException { - delegate.encode(value.getValue(), outStream); - } - - @Override - public MyInteger decode(InputStream inStream) throws CoderException, - IOException { - return new MyInteger(delegate.decode(inStream)); - } - } - - /** PAssert "matcher" for expected output. */ - static class HasExpectedOutput - implements SerializableFunction<Iterable<String>, Void>, Serializable { - private final List<Integer> inputs; - private final List<Integer> sideInputs; - private final String additionalOutput; - private final boolean ordered; - - public static HasExpectedOutput forInput(List<Integer> inputs) { - return new HasExpectedOutput( - new ArrayList<Integer>(inputs), - new ArrayList<Integer>(), - "", - false); - } - - private HasExpectedOutput(List<Integer> inputs, - List<Integer> sideInputs, - String additionalOutput, - boolean ordered) { - this.inputs = inputs; - this.sideInputs = sideInputs; - this.additionalOutput = additionalOutput; - this.ordered = ordered; - } - - public HasExpectedOutput andSideInputs(Integer... sideInputValues) { - List<Integer> sideInputs = new ArrayList<>(); - for (Integer sideInputValue : sideInputValues) { - sideInputs.add(sideInputValue); - } - return new HasExpectedOutput(inputs, sideInputs, additionalOutput, ordered); - } - - public HasExpectedOutput fromOutput(TupleTag<String> outputTag) { - return fromOutput(outputTag.getId()); - } - public HasExpectedOutput fromOutput(String outputId) { - return new HasExpectedOutput(inputs, sideInputs, outputId, ordered); - } - - public HasExpectedOutput inOrder() { - return new HasExpectedOutput(inputs, sideInputs, additionalOutput, true); - } - - @Override - public Void apply(Iterable<String> outputs) { - List<String> processeds = new ArrayList<>(); - List<String> finisheds = new ArrayList<>(); - for (String output : outputs) { - if (output.contains("finished")) { - finisheds.add(output); - } else { - processeds.add(output); - } - } - - String sideInputsSuffix; - if (sideInputs.isEmpty()) { - sideInputsSuffix = ""; - } else { - sideInputsSuffix = ": " + sideInputs; - } - - String additionalOutputPrefix; - if (additionalOutput.isEmpty()) { - additionalOutputPrefix = ""; - } else { - additionalOutputPrefix = additionalOutput + ": "; - } - - List<String> expectedProcesseds = new ArrayList<>(); - for (Integer input : inputs) { - expectedProcesseds.add( - additionalOutputPrefix + "processing: " + input + sideInputsSuffix); - } - String[] expectedProcessedsArray = - expectedProcesseds.toArray(new String[expectedProcesseds.size()]); - if (!ordered || expectedProcesseds.isEmpty()) { - assertThat(processeds, containsInAnyOrder(expectedProcessedsArray)); - } else { - assertThat(processeds, contains(expectedProcessedsArray)); - } - - for (String finished : finisheds) { - assertEquals(additionalOutputPrefix + "finished", finished); - } - - return null; - } - } -}
