Repository: incubator-beam Updated Branches: refs/heads/master d5814a38d -> 0f7b81615
Reorganize Java packages Move org.apache.contrib.joinlibrary to org.apache.beam.sdk.extensions.joinlibrary. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e4059412 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e4059412 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e4059412 Branch: refs/heads/master Commit: e4059412d111a223ee7cfda0004a62699b38f41f Parents: e2ca889 Author: Davor Bonaci <[email protected]> Authored: Mon Apr 25 16:45:59 2016 -0700 Committer: bchambers <[email protected]> Committed: Mon Apr 25 17:32:23 2016 -0700 ---------------------------------------------------------------------- .../beam/sdk/extensions/joinlibrary/Join.java | 186 +++++++++++++++++++ .../org/apache/contrib/joinlibrary/Join.java | 186 ------------------- .../extensions/joinlibrary/InnerJoinTest.java | 143 ++++++++++++++ .../joinlibrary/OuterLeftJoinTest.java | 153 +++++++++++++++ .../joinlibrary/OuterRightJoinTest.java | 153 +++++++++++++++ .../contrib/joinlibrary/InnerJoinTest.java | 143 -------------- .../contrib/joinlibrary/OuterLeftJoinTest.java | 153 --------------- .../contrib/joinlibrary/OuterRightJoinTest.java | 153 --------------- 8 files changed, 635 insertions(+), 635 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4059412/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java b/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java new file mode 100644 index 0000000..968a613 --- /dev/null +++ b/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.joinlibrary; + +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.transforms.join.CoGroupByKey; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TupleTag; +import com.google.common.base.Preconditions; + +/** + * Utility class with different versions of joins. All methods join two collections of + * key/value pairs (KV). + */ +public class Join { + + /** + * Inner join of two collections of KV elements. + * @param leftCollection Left side collection to join. + * @param rightCollection Right side collection to join. + * @param <K> Type of the key for both collections + * @param <V1> Type of the values for the left collection. + * @param <V2> Type of the values for the right collection. + * @return A joined collection of KV where Key is the key and value is a + * KV where Key is of type V1 and Value is type V2. + */ + public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> innerJoin( + final PCollection<KV<K, V1>> leftCollection, final PCollection<KV<K, V2>> rightCollection) { + Preconditions.checkNotNull(leftCollection); + Preconditions.checkNotNull(rightCollection); + + final TupleTag<V1> v1Tuple = new TupleTag<>(); + final TupleTag<V2> v2Tuple = new TupleTag<>(); + + PCollection<KV<K, CoGbkResult>> coGbkResultCollection = + KeyedPCollectionTuple.of(v1Tuple, leftCollection) + .and(v2Tuple, rightCollection) + .apply(CoGroupByKey.<K>create()); + + return coGbkResultCollection.apply(ParDo.of( + new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() { + @Override + public void processElement(ProcessContext c) { + KV<K, CoGbkResult> e = c.element(); + + Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple); + Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple); + + for (V1 leftValue : leftValuesIterable) { + for (V2 rightValue : rightValuesIterable) { + c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue))); + } + } + } + })) + .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(), + KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(), + ((KvCoder) rightCollection.getCoder()).getValueCoder()))); + } + + /** + * Left Outer Join of two collections of KV elements. + * @param leftCollection Left side collection to join. + * @param rightCollection Right side collection to join. + * @param nullValue Value to use as null value when right side do not match left side. + * @param <K> Type of the key for both collections + * @param <V1> Type of the values for the left collection. + * @param <V2> Type of the values for the right collection. + * @return A joined collection of KV where Key is the key and value is a + * KV where Key is of type V1 and Value is type V2. Values that + * should be null or empty is replaced with nullValue. + */ + public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> leftOuterJoin( + final PCollection<KV<K, V1>> leftCollection, + final PCollection<KV<K, V2>> rightCollection, + final V2 nullValue) { + Preconditions.checkNotNull(leftCollection); + Preconditions.checkNotNull(rightCollection); + Preconditions.checkNotNull(nullValue); + + final TupleTag<V1> v1Tuple = new TupleTag<>(); + final TupleTag<V2> v2Tuple = new TupleTag<>(); + + PCollection<KV<K, CoGbkResult>> coGbkResultCollection = + KeyedPCollectionTuple.of(v1Tuple, leftCollection) + .and(v2Tuple, rightCollection) + .apply(CoGroupByKey.<K>create()); + + return coGbkResultCollection.apply(ParDo.of( + new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() { + @Override + public void processElement(ProcessContext c) { + KV<K, CoGbkResult> e = c.element(); + + Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple); + Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple); + + for (V1 leftValue : leftValuesIterable) { + if (rightValuesIterable.iterator().hasNext()) { + for (V2 rightValue : rightValuesIterable) { + c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue))); + } + } else { + c.output(KV.of(e.getKey(), KV.of(leftValue, nullValue))); + } + } + } + })) + .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(), + KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(), + ((KvCoder) rightCollection.getCoder()).getValueCoder()))); + } + + /** + * Right Outer Join of two collections of KV elements. + * @param leftCollection Left side collection to join. + * @param rightCollection Right side collection to join. + * @param nullValue Value to use as null value when left side do not match right side. + * @param <K> Type of the key for both collections + * @param <V1> Type of the values for the left collection. + * @param <V2> Type of the values for the right collection. + * @return A joined collection of KV where Key is the key and value is a + * KV where Key is of type V1 and Value is type V2. Keys that + * should be null or empty is replaced with nullValue. + */ + public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> rightOuterJoin( + final PCollection<KV<K, V1>> leftCollection, + final PCollection<KV<K, V2>> rightCollection, + final V1 nullValue) { + Preconditions.checkNotNull(leftCollection); + Preconditions.checkNotNull(rightCollection); + Preconditions.checkNotNull(nullValue); + + final TupleTag<V1> v1Tuple = new TupleTag<>(); + final TupleTag<V2> v2Tuple = new TupleTag<>(); + + PCollection<KV<K, CoGbkResult>> coGbkResultCollection = + KeyedPCollectionTuple.of(v1Tuple, leftCollection) + .and(v2Tuple, rightCollection) + .apply(CoGroupByKey.<K>create()); + + return coGbkResultCollection.apply(ParDo.of( + new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() { + @Override + public void processElement(ProcessContext c) { + KV<K, CoGbkResult> e = c.element(); + + Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple); + Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple); + + for (V2 rightValue : rightValuesIterable) { + if (leftValuesIterable.iterator().hasNext()) { + for (V1 leftValue : leftValuesIterable) { + c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue))); + } + } else { + c.output(KV.of(e.getKey(), KV.of(nullValue, rightValue))); + } + } + } + })) + .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(), + KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(), + ((KvCoder) rightCollection.getCoder()).getValueCoder()))); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4059412/sdks/java/extensions/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java b/sdks/java/extensions/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java deleted file mode 100644 index 6421e97..0000000 --- a/sdks/java/extensions/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.contrib.joinlibrary; - -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.join.CoGbkResult; -import org.apache.beam.sdk.transforms.join.CoGroupByKey; -import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TupleTag; -import com.google.common.base.Preconditions; - -/** - * Utility class with different versions of joins. All methods join two collections of - * key/value pairs (KV). - */ -public class Join { - - /** - * Inner join of two collections of KV elements. - * @param leftCollection Left side collection to join. - * @param rightCollection Right side collection to join. - * @param <K> Type of the key for both collections - * @param <V1> Type of the values for the left collection. - * @param <V2> Type of the values for the right collection. - * @return A joined collection of KV where Key is the key and value is a - * KV where Key is of type V1 and Value is type V2. - */ - public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> innerJoin( - final PCollection<KV<K, V1>> leftCollection, final PCollection<KV<K, V2>> rightCollection) { - Preconditions.checkNotNull(leftCollection); - Preconditions.checkNotNull(rightCollection); - - final TupleTag<V1> v1Tuple = new TupleTag<>(); - final TupleTag<V2> v2Tuple = new TupleTag<>(); - - PCollection<KV<K, CoGbkResult>> coGbkResultCollection = - KeyedPCollectionTuple.of(v1Tuple, leftCollection) - .and(v2Tuple, rightCollection) - .apply(CoGroupByKey.<K>create()); - - return coGbkResultCollection.apply(ParDo.of( - new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() { - @Override - public void processElement(ProcessContext c) { - KV<K, CoGbkResult> e = c.element(); - - Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple); - Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple); - - for (V1 leftValue : leftValuesIterable) { - for (V2 rightValue : rightValuesIterable) { - c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue))); - } - } - } - })) - .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(), - KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(), - ((KvCoder) rightCollection.getCoder()).getValueCoder()))); - } - - /** - * Left Outer Join of two collections of KV elements. - * @param leftCollection Left side collection to join. - * @param rightCollection Right side collection to join. - * @param nullValue Value to use as null value when right side do not match left side. - * @param <K> Type of the key for both collections - * @param <V1> Type of the values for the left collection. - * @param <V2> Type of the values for the right collection. - * @return A joined collection of KV where Key is the key and value is a - * KV where Key is of type V1 and Value is type V2. Values that - * should be null or empty is replaced with nullValue. - */ - public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> leftOuterJoin( - final PCollection<KV<K, V1>> leftCollection, - final PCollection<KV<K, V2>> rightCollection, - final V2 nullValue) { - Preconditions.checkNotNull(leftCollection); - Preconditions.checkNotNull(rightCollection); - Preconditions.checkNotNull(nullValue); - - final TupleTag<V1> v1Tuple = new TupleTag<>(); - final TupleTag<V2> v2Tuple = new TupleTag<>(); - - PCollection<KV<K, CoGbkResult>> coGbkResultCollection = - KeyedPCollectionTuple.of(v1Tuple, leftCollection) - .and(v2Tuple, rightCollection) - .apply(CoGroupByKey.<K>create()); - - return coGbkResultCollection.apply(ParDo.of( - new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() { - @Override - public void processElement(ProcessContext c) { - KV<K, CoGbkResult> e = c.element(); - - Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple); - Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple); - - for (V1 leftValue : leftValuesIterable) { - if (rightValuesIterable.iterator().hasNext()) { - for (V2 rightValue : rightValuesIterable) { - c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue))); - } - } else { - c.output(KV.of(e.getKey(), KV.of(leftValue, nullValue))); - } - } - } - })) - .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(), - KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(), - ((KvCoder) rightCollection.getCoder()).getValueCoder()))); - } - - /** - * Right Outer Join of two collections of KV elements. - * @param leftCollection Left side collection to join. - * @param rightCollection Right side collection to join. - * @param nullValue Value to use as null value when left side do not match right side. - * @param <K> Type of the key for both collections - * @param <V1> Type of the values for the left collection. - * @param <V2> Type of the values for the right collection. - * @return A joined collection of KV where Key is the key and value is a - * KV where Key is of type V1 and Value is type V2. Keys that - * should be null or empty is replaced with nullValue. - */ - public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> rightOuterJoin( - final PCollection<KV<K, V1>> leftCollection, - final PCollection<KV<K, V2>> rightCollection, - final V1 nullValue) { - Preconditions.checkNotNull(leftCollection); - Preconditions.checkNotNull(rightCollection); - Preconditions.checkNotNull(nullValue); - - final TupleTag<V1> v1Tuple = new TupleTag<>(); - final TupleTag<V2> v2Tuple = new TupleTag<>(); - - PCollection<KV<K, CoGbkResult>> coGbkResultCollection = - KeyedPCollectionTuple.of(v1Tuple, leftCollection) - .and(v2Tuple, rightCollection) - .apply(CoGroupByKey.<K>create()); - - return coGbkResultCollection.apply(ParDo.of( - new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() { - @Override - public void processElement(ProcessContext c) { - KV<K, CoGbkResult> e = c.element(); - - Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple); - Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple); - - for (V2 rightValue : rightValuesIterable) { - if (leftValuesIterable.iterator().hasNext()) { - for (V1 leftValue : leftValuesIterable) { - c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue))); - } - } else { - c.output(KV.of(e.getKey(), KV.of(nullValue, rightValue))); - } - } - } - })) - .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(), - KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(), - ((KvCoder) rightCollection.getCoder()).getValueCoder()))); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4059412/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java new file mode 100644 index 0000000..6622fdc --- /dev/null +++ b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.joinlibrary; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +/** + * This test Inner Join functionality. + */ +public class InnerJoinTest { + + Pipeline p; + List<KV<String, Long>> leftListOfKv; + List<KV<String, String>> listRightOfKv; + List<KV<String, KV<Long, String>>> expectedResult; + + @Before + public void setup() { + + p = TestPipeline.create(); + leftListOfKv = new ArrayList<>(); + listRightOfKv = new ArrayList<>(); + + expectedResult = new ArrayList<>(); + } + + @Test + public void testJoinOneToOneMapping() { + leftListOfKv.add(KV.of("Key1", 5L)); + leftListOfKv.add(KV.of("Key2", 4L)); + PCollection<KV<String, Long>> leftCollection = + p.apply("CreateLeft", Create.of(leftListOfKv)); + + listRightOfKv.add(KV.of("Key1", "foo")); + listRightOfKv.add(KV.of("Key2", "bar")); + PCollection<KV<String, String>> rightCollection = + p.apply("CreateRight", Create.of(listRightOfKv)); + + PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin( + leftCollection, rightCollection); + + expectedResult.add(KV.of("Key1", KV.of(5L, "foo"))); + expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); + PAssert.that(output).containsInAnyOrder(expectedResult); + + p.run(); + } + + @Test + public void testJoinOneToManyMapping() { + leftListOfKv.add(KV.of("Key2", 4L)); + PCollection<KV<String, Long>> leftCollection = p + .apply("CreateLeft", Create.of(leftListOfKv)); + + listRightOfKv.add(KV.of("Key2", "bar")); + listRightOfKv.add(KV.of("Key2", "gazonk")); + PCollection<KV<String, String>> rightCollection = p + .apply("CreateRight", Create.of(listRightOfKv)); + + PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin( + leftCollection, rightCollection); + + expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); + expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk"))); + PAssert.that(output).containsInAnyOrder(expectedResult); + + p.run(); + } + + @Test + public void testJoinManyToOneMapping() { + leftListOfKv.add(KV.of("Key2", 4L)); + leftListOfKv.add(KV.of("Key2", 6L)); + PCollection<KV<String, Long>> leftCollection = p + .apply("CreateLeft", Create.of(leftListOfKv)); + + listRightOfKv.add(KV.of("Key2", "bar")); + PCollection<KV<String, String>> rightCollection = p + .apply("CreateRight", Create.of(listRightOfKv)); + + PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin( + leftCollection, rightCollection); + + expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); + expectedResult.add(KV.of("Key2", KV.of(6L, "bar"))); + PAssert.that(output).containsInAnyOrder(expectedResult); + + p.run(); + } + + @Test + public void testJoinNoneToNoneMapping() { + leftListOfKv.add(KV.of("Key2", 4L)); + PCollection<KV<String, Long>> leftCollection = p + .apply("CreateLeft", Create.of(leftListOfKv)); + + listRightOfKv.add(KV.of("Key3", "bar")); + PCollection<KV<String, String>> rightCollection = p + .apply("CreateRight", Create.of(listRightOfKv)); + + PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin( + leftCollection, rightCollection); + + PAssert.that(output).containsInAnyOrder(expectedResult); + p.run(); + } + + @Test(expected = NullPointerException.class) + public void testJoinLeftCollectionNull() { + Join.innerJoin(null, p.apply(Create.of(listRightOfKv))); + } + + @Test(expected = NullPointerException.class) + public void testJoinRightCollectionNull() { + Join.innerJoin(p.apply(Create.of(leftListOfKv)), null); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4059412/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java new file mode 100644 index 0000000..91b0740 --- /dev/null +++ b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.joinlibrary; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + + +/** + * This test Outer Left Join functionality. + */ +public class OuterLeftJoinTest { + + Pipeline p; + List<KV<String, Long>> leftListOfKv; + List<KV<String, String>> listRightOfKv; + List<KV<String, KV<Long, String>>> expectedResult; + + @Before + public void setup() { + + p = TestPipeline.create(); + leftListOfKv = new ArrayList<>(); + listRightOfKv = new ArrayList<>(); + + expectedResult = new ArrayList<>(); + } + + @Test + public void testJoinOneToOneMapping() { + leftListOfKv.add(KV.of("Key1", 5L)); + leftListOfKv.add(KV.of("Key2", 4L)); + PCollection<KV<String, Long>> leftCollection = p + .apply("CreateLeft", Create.of(leftListOfKv)); + + listRightOfKv.add(KV.of("Key1", "foo")); + listRightOfKv.add(KV.of("Key2", "bar")); + PCollection<KV<String, String>> rightCollection = p + .apply("CreateRight", Create.of(listRightOfKv)); + + PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin( + leftCollection, rightCollection, ""); + + expectedResult.add(KV.of("Key1", KV.of(5L, "foo"))); + expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); + PAssert.that(output).containsInAnyOrder(expectedResult); + + p.run(); + } + + @Test + public void testJoinOneToManyMapping() { + leftListOfKv.add(KV.of("Key2", 4L)); + PCollection<KV<String, Long>> leftCollection = p + .apply("CreateLeft", Create.of(leftListOfKv)); + + listRightOfKv.add(KV.of("Key2", "bar")); + listRightOfKv.add(KV.of("Key2", "gazonk")); + PCollection<KV<String, String>> rightCollection = p + .apply("CreateRight", Create.of(listRightOfKv)); + + PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin( + leftCollection, rightCollection, ""); + + expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); + expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk"))); + PAssert.that(output).containsInAnyOrder(expectedResult); + + p.run(); + } + + @Test + public void testJoinManyToOneMapping() { + leftListOfKv.add(KV.of("Key2", 4L)); + leftListOfKv.add(KV.of("Key2", 6L)); + PCollection<KV<String, Long>> leftCollection = p + .apply("CreateLeft", Create.of(leftListOfKv)); + + listRightOfKv.add(KV.of("Key2", "bar")); + PCollection<KV<String, String>> rightCollection = p + .apply("CreateRight", Create.of(listRightOfKv)); + + PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin( + leftCollection, rightCollection, ""); + + expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); + expectedResult.add(KV.of("Key2", KV.of(6L, "bar"))); + PAssert.that(output).containsInAnyOrder(expectedResult); + + p.run(); + } + + @Test + public void testJoinOneToNoneMapping() { + leftListOfKv.add(KV.of("Key2", 4L)); + PCollection<KV<String, Long>> leftCollection = p + .apply("CreateLeft", Create.of(leftListOfKv)); + + listRightOfKv.add(KV.of("Key3", "bar")); + PCollection<KV<String, String>> rightCollection = p + .apply("CreateRight", Create.of(listRightOfKv)); + + PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin( + leftCollection, rightCollection, ""); + + expectedResult.add(KV.of("Key2", KV.of(4L, ""))); + PAssert.that(output).containsInAnyOrder(expectedResult); + p.run(); + } + + @Test(expected = NullPointerException.class) + public void testJoinLeftCollectionNull() { + Join.leftOuterJoin(null, p.apply(Create.of(listRightOfKv)), ""); + } + + @Test(expected = NullPointerException.class) + public void testJoinRightCollectionNull() { + Join.leftOuterJoin(p.apply(Create.of(leftListOfKv)), null, ""); + } + + @Test(expected = NullPointerException.class) + public void testJoinNullValueIsNull() { + Join.leftOuterJoin( + p.apply("CreateLeft", Create.of(leftListOfKv)), + p.apply("CreateRight", Create.of(listRightOfKv)), + null); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4059412/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterRightJoinTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterRightJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterRightJoinTest.java new file mode 100644 index 0000000..7977df7 --- /dev/null +++ b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterRightJoinTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.joinlibrary; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + + +/** + * This test Outer Right Join functionality. + */ +public class OuterRightJoinTest { + + Pipeline p; + List<KV<String, Long>> leftListOfKv; + List<KV<String, String>> listRightOfKv; + List<KV<String, KV<Long, String>>> expectedResult; + + @Before + public void setup() { + + p = TestPipeline.create(); + leftListOfKv = new ArrayList<>(); + listRightOfKv = new ArrayList<>(); + + expectedResult = new ArrayList<>(); + } + + @Test + public void testJoinOneToOneMapping() { + leftListOfKv.add(KV.of("Key1", 5L)); + leftListOfKv.add(KV.of("Key2", 4L)); + PCollection<KV<String, Long>> leftCollection = p + .apply("CreateLeft", Create.of(leftListOfKv)); + + listRightOfKv.add(KV.of("Key1", "foo")); + listRightOfKv.add(KV.of("Key2", "bar")); + PCollection<KV<String, String>> rightCollection = p + .apply("CreateRight", Create.of(listRightOfKv)); + + PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin( + leftCollection, rightCollection, -1L); + + expectedResult.add(KV.of("Key1", KV.of(5L, "foo"))); + expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); + PAssert.that(output).containsInAnyOrder(expectedResult); + + p.run(); + } + + @Test + public void testJoinOneToManyMapping() { + leftListOfKv.add(KV.of("Key2", 4L)); + PCollection<KV<String, Long>> leftCollection = p + .apply("CreateLeft", Create.of(leftListOfKv)); + + listRightOfKv.add(KV.of("Key2", "bar")); + listRightOfKv.add(KV.of("Key2", "gazonk")); + PCollection<KV<String, String>> rightCollection = p + .apply("CreateRight", Create.of(listRightOfKv)); + + PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin( + leftCollection, rightCollection, -1L); + + expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); + expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk"))); + PAssert.that(output).containsInAnyOrder(expectedResult); + + p.run(); + } + + @Test + public void testJoinManyToOneMapping() { + leftListOfKv.add(KV.of("Key2", 4L)); + leftListOfKv.add(KV.of("Key2", 6L)); + PCollection<KV<String, Long>> leftCollection = p + .apply("CreateLeft", Create.of(leftListOfKv)); + + listRightOfKv.add(KV.of("Key2", "bar")); + PCollection<KV<String, String>> rightCollection = p + .apply("CreateRight", Create.of(listRightOfKv)); + + PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin( + leftCollection, rightCollection, -1L); + + expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); + expectedResult.add(KV.of("Key2", KV.of(6L, "bar"))); + PAssert.that(output).containsInAnyOrder(expectedResult); + + p.run(); + } + + @Test + public void testJoinNoneToOneMapping() { + leftListOfKv.add(KV.of("Key2", 4L)); + PCollection<KV<String, Long>> leftCollection = p + .apply("CreateLeft", Create.of(leftListOfKv)); + + listRightOfKv.add(KV.of("Key3", "bar")); + PCollection<KV<String, String>> rightCollection = p + .apply("CreateRight", Create.of(listRightOfKv)); + + PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin( + leftCollection, rightCollection, -1L); + + expectedResult.add(KV.of("Key3", KV.of(-1L, "bar"))); + PAssert.that(output).containsInAnyOrder(expectedResult); + p.run(); + } + + @Test(expected = NullPointerException.class) + public void testJoinLeftCollectionNull() { + Join.rightOuterJoin(null, p.apply(Create.of(listRightOfKv)), ""); + } + + @Test(expected = NullPointerException.class) + public void testJoinRightCollectionNull() { + Join.rightOuterJoin(p.apply(Create.of(leftListOfKv)), null, -1L); + } + + @Test(expected = NullPointerException.class) + public void testJoinNullValueIsNull() { + Join.rightOuterJoin( + p.apply("CreateLeft", Create.of(leftListOfKv)), + p.apply("CreateRight", Create.of(listRightOfKv)), + null); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4059412/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java deleted file mode 100644 index 99e9c4b..0000000 --- a/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.contrib.joinlibrary; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; - -import org.junit.Before; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.List; - -/** - * This test Inner Join functionality. - */ -public class InnerJoinTest { - - Pipeline p; - List<KV<String, Long>> leftListOfKv; - List<KV<String, String>> listRightOfKv; - List<KV<String, KV<Long, String>>> expectedResult; - - @Before - public void setup() { - - p = TestPipeline.create(); - leftListOfKv = new ArrayList<>(); - listRightOfKv = new ArrayList<>(); - - expectedResult = new ArrayList<>(); - } - - @Test - public void testJoinOneToOneMapping() { - leftListOfKv.add(KV.of("Key1", 5L)); - leftListOfKv.add(KV.of("Key2", 4L)); - PCollection<KV<String, Long>> leftCollection = - p.apply("CreateLeft", Create.of(leftListOfKv)); - - listRightOfKv.add(KV.of("Key1", "foo")); - listRightOfKv.add(KV.of("Key2", "bar")); - PCollection<KV<String, String>> rightCollection = - p.apply("CreateRight", Create.of(listRightOfKv)); - - PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin( - leftCollection, rightCollection); - - expectedResult.add(KV.of("Key1", KV.of(5L, "foo"))); - expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); - PAssert.that(output).containsInAnyOrder(expectedResult); - - p.run(); - } - - @Test - public void testJoinOneToManyMapping() { - leftListOfKv.add(KV.of("Key2", 4L)); - PCollection<KV<String, Long>> leftCollection = p - .apply("CreateLeft", Create.of(leftListOfKv)); - - listRightOfKv.add(KV.of("Key2", "bar")); - listRightOfKv.add(KV.of("Key2", "gazonk")); - PCollection<KV<String, String>> rightCollection = p - .apply("CreateRight", Create.of(listRightOfKv)); - - PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin( - leftCollection, rightCollection); - - expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); - expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk"))); - PAssert.that(output).containsInAnyOrder(expectedResult); - - p.run(); - } - - @Test - public void testJoinManyToOneMapping() { - leftListOfKv.add(KV.of("Key2", 4L)); - leftListOfKv.add(KV.of("Key2", 6L)); - PCollection<KV<String, Long>> leftCollection = p - .apply("CreateLeft", Create.of(leftListOfKv)); - - listRightOfKv.add(KV.of("Key2", "bar")); - PCollection<KV<String, String>> rightCollection = p - .apply("CreateRight", Create.of(listRightOfKv)); - - PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin( - leftCollection, rightCollection); - - expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); - expectedResult.add(KV.of("Key2", KV.of(6L, "bar"))); - PAssert.that(output).containsInAnyOrder(expectedResult); - - p.run(); - } - - @Test - public void testJoinNoneToNoneMapping() { - leftListOfKv.add(KV.of("Key2", 4L)); - PCollection<KV<String, Long>> leftCollection = p - .apply("CreateLeft", Create.of(leftListOfKv)); - - listRightOfKv.add(KV.of("Key3", "bar")); - PCollection<KV<String, String>> rightCollection = p - .apply("CreateRight", Create.of(listRightOfKv)); - - PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin( - leftCollection, rightCollection); - - PAssert.that(output).containsInAnyOrder(expectedResult); - p.run(); - } - - @Test(expected = NullPointerException.class) - public void testJoinLeftCollectionNull() { - Join.innerJoin(null, p.apply(Create.of(listRightOfKv))); - } - - @Test(expected = NullPointerException.class) - public void testJoinRightCollectionNull() { - Join.innerJoin(p.apply(Create.of(leftListOfKv)), null); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4059412/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java deleted file mode 100644 index ca09136..0000000 --- a/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.contrib.joinlibrary; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; - -import org.junit.Before; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.List; - - -/** - * This test Outer Left Join functionality. - */ -public class OuterLeftJoinTest { - - Pipeline p; - List<KV<String, Long>> leftListOfKv; - List<KV<String, String>> listRightOfKv; - List<KV<String, KV<Long, String>>> expectedResult; - - @Before - public void setup() { - - p = TestPipeline.create(); - leftListOfKv = new ArrayList<>(); - listRightOfKv = new ArrayList<>(); - - expectedResult = new ArrayList<>(); - } - - @Test - public void testJoinOneToOneMapping() { - leftListOfKv.add(KV.of("Key1", 5L)); - leftListOfKv.add(KV.of("Key2", 4L)); - PCollection<KV<String, Long>> leftCollection = p - .apply("CreateLeft", Create.of(leftListOfKv)); - - listRightOfKv.add(KV.of("Key1", "foo")); - listRightOfKv.add(KV.of("Key2", "bar")); - PCollection<KV<String, String>> rightCollection = p - .apply("CreateRight", Create.of(listRightOfKv)); - - PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin( - leftCollection, rightCollection, ""); - - expectedResult.add(KV.of("Key1", KV.of(5L, "foo"))); - expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); - PAssert.that(output).containsInAnyOrder(expectedResult); - - p.run(); - } - - @Test - public void testJoinOneToManyMapping() { - leftListOfKv.add(KV.of("Key2", 4L)); - PCollection<KV<String, Long>> leftCollection = p - .apply("CreateLeft", Create.of(leftListOfKv)); - - listRightOfKv.add(KV.of("Key2", "bar")); - listRightOfKv.add(KV.of("Key2", "gazonk")); - PCollection<KV<String, String>> rightCollection = p - .apply("CreateRight", Create.of(listRightOfKv)); - - PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin( - leftCollection, rightCollection, ""); - - expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); - expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk"))); - PAssert.that(output).containsInAnyOrder(expectedResult); - - p.run(); - } - - @Test - public void testJoinManyToOneMapping() { - leftListOfKv.add(KV.of("Key2", 4L)); - leftListOfKv.add(KV.of("Key2", 6L)); - PCollection<KV<String, Long>> leftCollection = p - .apply("CreateLeft", Create.of(leftListOfKv)); - - listRightOfKv.add(KV.of("Key2", "bar")); - PCollection<KV<String, String>> rightCollection = p - .apply("CreateRight", Create.of(listRightOfKv)); - - PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin( - leftCollection, rightCollection, ""); - - expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); - expectedResult.add(KV.of("Key2", KV.of(6L, "bar"))); - PAssert.that(output).containsInAnyOrder(expectedResult); - - p.run(); - } - - @Test - public void testJoinOneToNoneMapping() { - leftListOfKv.add(KV.of("Key2", 4L)); - PCollection<KV<String, Long>> leftCollection = p - .apply("CreateLeft", Create.of(leftListOfKv)); - - listRightOfKv.add(KV.of("Key3", "bar")); - PCollection<KV<String, String>> rightCollection = p - .apply("CreateRight", Create.of(listRightOfKv)); - - PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin( - leftCollection, rightCollection, ""); - - expectedResult.add(KV.of("Key2", KV.of(4L, ""))); - PAssert.that(output).containsInAnyOrder(expectedResult); - p.run(); - } - - @Test(expected = NullPointerException.class) - public void testJoinLeftCollectionNull() { - Join.leftOuterJoin(null, p.apply(Create.of(listRightOfKv)), ""); - } - - @Test(expected = NullPointerException.class) - public void testJoinRightCollectionNull() { - Join.leftOuterJoin(p.apply(Create.of(leftListOfKv)), null, ""); - } - - @Test(expected = NullPointerException.class) - public void testJoinNullValueIsNull() { - Join.leftOuterJoin( - p.apply("CreateLeft", Create.of(leftListOfKv)), - p.apply("CreateRight", Create.of(listRightOfKv)), - null); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4059412/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java deleted file mode 100644 index 86028ac..0000000 --- a/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.contrib.joinlibrary; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; - -import org.junit.Before; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.List; - - -/** - * This test Outer Right Join functionality. - */ -public class OuterRightJoinTest { - - Pipeline p; - List<KV<String, Long>> leftListOfKv; - List<KV<String, String>> listRightOfKv; - List<KV<String, KV<Long, String>>> expectedResult; - - @Before - public void setup() { - - p = TestPipeline.create(); - leftListOfKv = new ArrayList<>(); - listRightOfKv = new ArrayList<>(); - - expectedResult = new ArrayList<>(); - } - - @Test - public void testJoinOneToOneMapping() { - leftListOfKv.add(KV.of("Key1", 5L)); - leftListOfKv.add(KV.of("Key2", 4L)); - PCollection<KV<String, Long>> leftCollection = p - .apply("CreateLeft", Create.of(leftListOfKv)); - - listRightOfKv.add(KV.of("Key1", "foo")); - listRightOfKv.add(KV.of("Key2", "bar")); - PCollection<KV<String, String>> rightCollection = p - .apply("CreateRight", Create.of(listRightOfKv)); - - PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin( - leftCollection, rightCollection, -1L); - - expectedResult.add(KV.of("Key1", KV.of(5L, "foo"))); - expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); - PAssert.that(output).containsInAnyOrder(expectedResult); - - p.run(); - } - - @Test - public void testJoinOneToManyMapping() { - leftListOfKv.add(KV.of("Key2", 4L)); - PCollection<KV<String, Long>> leftCollection = p - .apply("CreateLeft", Create.of(leftListOfKv)); - - listRightOfKv.add(KV.of("Key2", "bar")); - listRightOfKv.add(KV.of("Key2", "gazonk")); - PCollection<KV<String, String>> rightCollection = p - .apply("CreateRight", Create.of(listRightOfKv)); - - PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin( - leftCollection, rightCollection, -1L); - - expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); - expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk"))); - PAssert.that(output).containsInAnyOrder(expectedResult); - - p.run(); - } - - @Test - public void testJoinManyToOneMapping() { - leftListOfKv.add(KV.of("Key2", 4L)); - leftListOfKv.add(KV.of("Key2", 6L)); - PCollection<KV<String, Long>> leftCollection = p - .apply("CreateLeft", Create.of(leftListOfKv)); - - listRightOfKv.add(KV.of("Key2", "bar")); - PCollection<KV<String, String>> rightCollection = p - .apply("CreateRight", Create.of(listRightOfKv)); - - PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin( - leftCollection, rightCollection, -1L); - - expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); - expectedResult.add(KV.of("Key2", KV.of(6L, "bar"))); - PAssert.that(output).containsInAnyOrder(expectedResult); - - p.run(); - } - - @Test - public void testJoinNoneToOneMapping() { - leftListOfKv.add(KV.of("Key2", 4L)); - PCollection<KV<String, Long>> leftCollection = p - .apply("CreateLeft", Create.of(leftListOfKv)); - - listRightOfKv.add(KV.of("Key3", "bar")); - PCollection<KV<String, String>> rightCollection = p - .apply("CreateRight", Create.of(listRightOfKv)); - - PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin( - leftCollection, rightCollection, -1L); - - expectedResult.add(KV.of("Key3", KV.of(-1L, "bar"))); - PAssert.that(output).containsInAnyOrder(expectedResult); - p.run(); - } - - @Test(expected = NullPointerException.class) - public void testJoinLeftCollectionNull() { - Join.rightOuterJoin(null, p.apply(Create.of(listRightOfKv)), ""); - } - - @Test(expected = NullPointerException.class) - public void testJoinRightCollectionNull() { - Join.rightOuterJoin(p.apply(Create.of(leftListOfKv)), null, -1L); - } - - @Test(expected = NullPointerException.class) - public void testJoinNullValueIsNull() { - Join.rightOuterJoin( - p.apply("CreateLeft", Create.of(leftListOfKv)), - p.apply("CreateRight", Create.of(listRightOfKv)), - null); - } -}
