Move contrib/join-library to sdks/java/extensions.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e2ca8890 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e2ca8890 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e2ca8890 Branch: refs/heads/master Commit: e2ca88908097e58c4a4e022a86de7a23d9412850 Parents: d5814a3 Author: Davor Bonaci <[email protected]> Authored: Mon Apr 25 16:42:31 2016 -0700 Committer: bchambers <[email protected]> Committed: Mon Apr 25 17:32:23 2016 -0700 ---------------------------------------------------------------------- contrib/README.md | 53 ------ contrib/hadoop/AUTHORS.md | 7 - contrib/join-library/AUTHORS.md | 6 - contrib/join-library/README.md | 33 ---- contrib/join-library/pom.xml | 185 ------------------ .../org/apache/contrib/joinlibrary/Join.java | 186 ------------------- .../contrib/joinlibrary/InnerJoinTest.java | 143 -------------- .../contrib/joinlibrary/OuterLeftJoinTest.java | 153 --------------- .../contrib/joinlibrary/OuterRightJoinTest.java | 153 --------------- sdks/java/extensions/join-library/README.md | 33 ++++ sdks/java/extensions/join-library/pom.xml | 111 +++++++++++ .../org/apache/contrib/joinlibrary/Join.java | 186 +++++++++++++++++++ .../contrib/joinlibrary/InnerJoinTest.java | 143 ++++++++++++++ .../contrib/joinlibrary/OuterLeftJoinTest.java | 153 +++++++++++++++ .../contrib/joinlibrary/OuterRightJoinTest.java | 153 +++++++++++++++ sdks/java/extensions/pom.xml | 40 ++++ sdks/java/pom.xml | 1 + 17 files changed, 820 insertions(+), 919 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/contrib/README.md ---------------------------------------------------------------------- diff --git a/contrib/README.md b/contrib/README.md deleted file mode 100644 index b99cf46..0000000 --- a/contrib/README.md +++ /dev/null @@ -1,53 +0,0 @@ -# Community 
contributions - -This directory hosts a wide variety of community contributions that may be -useful to other users of -[Google Cloud Dataflow](https://cloud.google.com/dataflow/), -but may not be appropriate or ready yet for inclusion into the -[mainline SDK](https://github.com/GoogleCloudPlatform/DataflowJavaSDK/) or a -separate Google-maintained artifact. - -## Organization - -Each subdirectory represents a logically separate and independent module. -Preferably, the code is hosted directly in this repository. When appropriate, we -are also open to linking external repositories via -[`submodule`](http://git-scm.com/docs/git-submodule/) functionality within Git. - -While we are happy to host individual modules to provide additional value to all -Cloud Dataflow users, the modules are _maintained solely by their respective -authors_. We will make sure that modules are related to Cloud Dataflow, that -they are distributed under the same license as the mainline SDK, and provide -some guidance to the authors to make the quality as high as possible. - -We __cannot__, however, provide _any_ guarantees about correctness, -compatibility, performance, support, test coverage, maintenance or future -availability of individual modules hosted here. - -## Process - -In general, we recommend to get in touch with us through the issue tracker -first. That way we can help out and possibly guide you. Coordinating up front -makes it much easier to avoid frustration later on. - -We welcome pull requests with a new module from everyone. Every module must be -related to Cloud Dataflow and must have an informative README.md file. We will -provide general guidance, but usually won't be reviewing the module in detail. -We reserve the right to refuse acceptance to any module, or remove it at any -time in the future. - -We also welcome improvements to an existing module from everyone. 
We'll often -wait for comments from the primary author of the module before merging a pull -request from a non-primary author. - -As the module matures, we may choose to pull it directly into the mainline SDK -or promote it to a Google-managed artifact. - -## Licensing - -We require all contributors to sign the Contributor License Agreement, exactly -as we require for any contributions to the mainline SDK. More information is -available in our [CONTRIBUTING.md](https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/CONTRIBUTING.md) -file. - -_Thank you for your contribution to the Cloud Dataflow community!_ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/contrib/hadoop/AUTHORS.md ---------------------------------------------------------------------- diff --git a/contrib/hadoop/AUTHORS.md b/contrib/hadoop/AUTHORS.md deleted file mode 100644 index 6effdb9..0000000 --- a/contrib/hadoop/AUTHORS.md +++ /dev/null @@ -1,7 +0,0 @@ -# Authors of 'hadoop' module - -The following is the official list of authors for copyright purposes of this community-contributed module. - - Cloudera - Tom White, tom [at] cloudera [dot] com - Google Inc. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/contrib/join-library/AUTHORS.md ---------------------------------------------------------------------- diff --git a/contrib/join-library/AUTHORS.md b/contrib/join-library/AUTHORS.md deleted file mode 100644 index d32b6a7..0000000 --- a/contrib/join-library/AUTHORS.md +++ /dev/null @@ -1,6 +0,0 @@ -# Authors of join-library - -The following is the official list of authors for copyright purposes of this community-contributed module. - - Google Inc. 
- Magnus Runesson, M.Runesson [at] gmail [dot] com http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/contrib/join-library/README.md ---------------------------------------------------------------------- diff --git a/contrib/join-library/README.md b/contrib/join-library/README.md deleted file mode 100644 index 8e2a011..0000000 --- a/contrib/join-library/README.md +++ /dev/null @@ -1,33 +0,0 @@ -Join-library -============ - -Join-library provides inner join, outer left and right join functions to -Google Cloud Dataflow. The aim is to simplify the most common cases of join to a -simple function call. - -The functions are generic so it supports join of any types supported by -Dataflow. Input to the join functions are PCollections of Key/Values. Both the -left and right PCollections need the same type for the key. All the join -functions return a Key/Value where Key is the join key and value is -a Key/Value where the key is the left value and right is the value. - -In the cases of outer join, since null cannot be serialized the user have -to provide a value that represent null for that particular use case. - -Example how to use join-library: - - PCollection<KV<String, String>> leftPcollection = ... - PCollection<KV<String, Long>> rightPcollection = ... 
- - PCollection<KV<String, KV<String, Long>>> joinedPcollection = - Join.innerJoin(leftPcollection, rightPcollection); - -Join-library can be found on maven-central: - - <dependency> - <groupId>org.linuxalert.dataflow</groupId> - <artifactId>google-cloud-dataflow-java-contrib-joinlibrary</artifactId> - <version>0.0.3</version> - </dependency> - -Questions or comments: `M.Runesson [at] gmail [dot] com` http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/contrib/join-library/pom.xml ---------------------------------------------------------------------- diff --git a/contrib/join-library/pom.xml b/contrib/join-library/pom.xml deleted file mode 100644 index 090f445..0000000 --- a/contrib/join-library/pom.xml +++ /dev/null @@ -1,185 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <groupId>org.linuxalert.dataflow</groupId> - <artifactId>google-cloud-dataflow-java-contrib-joinlibrary</artifactId> - <name>Google Cloud Dataflow Join Library</name> - <description>Library with generic join functions for Dataflow.</description> - <url>https://github.com/GoogleCloudPlatform/DataflowJavaSDK/tree/master/contrib/join-library</url> - <developers> - <developer> - <organization>Google Inc.</organization> - <organizationUrl>http://www.google.com</organizationUrl> - </developer> - <developer> - <name>Magnus Runesson</name> - <email>M (dot) Runesson (at) gmail (dot) com</email> - <roles> - <role>Developer</role> - </roles> - <timezone>+1</timezone> - </developer> - </developers> - <contributors> - <contributor> - <name>Magnus Runesson</name> - <email>M (dot) Runesson (at) gmail (dot) com</email> - <url>https://github.com/mrunesson</url> - </contributor> - </contributors> - <version>0.0.4</version> - 
<packaging>jar</packaging> - - <licenses> - <license> - <name>Apache License, Version 2.0</name> - <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> - <distribution>repo</distribution> - </license> - </licenses> - - <scm> - <connection>scm:git:git@github.com:GoogleCloudPlatform/DataflowJavaSDK.git</connection> - <developerConnection>scm:git:git@github.com:GoogleCloudPlatform/DataflowJavaSDK.git</developerConnection> - <url>https://github.com/GoogleCloudPlatform/DataflowJavaSDK/tree/master/contrib/join-library</url> - </scm> - - <properties> - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <beam-version>[0.1.0, 1.0.0)</beam-version> - </properties> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-compiler-plugin</artifactId> - <version>3.2</version> - <configuration> - <source>1.7</source> - <target>1.7</target> - </configuration> - </plugin> - - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-checkstyle-plugin</artifactId> - <version>2.17</version> - <configuration> - <configLocation>../../sdks/java/checkstyle.xml</configLocation> - <consoleOutput>true</consoleOutput> - <failOnViolation>true</failOnViolation> - <includeTestSourceDirectory>true</includeTestSourceDirectory> - </configuration> - <executions> - <execution> - <phase>validate</phase> - <goals> - <goal>check</goal> - </goals> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-source-plugin</artifactId> - <version>2.4</version> - <executions> - <execution> - <id>attach-sources</id> - <goals> - <goal>jar</goal> - </goals> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-javadoc-plugin</artifactId> - <version>2.10.3</version> - <executions> - <execution> - <id>attach-javadocs</id> - <goals> - <goal>jar</goal> - </goals> - </execution> - </executions> - 
</plugin> - - <plugin> - <groupId>org.sonatype.plugins</groupId> - <artifactId>nexus-staging-maven-plugin</artifactId> - <version>1.6.3</version> - <extensions>true</extensions> - <configuration> - <serverId>ossrh</serverId> - <nexusUrl>https://oss.sonatype.org/</nexusUrl> - <autoReleaseAfterClose>true</autoReleaseAfterClose> - </configuration> - </plugin> - - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-gpg-plugin</artifactId> - <version>1.5</version> - <executions> - <execution> - <id>sign-artifacts</id> - <phase>verify</phase> - <goals> - <goal>sign</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> - - <distributionManagement> - <snapshotRepository> - <id>ossrh</id> - <url>https://oss.sonatype.org/content/repositories/snapshots</url> - </snapshotRepository> - <repository> - <id>ossrh</id> - <url>https://oss.sonatype.org/service/local/staging/deploy/maven2/</url> - </repository> - </distributionManagement> - - <dependencies> - <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>java-sdk-all</artifactId> - <version>${beam-version}</version> - </dependency> - - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <version>19.0</version> - </dependency> - - <!-- Dependency for tests --> - <dependency> - <groupId>org.hamcrest</groupId> - <artifactId>hamcrest-all</artifactId> - <version>1.3</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>4.12</version> - <scope>test</scope> - </dependency> - </dependencies> - -</project> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/contrib/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java ---------------------------------------------------------------------- diff --git a/contrib/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java 
b/contrib/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java deleted file mode 100644 index 6421e97..0000000 --- a/contrib/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.contrib.joinlibrary; - -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.join.CoGbkResult; -import org.apache.beam.sdk.transforms.join.CoGroupByKey; -import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TupleTag; -import com.google.common.base.Preconditions; - -/** - * Utility class with different versions of joins. All methods join two collections of - * key/value pairs (KV). - */ -public class Join { - - /** - * Inner join of two collections of KV elements. - * @param leftCollection Left side collection to join. - * @param rightCollection Right side collection to join. 
- * @param <K> Type of the key for both collections - * @param <V1> Type of the values for the left collection. - * @param <V2> Type of the values for the right collection. - * @return A joined collection of KV where Key is the key and value is a - * KV where Key is of type V1 and Value is type V2. - */ - public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> innerJoin( - final PCollection<KV<K, V1>> leftCollection, final PCollection<KV<K, V2>> rightCollection) { - Preconditions.checkNotNull(leftCollection); - Preconditions.checkNotNull(rightCollection); - - final TupleTag<V1> v1Tuple = new TupleTag<>(); - final TupleTag<V2> v2Tuple = new TupleTag<>(); - - PCollection<KV<K, CoGbkResult>> coGbkResultCollection = - KeyedPCollectionTuple.of(v1Tuple, leftCollection) - .and(v2Tuple, rightCollection) - .apply(CoGroupByKey.<K>create()); - - return coGbkResultCollection.apply(ParDo.of( - new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() { - @Override - public void processElement(ProcessContext c) { - KV<K, CoGbkResult> e = c.element(); - - Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple); - Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple); - - for (V1 leftValue : leftValuesIterable) { - for (V2 rightValue : rightValuesIterable) { - c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue))); - } - } - } - })) - .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(), - KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(), - ((KvCoder) rightCollection.getCoder()).getValueCoder()))); - } - - /** - * Left Outer Join of two collections of KV elements. - * @param leftCollection Left side collection to join. - * @param rightCollection Right side collection to join. - * @param nullValue Value to use as null value when right side do not match left side. - * @param <K> Type of the key for both collections - * @param <V1> Type of the values for the left collection. - * @param <V2> Type of the values for the right collection. 
- * @return A joined collection of KV where Key is the key and value is a - * KV where Key is of type V1 and Value is type V2. Values that - * should be null or empty is replaced with nullValue. - */ - public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> leftOuterJoin( - final PCollection<KV<K, V1>> leftCollection, - final PCollection<KV<K, V2>> rightCollection, - final V2 nullValue) { - Preconditions.checkNotNull(leftCollection); - Preconditions.checkNotNull(rightCollection); - Preconditions.checkNotNull(nullValue); - - final TupleTag<V1> v1Tuple = new TupleTag<>(); - final TupleTag<V2> v2Tuple = new TupleTag<>(); - - PCollection<KV<K, CoGbkResult>> coGbkResultCollection = - KeyedPCollectionTuple.of(v1Tuple, leftCollection) - .and(v2Tuple, rightCollection) - .apply(CoGroupByKey.<K>create()); - - return coGbkResultCollection.apply(ParDo.of( - new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() { - @Override - public void processElement(ProcessContext c) { - KV<K, CoGbkResult> e = c.element(); - - Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple); - Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple); - - for (V1 leftValue : leftValuesIterable) { - if (rightValuesIterable.iterator().hasNext()) { - for (V2 rightValue : rightValuesIterable) { - c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue))); - } - } else { - c.output(KV.of(e.getKey(), KV.of(leftValue, nullValue))); - } - } - } - })) - .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(), - KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(), - ((KvCoder) rightCollection.getCoder()).getValueCoder()))); - } - - /** - * Right Outer Join of two collections of KV elements. - * @param leftCollection Left side collection to join. - * @param rightCollection Right side collection to join. - * @param nullValue Value to use as null value when left side do not match right side. 
- * @param <K> Type of the key for both collections - * @param <V1> Type of the values for the left collection. - * @param <V2> Type of the values for the right collection. - * @return A joined collection of KV where Key is the key and value is a - * KV where Key is of type V1 and Value is type V2. Keys that - * should be null or empty is replaced with nullValue. - */ - public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> rightOuterJoin( - final PCollection<KV<K, V1>> leftCollection, - final PCollection<KV<K, V2>> rightCollection, - final V1 nullValue) { - Preconditions.checkNotNull(leftCollection); - Preconditions.checkNotNull(rightCollection); - Preconditions.checkNotNull(nullValue); - - final TupleTag<V1> v1Tuple = new TupleTag<>(); - final TupleTag<V2> v2Tuple = new TupleTag<>(); - - PCollection<KV<K, CoGbkResult>> coGbkResultCollection = - KeyedPCollectionTuple.of(v1Tuple, leftCollection) - .and(v2Tuple, rightCollection) - .apply(CoGroupByKey.<K>create()); - - return coGbkResultCollection.apply(ParDo.of( - new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() { - @Override - public void processElement(ProcessContext c) { - KV<K, CoGbkResult> e = c.element(); - - Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple); - Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple); - - for (V2 rightValue : rightValuesIterable) { - if (leftValuesIterable.iterator().hasNext()) { - for (V1 leftValue : leftValuesIterable) { - c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue))); - } - } else { - c.output(KV.of(e.getKey(), KV.of(nullValue, rightValue))); - } - } - } - })) - .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(), - KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(), - ((KvCoder) rightCollection.getCoder()).getValueCoder()))); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java 
---------------------------------------------------------------------- diff --git a/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java b/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java deleted file mode 100644 index 99e9c4b..0000000 --- a/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.contrib.joinlibrary; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; - -import org.junit.Before; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.List; - -/** - * This test Inner Join functionality. 
- */ -public class InnerJoinTest { - - Pipeline p; - List<KV<String, Long>> leftListOfKv; - List<KV<String, String>> listRightOfKv; - List<KV<String, KV<Long, String>>> expectedResult; - - @Before - public void setup() { - - p = TestPipeline.create(); - leftListOfKv = new ArrayList<>(); - listRightOfKv = new ArrayList<>(); - - expectedResult = new ArrayList<>(); - } - - @Test - public void testJoinOneToOneMapping() { - leftListOfKv.add(KV.of("Key1", 5L)); - leftListOfKv.add(KV.of("Key2", 4L)); - PCollection<KV<String, Long>> leftCollection = - p.apply("CreateLeft", Create.of(leftListOfKv)); - - listRightOfKv.add(KV.of("Key1", "foo")); - listRightOfKv.add(KV.of("Key2", "bar")); - PCollection<KV<String, String>> rightCollection = - p.apply("CreateRight", Create.of(listRightOfKv)); - - PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin( - leftCollection, rightCollection); - - expectedResult.add(KV.of("Key1", KV.of(5L, "foo"))); - expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); - PAssert.that(output).containsInAnyOrder(expectedResult); - - p.run(); - } - - @Test - public void testJoinOneToManyMapping() { - leftListOfKv.add(KV.of("Key2", 4L)); - PCollection<KV<String, Long>> leftCollection = p - .apply("CreateLeft", Create.of(leftListOfKv)); - - listRightOfKv.add(KV.of("Key2", "bar")); - listRightOfKv.add(KV.of("Key2", "gazonk")); - PCollection<KV<String, String>> rightCollection = p - .apply("CreateRight", Create.of(listRightOfKv)); - - PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin( - leftCollection, rightCollection); - - expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); - expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk"))); - PAssert.that(output).containsInAnyOrder(expectedResult); - - p.run(); - } - - @Test - public void testJoinManyToOneMapping() { - leftListOfKv.add(KV.of("Key2", 4L)); - leftListOfKv.add(KV.of("Key2", 6L)); - PCollection<KV<String, Long>> leftCollection = p - .apply("CreateLeft", 
Create.of(leftListOfKv)); - - listRightOfKv.add(KV.of("Key2", "bar")); - PCollection<KV<String, String>> rightCollection = p - .apply("CreateRight", Create.of(listRightOfKv)); - - PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin( - leftCollection, rightCollection); - - expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); - expectedResult.add(KV.of("Key2", KV.of(6L, "bar"))); - PAssert.that(output).containsInAnyOrder(expectedResult); - - p.run(); - } - - @Test - public void testJoinNoneToNoneMapping() { - leftListOfKv.add(KV.of("Key2", 4L)); - PCollection<KV<String, Long>> leftCollection = p - .apply("CreateLeft", Create.of(leftListOfKv)); - - listRightOfKv.add(KV.of("Key3", "bar")); - PCollection<KV<String, String>> rightCollection = p - .apply("CreateRight", Create.of(listRightOfKv)); - - PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin( - leftCollection, rightCollection); - - PAssert.that(output).containsInAnyOrder(expectedResult); - p.run(); - } - - @Test(expected = NullPointerException.class) - public void testJoinLeftCollectionNull() { - Join.innerJoin(null, p.apply(Create.of(listRightOfKv))); - } - - @Test(expected = NullPointerException.class) - public void testJoinRightCollectionNull() { - Join.innerJoin(p.apply(Create.of(leftListOfKv)), null); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java ---------------------------------------------------------------------- diff --git a/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java b/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java deleted file mode 100644 index ca09136..0000000 --- a/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more 
contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.contrib.joinlibrary; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; - -import org.junit.Before; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.List; - - -/** - * This test Outer Left Join functionality. 
- */ -public class OuterLeftJoinTest { - - Pipeline p; - List<KV<String, Long>> leftListOfKv; - List<KV<String, String>> listRightOfKv; - List<KV<String, KV<Long, String>>> expectedResult; - - @Before - public void setup() { - - p = TestPipeline.create(); - leftListOfKv = new ArrayList<>(); - listRightOfKv = new ArrayList<>(); - - expectedResult = new ArrayList<>(); - } - - @Test - public void testJoinOneToOneMapping() { - leftListOfKv.add(KV.of("Key1", 5L)); - leftListOfKv.add(KV.of("Key2", 4L)); - PCollection<KV<String, Long>> leftCollection = p - .apply("CreateLeft", Create.of(leftListOfKv)); - - listRightOfKv.add(KV.of("Key1", "foo")); - listRightOfKv.add(KV.of("Key2", "bar")); - PCollection<KV<String, String>> rightCollection = p - .apply("CreateRight", Create.of(listRightOfKv)); - - PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin( - leftCollection, rightCollection, ""); - - expectedResult.add(KV.of("Key1", KV.of(5L, "foo"))); - expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); - PAssert.that(output).containsInAnyOrder(expectedResult); - - p.run(); - } - - @Test - public void testJoinOneToManyMapping() { - leftListOfKv.add(KV.of("Key2", 4L)); - PCollection<KV<String, Long>> leftCollection = p - .apply("CreateLeft", Create.of(leftListOfKv)); - - listRightOfKv.add(KV.of("Key2", "bar")); - listRightOfKv.add(KV.of("Key2", "gazonk")); - PCollection<KV<String, String>> rightCollection = p - .apply("CreateRight", Create.of(listRightOfKv)); - - PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin( - leftCollection, rightCollection, ""); - - expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); - expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk"))); - PAssert.that(output).containsInAnyOrder(expectedResult); - - p.run(); - } - - @Test - public void testJoinManyToOneMapping() { - leftListOfKv.add(KV.of("Key2", 4L)); - leftListOfKv.add(KV.of("Key2", 6L)); - PCollection<KV<String, Long>> leftCollection = p - .apply("CreateLeft", 
Create.of(leftListOfKv)); - - listRightOfKv.add(KV.of("Key2", "bar")); - PCollection<KV<String, String>> rightCollection = p - .apply("CreateRight", Create.of(listRightOfKv)); - - PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin( - leftCollection, rightCollection, ""); - - expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); - expectedResult.add(KV.of("Key2", KV.of(6L, "bar"))); - PAssert.that(output).containsInAnyOrder(expectedResult); - - p.run(); - } - - @Test - public void testJoinOneToNoneMapping() { - leftListOfKv.add(KV.of("Key2", 4L)); - PCollection<KV<String, Long>> leftCollection = p - .apply("CreateLeft", Create.of(leftListOfKv)); - - listRightOfKv.add(KV.of("Key3", "bar")); - PCollection<KV<String, String>> rightCollection = p - .apply("CreateRight", Create.of(listRightOfKv)); - - PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin( - leftCollection, rightCollection, ""); - - expectedResult.add(KV.of("Key2", KV.of(4L, ""))); - PAssert.that(output).containsInAnyOrder(expectedResult); - p.run(); - } - - @Test(expected = NullPointerException.class) - public void testJoinLeftCollectionNull() { - Join.leftOuterJoin(null, p.apply(Create.of(listRightOfKv)), ""); - } - - @Test(expected = NullPointerException.class) - public void testJoinRightCollectionNull() { - Join.leftOuterJoin(p.apply(Create.of(leftListOfKv)), null, ""); - } - - @Test(expected = NullPointerException.class) - public void testJoinNullValueIsNull() { - Join.leftOuterJoin( - p.apply("CreateLeft", Create.of(leftListOfKv)), - p.apply("CreateRight", Create.of(listRightOfKv)), - null); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java ---------------------------------------------------------------------- diff --git a/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java 
b/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java deleted file mode 100644 index 86028ac..0000000 --- a/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.contrib.joinlibrary; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; - -import org.junit.Before; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.List; - - -/** - * This test Outer Right Join functionality. 
- */ -public class OuterRightJoinTest { - - Pipeline p; - List<KV<String, Long>> leftListOfKv; - List<KV<String, String>> listRightOfKv; - List<KV<String, KV<Long, String>>> expectedResult; - - @Before - public void setup() { - - p = TestPipeline.create(); - leftListOfKv = new ArrayList<>(); - listRightOfKv = new ArrayList<>(); - - expectedResult = new ArrayList<>(); - } - - @Test - public void testJoinOneToOneMapping() { - leftListOfKv.add(KV.of("Key1", 5L)); - leftListOfKv.add(KV.of("Key2", 4L)); - PCollection<KV<String, Long>> leftCollection = p - .apply("CreateLeft", Create.of(leftListOfKv)); - - listRightOfKv.add(KV.of("Key1", "foo")); - listRightOfKv.add(KV.of("Key2", "bar")); - PCollection<KV<String, String>> rightCollection = p - .apply("CreateRight", Create.of(listRightOfKv)); - - PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin( - leftCollection, rightCollection, -1L); - - expectedResult.add(KV.of("Key1", KV.of(5L, "foo"))); - expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); - PAssert.that(output).containsInAnyOrder(expectedResult); - - p.run(); - } - - @Test - public void testJoinOneToManyMapping() { - leftListOfKv.add(KV.of("Key2", 4L)); - PCollection<KV<String, Long>> leftCollection = p - .apply("CreateLeft", Create.of(leftListOfKv)); - - listRightOfKv.add(KV.of("Key2", "bar")); - listRightOfKv.add(KV.of("Key2", "gazonk")); - PCollection<KV<String, String>> rightCollection = p - .apply("CreateRight", Create.of(listRightOfKv)); - - PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin( - leftCollection, rightCollection, -1L); - - expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); - expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk"))); - PAssert.that(output).containsInAnyOrder(expectedResult); - - p.run(); - } - - @Test - public void testJoinManyToOneMapping() { - leftListOfKv.add(KV.of("Key2", 4L)); - leftListOfKv.add(KV.of("Key2", 6L)); - PCollection<KV<String, Long>> leftCollection = p - 
.apply("CreateLeft", Create.of(leftListOfKv)); - - listRightOfKv.add(KV.of("Key2", "bar")); - PCollection<KV<String, String>> rightCollection = p - .apply("CreateRight", Create.of(listRightOfKv)); - - PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin( - leftCollection, rightCollection, -1L); - - expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); - expectedResult.add(KV.of("Key2", KV.of(6L, "bar"))); - PAssert.that(output).containsInAnyOrder(expectedResult); - - p.run(); - } - - @Test - public void testJoinNoneToOneMapping() { - leftListOfKv.add(KV.of("Key2", 4L)); - PCollection<KV<String, Long>> leftCollection = p - .apply("CreateLeft", Create.of(leftListOfKv)); - - listRightOfKv.add(KV.of("Key3", "bar")); - PCollection<KV<String, String>> rightCollection = p - .apply("CreateRight", Create.of(listRightOfKv)); - - PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin( - leftCollection, rightCollection, -1L); - - expectedResult.add(KV.of("Key3", KV.of(-1L, "bar"))); - PAssert.that(output).containsInAnyOrder(expectedResult); - p.run(); - } - - @Test(expected = NullPointerException.class) - public void testJoinLeftCollectionNull() { - Join.rightOuterJoin(null, p.apply(Create.of(listRightOfKv)), ""); - } - - @Test(expected = NullPointerException.class) - public void testJoinRightCollectionNull() { - Join.rightOuterJoin(p.apply(Create.of(leftListOfKv)), null, -1L); - } - - @Test(expected = NullPointerException.class) - public void testJoinNullValueIsNull() { - Join.rightOuterJoin( - p.apply("CreateLeft", Create.of(leftListOfKv)), - p.apply("CreateRight", Create.of(listRightOfKv)), - null); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/sdks/java/extensions/join-library/README.md ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/join-library/README.md b/sdks/java/extensions/join-library/README.md new file mode 100644 index 0000000..8e2a011 --- 
/dev/null +++ b/sdks/java/extensions/join-library/README.md @@ -0,0 +1,33 @@ +Join-library +============ + +Join-library provides inner join, outer left and right join functions to +Google Cloud Dataflow. The aim is to simplify the most common cases of join to a +simple function call. + +The functions are generic so it supports join of any types supported by +Dataflow. Input to the join functions are PCollections of Key/Values. Both the +left and right PCollections need the same type for the key. All the join +functions return a Key/Value where Key is the join key and value is +a Key/Value where the key is the left value and right is the value. + +In the cases of outer join, since null cannot be serialized the user have +to provide a value that represent null for that particular use case. + +Example how to use join-library: + + PCollection<KV<String, String>> leftPcollection = ... + PCollection<KV<String, Long>> rightPcollection = ... + + PCollection<KV<String, KV<String, Long>>> joinedPcollection = + Join.innerJoin(leftPcollection, rightPcollection); + +Join-library can be found on maven-central: + + <dependency> + <groupId>org.linuxalert.dataflow</groupId> + <artifactId>google-cloud-dataflow-java-contrib-joinlibrary</artifactId> + <version>0.0.3</version> + </dependency> + +Questions or comments: `M.Runesson [at] gmail [dot] com` http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/sdks/java/extensions/join-library/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/join-library/pom.xml b/sdks/java/extensions/join-library/pom.xml new file mode 100644 index 0000000..2765276 --- /dev/null +++ b/sdks/java/extensions/join-library/pom.xml @@ -0,0 +1,111 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. 
See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.beam</groupId> + <artifactId>extensions-parent</artifactId> + <version>0.1.0-incubating-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>join-library</artifactId> + <name>Apache Beam :: SDKs :: Java :: Extensions :: Join library</name> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.2</version> + <configuration> + <source>1.7</source> + <target>1.7</target> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <version>2.17</version> + <configuration> + <configLocation>../../checkstyle.xml</configLocation> + <consoleOutput>true</consoleOutput> + <failOnViolation>true</failOnViolation> + <includeTestSourceDirectory>true</includeTestSourceDirectory> + </configuration> + <executions> + <execution> + <phase>validate</phase> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + </plugin> 
+ + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-source-plugin</artifactId> + <version>2.4</version> + <executions> + <execution> + <id>attach-sources</id> + <goals> + <goal>jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>java-sdk-all</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>19.0</version> + </dependency> + + <!-- Dependency for tests --> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <version>1.3</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.12</version> + <scope>test</scope> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/sdks/java/extensions/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java b/sdks/java/extensions/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java new file mode 100644 index 0000000..6421e97 --- /dev/null +++ b/sdks/java/extensions/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.contrib.joinlibrary; + +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.transforms.join.CoGroupByKey; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TupleTag; +import com.google.common.base.Preconditions; + +/** + * Utility class with different versions of joins. All methods join two collections of + * key/value pairs (KV). + */ +public class Join { + + /** + * Inner join of two collections of KV elements. + * @param leftCollection Left side collection to join. + * @param rightCollection Right side collection to join. + * @param <K> Type of the key for both collections + * @param <V1> Type of the values for the left collection. + * @param <V2> Type of the values for the right collection. + * @return A joined collection of KV where Key is the key and value is a + * KV where Key is of type V1 and Value is type V2. 
+ */ + public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> innerJoin( + final PCollection<KV<K, V1>> leftCollection, final PCollection<KV<K, V2>> rightCollection) { + Preconditions.checkNotNull(leftCollection); + Preconditions.checkNotNull(rightCollection); + + final TupleTag<V1> v1Tuple = new TupleTag<>(); + final TupleTag<V2> v2Tuple = new TupleTag<>(); + + PCollection<KV<K, CoGbkResult>> coGbkResultCollection = + KeyedPCollectionTuple.of(v1Tuple, leftCollection) + .and(v2Tuple, rightCollection) + .apply(CoGroupByKey.<K>create()); + + return coGbkResultCollection.apply(ParDo.of( + new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() { + @Override + public void processElement(ProcessContext c) { + KV<K, CoGbkResult> e = c.element(); + + Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple); + Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple); + + for (V1 leftValue : leftValuesIterable) { + for (V2 rightValue : rightValuesIterable) { + c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue))); + } + } + } + })) + .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(), + KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(), + ((KvCoder) rightCollection.getCoder()).getValueCoder()))); + } + + /** + * Left Outer Join of two collections of KV elements. + * @param leftCollection Left side collection to join. + * @param rightCollection Right side collection to join. + * @param nullValue Value to use as null value when right side do not match left side. + * @param <K> Type of the key for both collections + * @param <V1> Type of the values for the left collection. + * @param <V2> Type of the values for the right collection. + * @return A joined collection of KV where Key is the key and value is a + * KV where Key is of type V1 and Value is type V2. Values that + * should be null or empty is replaced with nullValue. 
+ */ + public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> leftOuterJoin( + final PCollection<KV<K, V1>> leftCollection, + final PCollection<KV<K, V2>> rightCollection, + final V2 nullValue) { + Preconditions.checkNotNull(leftCollection); + Preconditions.checkNotNull(rightCollection); + Preconditions.checkNotNull(nullValue); + + final TupleTag<V1> v1Tuple = new TupleTag<>(); + final TupleTag<V2> v2Tuple = new TupleTag<>(); + + PCollection<KV<K, CoGbkResult>> coGbkResultCollection = + KeyedPCollectionTuple.of(v1Tuple, leftCollection) + .and(v2Tuple, rightCollection) + .apply(CoGroupByKey.<K>create()); + + return coGbkResultCollection.apply(ParDo.of( + new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() { + @Override + public void processElement(ProcessContext c) { + KV<K, CoGbkResult> e = c.element(); + + Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple); + Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple); + + for (V1 leftValue : leftValuesIterable) { + if (rightValuesIterable.iterator().hasNext()) { + for (V2 rightValue : rightValuesIterable) { + c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue))); + } + } else { + c.output(KV.of(e.getKey(), KV.of(leftValue, nullValue))); + } + } + } + })) + .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(), + KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(), + ((KvCoder) rightCollection.getCoder()).getValueCoder()))); + } + + /** + * Right Outer Join of two collections of KV elements. + * @param leftCollection Left side collection to join. + * @param rightCollection Right side collection to join. + * @param nullValue Value to use as null value when left side do not match right side. + * @param <K> Type of the key for both collections + * @param <V1> Type of the values for the left collection. + * @param <V2> Type of the values for the right collection. 
+ * @return A joined collection of KV where Key is the key and value is a + * KV where Key is of type V1 and Value is type V2. Keys that + * should be null or empty is replaced with nullValue. + */ + public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> rightOuterJoin( + final PCollection<KV<K, V1>> leftCollection, + final PCollection<KV<K, V2>> rightCollection, + final V1 nullValue) { + Preconditions.checkNotNull(leftCollection); + Preconditions.checkNotNull(rightCollection); + Preconditions.checkNotNull(nullValue); + + final TupleTag<V1> v1Tuple = new TupleTag<>(); + final TupleTag<V2> v2Tuple = new TupleTag<>(); + + PCollection<KV<K, CoGbkResult>> coGbkResultCollection = + KeyedPCollectionTuple.of(v1Tuple, leftCollection) + .and(v2Tuple, rightCollection) + .apply(CoGroupByKey.<K>create()); + + return coGbkResultCollection.apply(ParDo.of( + new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() { + @Override + public void processElement(ProcessContext c) { + KV<K, CoGbkResult> e = c.element(); + + Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple); + Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple); + + for (V2 rightValue : rightValuesIterable) { + if (leftValuesIterable.iterator().hasNext()) { + for (V1 leftValue : leftValuesIterable) { + c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue))); + } + } else { + c.output(KV.of(e.getKey(), KV.of(nullValue, rightValue))); + } + } + } + })) + .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(), + KvCoder.of(((KvCoder) leftCollection.getCoder()).getValueCoder(), + ((KvCoder) rightCollection.getCoder()).getValueCoder()))); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java 
b/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java new file mode 100644 index 0000000..99e9c4b --- /dev/null +++ b/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.contrib.joinlibrary; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +/** + * This test Inner Join functionality. 
+ */ +public class InnerJoinTest { + + Pipeline p; + List<KV<String, Long>> leftListOfKv; + List<KV<String, String>> listRightOfKv; + List<KV<String, KV<Long, String>>> expectedResult; + + @Before + public void setup() { + + p = TestPipeline.create(); + leftListOfKv = new ArrayList<>(); + listRightOfKv = new ArrayList<>(); + + expectedResult = new ArrayList<>(); + } + + @Test + public void testJoinOneToOneMapping() { + leftListOfKv.add(KV.of("Key1", 5L)); + leftListOfKv.add(KV.of("Key2", 4L)); + PCollection<KV<String, Long>> leftCollection = + p.apply("CreateLeft", Create.of(leftListOfKv)); + + listRightOfKv.add(KV.of("Key1", "foo")); + listRightOfKv.add(KV.of("Key2", "bar")); + PCollection<KV<String, String>> rightCollection = + p.apply("CreateRight", Create.of(listRightOfKv)); + + PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin( + leftCollection, rightCollection); + + expectedResult.add(KV.of("Key1", KV.of(5L, "foo"))); + expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); + PAssert.that(output).containsInAnyOrder(expectedResult); + + p.run(); + } + + @Test + public void testJoinOneToManyMapping() { + leftListOfKv.add(KV.of("Key2", 4L)); + PCollection<KV<String, Long>> leftCollection = p + .apply("CreateLeft", Create.of(leftListOfKv)); + + listRightOfKv.add(KV.of("Key2", "bar")); + listRightOfKv.add(KV.of("Key2", "gazonk")); + PCollection<KV<String, String>> rightCollection = p + .apply("CreateRight", Create.of(listRightOfKv)); + + PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin( + leftCollection, rightCollection); + + expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); + expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk"))); + PAssert.that(output).containsInAnyOrder(expectedResult); + + p.run(); + } + + @Test + public void testJoinManyToOneMapping() { + leftListOfKv.add(KV.of("Key2", 4L)); + leftListOfKv.add(KV.of("Key2", 6L)); + PCollection<KV<String, Long>> leftCollection = p + .apply("CreateLeft", 
Create.of(leftListOfKv)); + + listRightOfKv.add(KV.of("Key2", "bar")); + PCollection<KV<String, String>> rightCollection = p + .apply("CreateRight", Create.of(listRightOfKv)); + + PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin( + leftCollection, rightCollection); + + expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); + expectedResult.add(KV.of("Key2", KV.of(6L, "bar"))); + PAssert.that(output).containsInAnyOrder(expectedResult); + + p.run(); + } + + @Test + public void testJoinNoneToNoneMapping() { + leftListOfKv.add(KV.of("Key2", 4L)); + PCollection<KV<String, Long>> leftCollection = p + .apply("CreateLeft", Create.of(leftListOfKv)); + + listRightOfKv.add(KV.of("Key3", "bar")); + PCollection<KV<String, String>> rightCollection = p + .apply("CreateRight", Create.of(listRightOfKv)); + + PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin( + leftCollection, rightCollection); + + PAssert.that(output).containsInAnyOrder(expectedResult); + p.run(); + } + + @Test(expected = NullPointerException.class) + public void testJoinLeftCollectionNull() { + Join.innerJoin(null, p.apply(Create.of(listRightOfKv))); + } + + @Test(expected = NullPointerException.class) + public void testJoinRightCollectionNull() { + Join.innerJoin(p.apply(Create.of(leftListOfKv)), null); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java new file mode 100644 index 0000000..ca09136 --- /dev/null +++ b/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.contrib.joinlibrary; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + + +/** + * This test Outer Left Join functionality. 
+ */ +public class OuterLeftJoinTest { + + Pipeline p; + List<KV<String, Long>> leftListOfKv; + List<KV<String, String>> listRightOfKv; + List<KV<String, KV<Long, String>>> expectedResult; + + @Before + public void setup() { + + p = TestPipeline.create(); + leftListOfKv = new ArrayList<>(); + listRightOfKv = new ArrayList<>(); + + expectedResult = new ArrayList<>(); + } + + @Test + public void testJoinOneToOneMapping() { + leftListOfKv.add(KV.of("Key1", 5L)); + leftListOfKv.add(KV.of("Key2", 4L)); + PCollection<KV<String, Long>> leftCollection = p + .apply("CreateLeft", Create.of(leftListOfKv)); + + listRightOfKv.add(KV.of("Key1", "foo")); + listRightOfKv.add(KV.of("Key2", "bar")); + PCollection<KV<String, String>> rightCollection = p + .apply("CreateRight", Create.of(listRightOfKv)); + + PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin( + leftCollection, rightCollection, ""); + + expectedResult.add(KV.of("Key1", KV.of(5L, "foo"))); + expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); + PAssert.that(output).containsInAnyOrder(expectedResult); + + p.run(); + } + + @Test + public void testJoinOneToManyMapping() { + leftListOfKv.add(KV.of("Key2", 4L)); + PCollection<KV<String, Long>> leftCollection = p + .apply("CreateLeft", Create.of(leftListOfKv)); + + listRightOfKv.add(KV.of("Key2", "bar")); + listRightOfKv.add(KV.of("Key2", "gazonk")); + PCollection<KV<String, String>> rightCollection = p + .apply("CreateRight", Create.of(listRightOfKv)); + + PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin( + leftCollection, rightCollection, ""); + + expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); + expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk"))); + PAssert.that(output).containsInAnyOrder(expectedResult); + + p.run(); + } + + @Test + public void testJoinManyToOneMapping() { + leftListOfKv.add(KV.of("Key2", 4L)); + leftListOfKv.add(KV.of("Key2", 6L)); + PCollection<KV<String, Long>> leftCollection = p + .apply("CreateLeft", 
Create.of(leftListOfKv)); + + listRightOfKv.add(KV.of("Key2", "bar")); + PCollection<KV<String, String>> rightCollection = p + .apply("CreateRight", Create.of(listRightOfKv)); + + PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin( + leftCollection, rightCollection, ""); + + expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); + expectedResult.add(KV.of("Key2", KV.of(6L, "bar"))); + PAssert.that(output).containsInAnyOrder(expectedResult); + + p.run(); + } + + @Test + public void testJoinOneToNoneMapping() { + leftListOfKv.add(KV.of("Key2", 4L)); + PCollection<KV<String, Long>> leftCollection = p + .apply("CreateLeft", Create.of(leftListOfKv)); + + listRightOfKv.add(KV.of("Key3", "bar")); + PCollection<KV<String, String>> rightCollection = p + .apply("CreateRight", Create.of(listRightOfKv)); + + PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin( + leftCollection, rightCollection, ""); + + expectedResult.add(KV.of("Key2", KV.of(4L, ""))); + PAssert.that(output).containsInAnyOrder(expectedResult); + p.run(); + } + + @Test(expected = NullPointerException.class) + public void testJoinLeftCollectionNull() { + Join.leftOuterJoin(null, p.apply(Create.of(listRightOfKv)), ""); + } + + @Test(expected = NullPointerException.class) + public void testJoinRightCollectionNull() { + Join.leftOuterJoin(p.apply(Create.of(leftListOfKv)), null, ""); + } + + @Test(expected = NullPointerException.class) + public void testJoinNullValueIsNull() { + Join.leftOuterJoin( + p.apply("CreateLeft", Create.of(leftListOfKv)), + p.apply("CreateRight", Create.of(listRightOfKv)), + null); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java 
b/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java new file mode 100644 index 0000000..86028ac --- /dev/null +++ b/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.contrib.joinlibrary; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + + +/** + * This test Outer Right Join functionality. 
+ */ +public class OuterRightJoinTest { + + Pipeline p; + List<KV<String, Long>> leftListOfKv; + List<KV<String, String>> listRightOfKv; + List<KV<String, KV<Long, String>>> expectedResult; + + @Before + public void setup() { + + p = TestPipeline.create(); + leftListOfKv = new ArrayList<>(); + listRightOfKv = new ArrayList<>(); + + expectedResult = new ArrayList<>(); + } + + @Test + public void testJoinOneToOneMapping() { + leftListOfKv.add(KV.of("Key1", 5L)); + leftListOfKv.add(KV.of("Key2", 4L)); + PCollection<KV<String, Long>> leftCollection = p + .apply("CreateLeft", Create.of(leftListOfKv)); + + listRightOfKv.add(KV.of("Key1", "foo")); + listRightOfKv.add(KV.of("Key2", "bar")); + PCollection<KV<String, String>> rightCollection = p + .apply("CreateRight", Create.of(listRightOfKv)); + + PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin( + leftCollection, rightCollection, -1L); + + expectedResult.add(KV.of("Key1", KV.of(5L, "foo"))); + expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); + PAssert.that(output).containsInAnyOrder(expectedResult); + + p.run(); + } + + @Test + public void testJoinOneToManyMapping() { + leftListOfKv.add(KV.of("Key2", 4L)); + PCollection<KV<String, Long>> leftCollection = p + .apply("CreateLeft", Create.of(leftListOfKv)); + + listRightOfKv.add(KV.of("Key2", "bar")); + listRightOfKv.add(KV.of("Key2", "gazonk")); + PCollection<KV<String, String>> rightCollection = p + .apply("CreateRight", Create.of(listRightOfKv)); + + PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin( + leftCollection, rightCollection, -1L); + + expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); + expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk"))); + PAssert.that(output).containsInAnyOrder(expectedResult); + + p.run(); + } + + @Test + public void testJoinManyToOneMapping() { + leftListOfKv.add(KV.of("Key2", 4L)); + leftListOfKv.add(KV.of("Key2", 6L)); + PCollection<KV<String, Long>> leftCollection = p + 
        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
+      leftCollection, rightCollection, -1L);
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinNoneToOneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key3", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
+      leftCollection, rightCollection, -1L);
+
+    expectedResult.add(KV.of("Key3", KV.of(-1L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+    p.run();
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinLeftCollectionNull() {
+    Join.rightOuterJoin(null, p.apply(Create.of(listRightOfKv)), "");
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinRightCollectionNull() {
+    Join.rightOuterJoin(p.apply(Create.of(leftListOfKv)), null, -1L);
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinNullValueIsNull() {
+    Join.rightOuterJoin(
+        p.apply("CreateLeft", Create.of(leftListOfKv)),
+        p.apply("CreateRight", Create.of(listRightOfKv)),
+        null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/sdks/java/extensions/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/pom.xml b/sdks/java/extensions/pom.xml
new file mode 100644
index 0000000..180b8b7
--- /dev/null
+++ b/sdks/java/extensions/pom.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>java-sdk-parent</artifactId>
+    <version>0.1.0-incubating-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>extensions-parent</artifactId>
+  <packaging>pom</packaging>
+
+  <name>Apache Beam :: SDKs :: Java :: Extensions</name>
+
+  <modules>
+    <module>join-library</module>
+  </modules>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2ca8890/sdks/java/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml
index 6bd7ee7..e5da2d5 100644
--- a/sdks/java/pom.xml
+++ b/sdks/java/pom.xml
@@ -41,6 +41,7 @@
          DataflowPipelineRunner. Until these are refactored out or
          a released artifact exists, we need to modify the build order.
     <module>maven-archetypes</module> -->
+    <module>extensions</module>
   </modules>
 
   <profiles>
