Repository: incubator-beam
Updated Branches:
  refs/heads/master d5814a38d -> 0f7b81615


Reorganize Java packages

Move org.apache.contrib.joinlibrary to 
org.apache.beam.sdk.extensions.joinlibrary.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e4059412
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e4059412
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e4059412

Branch: refs/heads/master
Commit: e4059412d111a223ee7cfda0004a62699b38f41f
Parents: e2ca889
Author: Davor Bonaci <[email protected]>
Authored: Mon Apr 25 16:45:59 2016 -0700
Committer: bchambers <[email protected]>
Committed: Mon Apr 25 17:32:23 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/extensions/joinlibrary/Join.java   | 186 +++++++++++++++++++
 .../org/apache/contrib/joinlibrary/Join.java    | 186 -------------------
 .../extensions/joinlibrary/InnerJoinTest.java   | 143 ++++++++++++++
 .../joinlibrary/OuterLeftJoinTest.java          | 153 +++++++++++++++
 .../joinlibrary/OuterRightJoinTest.java         | 153 +++++++++++++++
 .../contrib/joinlibrary/InnerJoinTest.java      | 143 --------------
 .../contrib/joinlibrary/OuterLeftJoinTest.java  | 153 ---------------
 .../contrib/joinlibrary/OuterRightJoinTest.java | 153 ---------------
 8 files changed, 635 insertions(+), 635 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4059412/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
 
b/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
new file mode 100644
index 0000000..968a613
--- /dev/null
+++ 
b/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.joinlibrary;
+
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.transforms.join.CoGroupByKey;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import com.google.common.base.Preconditions;
+
+/**
+ * Utility class with different versions of joins. All methods join two 
collections of
+ * key/value pairs (KV).
+ */
+public class Join {
+
+  /**
+   * Inner join of two collections of KV elements.
+   * @param leftCollection Left side collection to join.
+   * @param rightCollection Right side collection to join.
+   * @param <K> Type of the key for both collections
+   * @param <V1> Type of the values for the left collection.
+   * @param <V2> Type of the values for the right collection.
+   * @return A joined collection of KV where Key is the key and value is a
+   *         KV where Key is of type V1 and Value is type V2.
+   */
+  public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> innerJoin(
+    final PCollection<KV<K, V1>> leftCollection, final PCollection<KV<K, V2>> 
rightCollection) {
+    Preconditions.checkNotNull(leftCollection);
+    Preconditions.checkNotNull(rightCollection);
+
+    final TupleTag<V1> v1Tuple = new TupleTag<>();
+    final TupleTag<V2> v2Tuple = new TupleTag<>();
+
+    PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
+      KeyedPCollectionTuple.of(v1Tuple, leftCollection)
+        .and(v2Tuple, rightCollection)
+        .apply(CoGroupByKey.<K>create());
+
+    return coGbkResultCollection.apply(ParDo.of(
+      new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
+        @Override
+        public void processElement(ProcessContext c) {
+          KV<K, CoGbkResult> e = c.element();
+
+          Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
+          Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
+
+          for (V1 leftValue : leftValuesIterable) {
+            for (V2 rightValue : rightValuesIterable) {
+              c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
+            }
+          }
+        }
+      }))
+      .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(),
+                           KvCoder.of(((KvCoder) 
leftCollection.getCoder()).getValueCoder(),
+                                      ((KvCoder) 
rightCollection.getCoder()).getValueCoder())));
+  }
+
+  /**
+   * Left Outer Join of two collections of KV elements.
+   * @param leftCollection Left side collection to join.
+   * @param rightCollection Right side collection to join.
+   * @param nullValue Value to use as null value when right side do not match 
left side.
+   * @param <K> Type of the key for both collections
+   * @param <V1> Type of the values for the left collection.
+   * @param <V2> Type of the values for the right collection.
+   * @return A joined collection of KV where Key is the key and value is a
+   *         KV where Key is of type V1 and Value is type V2. Values that
+   *         should be null or empty is replaced with nullValue.
+   */
+  public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> leftOuterJoin(
+    final PCollection<KV<K, V1>> leftCollection,
+    final PCollection<KV<K, V2>> rightCollection,
+    final V2 nullValue) {
+    Preconditions.checkNotNull(leftCollection);
+    Preconditions.checkNotNull(rightCollection);
+    Preconditions.checkNotNull(nullValue);
+
+    final TupleTag<V1> v1Tuple = new TupleTag<>();
+    final TupleTag<V2> v2Tuple = new TupleTag<>();
+
+    PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
+      KeyedPCollectionTuple.of(v1Tuple, leftCollection)
+        .and(v2Tuple, rightCollection)
+        .apply(CoGroupByKey.<K>create());
+
+    return coGbkResultCollection.apply(ParDo.of(
+      new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
+        @Override
+        public void processElement(ProcessContext c) {
+          KV<K, CoGbkResult> e = c.element();
+
+          Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
+          Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
+
+          for (V1 leftValue : leftValuesIterable) {
+            if (rightValuesIterable.iterator().hasNext()) {
+              for (V2 rightValue : rightValuesIterable) {
+                c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
+              }
+            } else {
+              c.output(KV.of(e.getKey(), KV.of(leftValue, nullValue)));
+            }
+          }
+        }
+      }))
+      .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(),
+                           KvCoder.of(((KvCoder) 
leftCollection.getCoder()).getValueCoder(),
+                                      ((KvCoder) 
rightCollection.getCoder()).getValueCoder())));
+  }
+
+  /**
+   * Right Outer Join of two collections of KV elements.
+   * @param leftCollection Left side collection to join.
+   * @param rightCollection Right side collection to join.
+   * @param nullValue Value to use as null value when left side do not match 
right side.
+   * @param <K> Type of the key for both collections
+   * @param <V1> Type of the values for the left collection.
+   * @param <V2> Type of the values for the right collection.
+   * @return A joined collection of KV where Key is the key and value is a
+   *         KV where Key is of type V1 and Value is type V2. Keys that
+   *         should be null or empty is replaced with nullValue.
+   */
+  public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> rightOuterJoin(
+    final PCollection<KV<K, V1>> leftCollection,
+    final PCollection<KV<K, V2>> rightCollection,
+    final V1 nullValue) {
+    Preconditions.checkNotNull(leftCollection);
+    Preconditions.checkNotNull(rightCollection);
+    Preconditions.checkNotNull(nullValue);
+
+    final TupleTag<V1> v1Tuple = new TupleTag<>();
+    final TupleTag<V2> v2Tuple = new TupleTag<>();
+
+    PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
+      KeyedPCollectionTuple.of(v1Tuple, leftCollection)
+        .and(v2Tuple, rightCollection)
+        .apply(CoGroupByKey.<K>create());
+
+    return coGbkResultCollection.apply(ParDo.of(
+      new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
+        @Override
+        public void processElement(ProcessContext c) {
+          KV<K, CoGbkResult> e = c.element();
+
+          Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
+          Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
+
+          for (V2 rightValue : rightValuesIterable) {
+            if (leftValuesIterable.iterator().hasNext()) {
+              for (V1 leftValue : leftValuesIterable) {
+                c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
+              }
+            } else {
+              c.output(KV.of(e.getKey(), KV.of(nullValue, rightValue)));
+            }
+          }
+        }
+      }))
+      .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(),
+                           KvCoder.of(((KvCoder) 
leftCollection.getCoder()).getValueCoder(),
+                                      ((KvCoder) 
rightCollection.getCoder()).getValueCoder())));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4059412/sdks/java/extensions/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/extensions/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java
 
b/sdks/java/extensions/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java
deleted file mode 100644
index 6421e97..0000000
--- 
a/sdks/java/extensions/join-library/src/main/java/org/apache/contrib/joinlibrary/Join.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.contrib.joinlibrary;
-
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.join.CoGbkResult;
-import org.apache.beam.sdk.transforms.join.CoGroupByKey;
-import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TupleTag;
-import com.google.common.base.Preconditions;
-
-/**
- * Utility class with different versions of joins. All methods join two 
collections of
- * key/value pairs (KV).
- */
-public class Join {
-
-  /**
-   * Inner join of two collections of KV elements.
-   * @param leftCollection Left side collection to join.
-   * @param rightCollection Right side collection to join.
-   * @param <K> Type of the key for both collections
-   * @param <V1> Type of the values for the left collection.
-   * @param <V2> Type of the values for the right collection.
-   * @return A joined collection of KV where Key is the key and value is a
-   *         KV where Key is of type V1 and Value is type V2.
-   */
-  public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> innerJoin(
-    final PCollection<KV<K, V1>> leftCollection, final PCollection<KV<K, V2>> 
rightCollection) {
-    Preconditions.checkNotNull(leftCollection);
-    Preconditions.checkNotNull(rightCollection);
-
-    final TupleTag<V1> v1Tuple = new TupleTag<>();
-    final TupleTag<V2> v2Tuple = new TupleTag<>();
-
-    PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
-      KeyedPCollectionTuple.of(v1Tuple, leftCollection)
-        .and(v2Tuple, rightCollection)
-        .apply(CoGroupByKey.<K>create());
-
-    return coGbkResultCollection.apply(ParDo.of(
-      new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
-        @Override
-        public void processElement(ProcessContext c) {
-          KV<K, CoGbkResult> e = c.element();
-
-          Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
-          Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
-
-          for (V1 leftValue : leftValuesIterable) {
-            for (V2 rightValue : rightValuesIterable) {
-              c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
-            }
-          }
-        }
-      }))
-      .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(),
-                           KvCoder.of(((KvCoder) 
leftCollection.getCoder()).getValueCoder(),
-                                      ((KvCoder) 
rightCollection.getCoder()).getValueCoder())));
-  }
-
-  /**
-   * Left Outer Join of two collections of KV elements.
-   * @param leftCollection Left side collection to join.
-   * @param rightCollection Right side collection to join.
-   * @param nullValue Value to use as null value when right side do not match 
left side.
-   * @param <K> Type of the key for both collections
-   * @param <V1> Type of the values for the left collection.
-   * @param <V2> Type of the values for the right collection.
-   * @return A joined collection of KV where Key is the key and value is a
-   *         KV where Key is of type V1 and Value is type V2. Values that
-   *         should be null or empty is replaced with nullValue.
-   */
-  public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> leftOuterJoin(
-    final PCollection<KV<K, V1>> leftCollection,
-    final PCollection<KV<K, V2>> rightCollection,
-    final V2 nullValue) {
-    Preconditions.checkNotNull(leftCollection);
-    Preconditions.checkNotNull(rightCollection);
-    Preconditions.checkNotNull(nullValue);
-
-    final TupleTag<V1> v1Tuple = new TupleTag<>();
-    final TupleTag<V2> v2Tuple = new TupleTag<>();
-
-    PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
-      KeyedPCollectionTuple.of(v1Tuple, leftCollection)
-        .and(v2Tuple, rightCollection)
-        .apply(CoGroupByKey.<K>create());
-
-    return coGbkResultCollection.apply(ParDo.of(
-      new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
-        @Override
-        public void processElement(ProcessContext c) {
-          KV<K, CoGbkResult> e = c.element();
-
-          Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
-          Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
-
-          for (V1 leftValue : leftValuesIterable) {
-            if (rightValuesIterable.iterator().hasNext()) {
-              for (V2 rightValue : rightValuesIterable) {
-                c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
-              }
-            } else {
-              c.output(KV.of(e.getKey(), KV.of(leftValue, nullValue)));
-            }
-          }
-        }
-      }))
-      .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(),
-                           KvCoder.of(((KvCoder) 
leftCollection.getCoder()).getValueCoder(),
-                                      ((KvCoder) 
rightCollection.getCoder()).getValueCoder())));
-  }
-
-  /**
-   * Right Outer Join of two collections of KV elements.
-   * @param leftCollection Left side collection to join.
-   * @param rightCollection Right side collection to join.
-   * @param nullValue Value to use as null value when left side do not match 
right side.
-   * @param <K> Type of the key for both collections
-   * @param <V1> Type of the values for the left collection.
-   * @param <V2> Type of the values for the right collection.
-   * @return A joined collection of KV where Key is the key and value is a
-   *         KV where Key is of type V1 and Value is type V2. Keys that
-   *         should be null or empty is replaced with nullValue.
-   */
-  public static <K, V1, V2> PCollection<KV<K, KV<V1, V2>>> rightOuterJoin(
-    final PCollection<KV<K, V1>> leftCollection,
-    final PCollection<KV<K, V2>> rightCollection,
-    final V1 nullValue) {
-    Preconditions.checkNotNull(leftCollection);
-    Preconditions.checkNotNull(rightCollection);
-    Preconditions.checkNotNull(nullValue);
-
-    final TupleTag<V1> v1Tuple = new TupleTag<>();
-    final TupleTag<V2> v2Tuple = new TupleTag<>();
-
-    PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
-      KeyedPCollectionTuple.of(v1Tuple, leftCollection)
-        .and(v2Tuple, rightCollection)
-        .apply(CoGroupByKey.<K>create());
-
-    return coGbkResultCollection.apply(ParDo.of(
-      new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
-        @Override
-        public void processElement(ProcessContext c) {
-          KV<K, CoGbkResult> e = c.element();
-
-          Iterable<V1> leftValuesIterable = e.getValue().getAll(v1Tuple);
-          Iterable<V2> rightValuesIterable = e.getValue().getAll(v2Tuple);
-
-          for (V2 rightValue : rightValuesIterable) {
-            if (leftValuesIterable.iterator().hasNext()) {
-              for (V1 leftValue : leftValuesIterable) {
-                c.output(KV.of(e.getKey(), KV.of(leftValue, rightValue)));
-              }
-            } else {
-              c.output(KV.of(e.getKey(), KV.of(nullValue, rightValue)));
-            }
-          }
-        }
-      }))
-      .setCoder(KvCoder.of(((KvCoder) leftCollection.getCoder()).getKeyCoder(),
-                           KvCoder.of(((KvCoder) 
leftCollection.getCoder()).getValueCoder(),
-                                      ((KvCoder) 
rightCollection.getCoder()).getValueCoder())));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4059412/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java
 
b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java
new file mode 100644
index 0000000..6622fdc
--- /dev/null
+++ 
b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.joinlibrary;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This test Inner Join functionality.
+ */
+public class InnerJoinTest {
+
+  Pipeline p;
+  List<KV<String, Long>> leftListOfKv;
+  List<KV<String, String>> listRightOfKv;
+  List<KV<String, KV<Long, String>>> expectedResult;
+
+  @Before
+  public void setup() {
+
+    p = TestPipeline.create();
+    leftListOfKv = new ArrayList<>();
+    listRightOfKv = new ArrayList<>();
+
+    expectedResult = new ArrayList<>();
+  }
+
+  @Test
+  public void testJoinOneToOneMapping() {
+    leftListOfKv.add(KV.of("Key1", 5L));
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection =
+        p.apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key1", "foo"));
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection =
+        p.apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
+      leftCollection, rightCollection);
+
+    expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinOneToManyMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    listRightOfKv.add(KV.of("Key2", "gazonk"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
+      leftCollection, rightCollection);
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinManyToOneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    leftListOfKv.add(KV.of("Key2", 6L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
+      leftCollection, rightCollection);
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinNoneToNoneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key3", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
+      leftCollection, rightCollection);
+
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+    p.run();
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinLeftCollectionNull() {
+    Join.innerJoin(null, p.apply(Create.of(listRightOfKv)));
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinRightCollectionNull() {
+    Join.innerJoin(p.apply(Create.of(leftListOfKv)), null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4059412/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java
 
b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java
new file mode 100644
index 0000000..91b0740
--- /dev/null
+++ 
b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.joinlibrary;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * This test Outer Left Join functionality.
+ */
+public class OuterLeftJoinTest {
+
+  Pipeline p;
+  List<KV<String, Long>> leftListOfKv;
+  List<KV<String, String>> listRightOfKv;
+  List<KV<String, KV<Long, String>>> expectedResult;
+
+  @Before
+  public void setup() {
+
+    p = TestPipeline.create();
+    leftListOfKv = new ArrayList<>();
+    listRightOfKv = new ArrayList<>();
+
+    expectedResult = new ArrayList<>();
+  }
+
+  @Test
+  public void testJoinOneToOneMapping() {
+    leftListOfKv.add(KV.of("Key1", 5L));
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key1", "foo"));
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
+      leftCollection, rightCollection, "");
+
+    expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinOneToManyMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    listRightOfKv.add(KV.of("Key2", "gazonk"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
+      leftCollection, rightCollection, "");
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinManyToOneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    leftListOfKv.add(KV.of("Key2", 6L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
+      leftCollection, rightCollection, "");
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinOneToNoneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key3", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
+      leftCollection, rightCollection, "");
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+    p.run();
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinLeftCollectionNull() {
+    Join.leftOuterJoin(null, p.apply(Create.of(listRightOfKv)), "");
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinRightCollectionNull() {
+    Join.leftOuterJoin(p.apply(Create.of(leftListOfKv)), null, "");
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinNullValueIsNull() {
+    Join.leftOuterJoin(
+        p.apply("CreateLeft", Create.of(leftListOfKv)),
+        p.apply("CreateRight", Create.of(listRightOfKv)),
+        null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4059412/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterRightJoinTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterRightJoinTest.java
 
b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterRightJoinTest.java
new file mode 100644
index 0000000..7977df7
--- /dev/null
+++ 
b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterRightJoinTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.joinlibrary;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * This test Outer Right Join functionality.
+ */
+public class OuterRightJoinTest {
+
+  Pipeline p;
+  List<KV<String, Long>> leftListOfKv;
+  List<KV<String, String>> listRightOfKv;
+  List<KV<String, KV<Long, String>>> expectedResult;
+
+  @Before
+  public void setup() {
+
+    p = TestPipeline.create();
+    leftListOfKv = new ArrayList<>();
+    listRightOfKv = new ArrayList<>();
+
+    expectedResult = new ArrayList<>();
+  }
+
+  @Test
+  public void testJoinOneToOneMapping() {
+    leftListOfKv.add(KV.of("Key1", 5L));
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key1", "foo"));
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
+      leftCollection, rightCollection, -1L);
+
+    expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinOneToManyMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    listRightOfKv.add(KV.of("Key2", "gazonk"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
+      leftCollection, rightCollection, -1L);
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinManyToOneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    leftListOfKv.add(KV.of("Key2", 6L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
+      leftCollection, rightCollection, -1L);
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinNoneToOneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key3", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
+      leftCollection, rightCollection, -1L);
+
+    expectedResult.add(KV.of("Key3", KV.of(-1L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+    p.run();
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinLeftCollectionNull() {
+    Join.rightOuterJoin(null, p.apply(Create.of(listRightOfKv)), "");
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinRightCollectionNull() {
+    Join.rightOuterJoin(p.apply(Create.of(leftListOfKv)), null, -1L);
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinNullValueIsNull() {
+    Join.rightOuterJoin(
+        p.apply("CreateLeft", Create.of(leftListOfKv)),
+        p.apply("CreateRight", Create.of(listRightOfKv)),
+        null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4059412/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java
 
b/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java
deleted file mode 100644
index 99e9c4b..0000000
--- 
a/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.contrib.joinlibrary;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * This test Inner Join functionality.
- */
-public class InnerJoinTest {
-
-  Pipeline p;
-  List<KV<String, Long>> leftListOfKv;
-  List<KV<String, String>> listRightOfKv;
-  List<KV<String, KV<Long, String>>> expectedResult;
-
-  @Before
-  public void setup() {
-
-    p = TestPipeline.create();
-    leftListOfKv = new ArrayList<>();
-    listRightOfKv = new ArrayList<>();
-
-    expectedResult = new ArrayList<>();
-  }
-
-  @Test
-  public void testJoinOneToOneMapping() {
-    leftListOfKv.add(KV.of("Key1", 5L));
-    leftListOfKv.add(KV.of("Key2", 4L));
-    PCollection<KV<String, Long>> leftCollection =
-        p.apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key1", "foo"));
-    listRightOfKv.add(KV.of("Key2", "bar"));
-    PCollection<KV<String, String>> rightCollection =
-        p.apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
-      leftCollection, rightCollection);
-
-    expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
-    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-
-    p.run();
-  }
-
-  @Test
-  public void testJoinOneToManyMapping() {
-    leftListOfKv.add(KV.of("Key2", 4L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key2", "bar"));
-    listRightOfKv.add(KV.of("Key2", "gazonk"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
-      leftCollection, rightCollection);
-
-    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
-    expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-
-    p.run();
-  }
-
-  @Test
-  public void testJoinManyToOneMapping() {
-    leftListOfKv.add(KV.of("Key2", 4L));
-    leftListOfKv.add(KV.of("Key2", 6L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key2", "bar"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
-      leftCollection, rightCollection);
-
-    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
-    expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-
-    p.run();
-  }
-
-  @Test
-  public void testJoinNoneToNoneMapping() {
-    leftListOfKv.add(KV.of("Key2", 4L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key3", "bar"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
-      leftCollection, rightCollection);
-
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-    p.run();
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testJoinLeftCollectionNull() {
-    Join.innerJoin(null, p.apply(Create.of(listRightOfKv)));
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testJoinRightCollectionNull() {
-    Join.innerJoin(p.apply(Create.of(leftListOfKv)), null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4059412/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java
 
b/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java
deleted file mode 100644
index ca09136..0000000
--- 
a/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.contrib.joinlibrary;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- * This test Outer Left Join functionality.
- */
-public class OuterLeftJoinTest {
-
-  Pipeline p;
-  List<KV<String, Long>> leftListOfKv;
-  List<KV<String, String>> listRightOfKv;
-  List<KV<String, KV<Long, String>>> expectedResult;
-
-  @Before
-  public void setup() {
-
-    p = TestPipeline.create();
-    leftListOfKv = new ArrayList<>();
-    listRightOfKv = new ArrayList<>();
-
-    expectedResult = new ArrayList<>();
-  }
-
-  @Test
-  public void testJoinOneToOneMapping() {
-    leftListOfKv.add(KV.of("Key1", 5L));
-    leftListOfKv.add(KV.of("Key2", 4L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key1", "foo"));
-    listRightOfKv.add(KV.of("Key2", "bar"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
-      leftCollection, rightCollection, "");
-
-    expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
-    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-
-    p.run();
-  }
-
-  @Test
-  public void testJoinOneToManyMapping() {
-    leftListOfKv.add(KV.of("Key2", 4L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key2", "bar"));
-    listRightOfKv.add(KV.of("Key2", "gazonk"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
-      leftCollection, rightCollection, "");
-
-    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
-    expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-
-    p.run();
-  }
-
-  @Test
-  public void testJoinManyToOneMapping() {
-    leftListOfKv.add(KV.of("Key2", 4L));
-    leftListOfKv.add(KV.of("Key2", 6L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key2", "bar"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
-      leftCollection, rightCollection, "");
-
-    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
-    expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-
-    p.run();
-  }
-
-  @Test
-  public void testJoinOneToNoneMapping() {
-    leftListOfKv.add(KV.of("Key2", 4L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key3", "bar"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
-      leftCollection, rightCollection, "");
-
-    expectedResult.add(KV.of("Key2", KV.of(4L, "")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-    p.run();
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testJoinLeftCollectionNull() {
-    Join.leftOuterJoin(null, p.apply(Create.of(listRightOfKv)), "");
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testJoinRightCollectionNull() {
-    Join.leftOuterJoin(p.apply(Create.of(leftListOfKv)), null, "");
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testJoinNullValueIsNull() {
-    Join.leftOuterJoin(
-        p.apply("CreateLeft", Create.of(leftListOfKv)),
-        p.apply("CreateRight", Create.of(listRightOfKv)),
-        null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4059412/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java
 
b/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java
deleted file mode 100644
index 86028ac..0000000
--- 
a/sdks/java/extensions/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.contrib.joinlibrary;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- * This test Outer Right Join functionality.
- */
-public class OuterRightJoinTest {
-
-  Pipeline p;
-  List<KV<String, Long>> leftListOfKv;
-  List<KV<String, String>> listRightOfKv;
-  List<KV<String, KV<Long, String>>> expectedResult;
-
-  @Before
-  public void setup() {
-
-    p = TestPipeline.create();
-    leftListOfKv = new ArrayList<>();
-    listRightOfKv = new ArrayList<>();
-
-    expectedResult = new ArrayList<>();
-  }
-
-  @Test
-  public void testJoinOneToOneMapping() {
-    leftListOfKv.add(KV.of("Key1", 5L));
-    leftListOfKv.add(KV.of("Key2", 4L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key1", "foo"));
-    listRightOfKv.add(KV.of("Key2", "bar"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
-      leftCollection, rightCollection, -1L);
-
-    expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
-    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-
-    p.run();
-  }
-
-  @Test
-  public void testJoinOneToManyMapping() {
-    leftListOfKv.add(KV.of("Key2", 4L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key2", "bar"));
-    listRightOfKv.add(KV.of("Key2", "gazonk"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
-      leftCollection, rightCollection, -1L);
-
-    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
-    expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-
-    p.run();
-  }
-
-  @Test
-  public void testJoinManyToOneMapping() {
-    leftListOfKv.add(KV.of("Key2", 4L));
-    leftListOfKv.add(KV.of("Key2", 6L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key2", "bar"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
-      leftCollection, rightCollection, -1L);
-
-    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
-    expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-
-    p.run();
-  }
-
-  @Test
-  public void testJoinNoneToOneMapping() {
-    leftListOfKv.add(KV.of("Key2", 4L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key3", "bar"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
-      leftCollection, rightCollection, -1L);
-
-    expectedResult.add(KV.of("Key3", KV.of(-1L, "bar")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-    p.run();
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testJoinLeftCollectionNull() {
-    Join.rightOuterJoin(null, p.apply(Create.of(listRightOfKv)), "");
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testJoinRightCollectionNull() {
-    Join.rightOuterJoin(p.apply(Create.of(leftListOfKv)), null, -1L);
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testJoinNullValueIsNull() {
-    Join.rightOuterJoin(
-        p.apply("CreateLeft", Create.of(leftListOfKv)),
-        p.apply("CreateRight", Create.of(listRightOfKv)),
-        null);
-  }
-}

Reply via email to