http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/contrib/join-library/src/test/java/com/google/cloud/dataflow/contrib/joinlibrary/OuterLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/contrib/join-library/src/test/java/com/google/cloud/dataflow/contrib/joinlibrary/OuterLeftJoinTest.java b/contrib/join-library/src/test/java/com/google/cloud/dataflow/contrib/joinlibrary/OuterLeftJoinTest.java
deleted file mode 100644
index 35c8655..0000000
--- a/contrib/join-library/src/test/java/com/google/cloud/dataflow/contrib/joinlibrary/OuterLeftJoinTest.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.contrib.joinlibrary;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.testing.PAssert;
-import com.google.cloud.dataflow.sdk.testing.TestPipeline;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- * This tests the Outer Left Join functionality.
- */
-public class OuterLeftJoinTest {
-
-  Pipeline p;
-  List<KV<String, Long>> leftListOfKv;
-  List<KV<String, String>> listRightOfKv;
-  List<KV<String, KV<Long, String>>> expectedResult;
-
-  @Before
-  public void setup() {
-
-    p = TestPipeline.create();
-    leftListOfKv = new ArrayList<>();
-    listRightOfKv = new ArrayList<>();
-
-    expectedResult = new ArrayList<>();
-  }
-
-  @Test
-  public void testJoinOneToOneMapping() {
-    leftListOfKv.add(KV.of("Key1", 5L));
-    leftListOfKv.add(KV.of("Key2", 4L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key1", "foo"));
-    listRightOfKv.add(KV.of("Key2", "bar"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
-      leftCollection, rightCollection, "");
-
-    expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
-    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-
-    p.run();
-  }
-
-  @Test
-  public void testJoinOneToManyMapping() {
-    leftListOfKv.add(KV.of("Key2", 4L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key2", "bar"));
-    listRightOfKv.add(KV.of("Key2", "gazonk"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
-      leftCollection, rightCollection, "");
-
-    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
-    expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-
-    p.run();
-  }
-
-  @Test
-  public void testJoinManyToOneMapping() {
-    leftListOfKv.add(KV.of("Key2", 4L));
-    leftListOfKv.add(KV.of("Key2", 6L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key2", "bar"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
-      leftCollection, rightCollection, "");
-
-    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
-    expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-
-    p.run();
-  }
-
-  @Test
-  public void testJoinOneToNoneMapping() {
-    leftListOfKv.add(KV.of("Key2", 4L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key3", "bar"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
-      leftCollection, rightCollection, "");
-
-    expectedResult.add(KV.of("Key2", KV.of(4L, "")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-    p.run();
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testJoinLeftCollectionNull() {
-    Join.leftOuterJoin(null, p.apply(Create.of(listRightOfKv)), "");
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testJoinRightCollectionNull() {
-    Join.leftOuterJoin(p.apply(Create.of(leftListOfKv)), null, "");
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testJoinNullValueIsNull() {
-    Join.leftOuterJoin(
-        p.apply("CreateLeft", Create.of(leftListOfKv)),
-        p.apply("CreateRight", Create.of(listRightOfKv)),
-        null);
-  }
-}
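
A hedged sketch (not part of the commit) of the left outer join API the deleted tests above exercise, assembled from their own calls. The class name LeftOuterJoinSketch is illustrative, and the padding behavior noted in the comments is inferred from the test expectations, not read from the Join implementation.

    package com.google.cloud.dataflow.contrib.joinlibrary;

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.testing.PAssert;
    import com.google.cloud.dataflow.sdk.testing.TestPipeline;
    import com.google.cloud.dataflow.sdk.transforms.Create;
    import com.google.cloud.dataflow.sdk.values.KV;
    import com.google.cloud.dataflow.sdk.values.PCollection;

    public class LeftOuterJoinSketch {  // illustrative name
      public static void main(String[] args) {
        Pipeline p = TestPipeline.create();

        // One left row whose key has no match on the right.
        PCollection<KV<String, Long>> left =
            p.apply("CreateLeft", Create.of(KV.of("Key2", 4L)));
        PCollection<KV<String, String>> right =
            p.apply("CreateRight", Create.of(KV.of("Key3", "bar")));

        // The third argument pads unmatched left rows; per
        // testJoinOneToNoneMapping above, "Key2" pairs with the empty string.
        PCollection<KV<String, KV<Long, String>>> joined =
            Join.leftOuterJoin(left, right, "");

        PAssert.that(joined).containsInAnyOrder(KV.of("Key2", KV.of(4L, "")));
        p.run();
      }
    }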

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/contrib/join-library/src/test/java/com/google/cloud/dataflow/contrib/joinlibrary/OuterRightJoinTest.java
----------------------------------------------------------------------
diff --git a/contrib/join-library/src/test/java/com/google/cloud/dataflow/contrib/joinlibrary/OuterRightJoinTest.java b/contrib/join-library/src/test/java/com/google/cloud/dataflow/contrib/joinlibrary/OuterRightJoinTest.java
deleted file mode 100644
index 9cc6785..0000000
--- a/contrib/join-library/src/test/java/com/google/cloud/dataflow/contrib/joinlibrary/OuterRightJoinTest.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.contrib.joinlibrary;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.testing.PAssert;
-import com.google.cloud.dataflow.sdk.testing.TestPipeline;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- * This tests the Outer Right Join functionality.
- */
-public class OuterRightJoinTest {
-
-  Pipeline p;
-  List<KV<String, Long>> leftListOfKv;
-  List<KV<String, String>> listRightOfKv;
-  List<KV<String, KV<Long, String>>> expectedResult;
-
-  @Before
-  public void setup() {
-
-    p = TestPipeline.create();
-    leftListOfKv = new ArrayList<>();
-    listRightOfKv = new ArrayList<>();
-
-    expectedResult = new ArrayList<>();
-  }
-
-  @Test
-  public void testJoinOneToOneMapping() {
-    leftListOfKv.add(KV.of("Key1", 5L));
-    leftListOfKv.add(KV.of("Key2", 4L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key1", "foo"));
-    listRightOfKv.add(KV.of("Key2", "bar"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
-      leftCollection, rightCollection, -1L);
-
-    expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
-    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-
-    p.run();
-  }
-
-  @Test
-  public void testJoinOneToManyMapping() {
-    leftListOfKv.add(KV.of("Key2", 4L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key2", "bar"));
-    listRightOfKv.add(KV.of("Key2", "gazonk"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
-      leftCollection, rightCollection, -1L);
-
-    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
-    expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-
-    p.run();
-  }
-
-  @Test
-  public void testJoinManyToOneMapping() {
-    leftListOfKv.add(KV.of("Key2", 4L));
-    leftListOfKv.add(KV.of("Key2", 6L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key2", "bar"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
-      leftCollection, rightCollection, -1L);
-
-    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
-    expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-
-    p.run();
-  }
-
-  @Test
-  public void testJoinNoneToOneMapping() {
-    leftListOfKv.add(KV.of("Key2", 4L));
-    PCollection<KV<String, Long>> leftCollection = p
-        .apply("CreateLeft", Create.of(leftListOfKv));
-
-    listRightOfKv.add(KV.of("Key3", "bar"));
-    PCollection<KV<String, String>> rightCollection = p
-        .apply("CreateRight", Create.of(listRightOfKv));
-
-    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
-      leftCollection, rightCollection, -1L);
-
-    expectedResult.add(KV.of("Key3", KV.of(-1L, "bar")));
-    PAssert.that(output).containsInAnyOrder(expectedResult);
-    p.run();
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testJoinLeftCollectionNull() {
-    Join.rightOuterJoin(null, p.apply(Create.of(listRightOfKv)), "");
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testJoinRightCollectionNull() {
-    Join.rightOuterJoin(p.apply(Create.of(leftListOfKv)), null, -1L);
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testJoinNullValueIsNull() {
-    Join.rightOuterJoin(
-        p.apply("CreateLeft", Create.of(leftListOfKv)),
-        p.apply("CreateRight", Create.of(listRightOfKv)),
-        null);
-  }
-}
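
The mirror-image sketch for the right outer join deleted above, again illustrative rather than committed code; imports match the test file. Unmatched right rows surface with the caller-supplied value (-1L in these tests) in the left slot.

    Pipeline p = TestPipeline.create();

    PCollection<KV<String, Long>> left =
        p.apply("CreateLeft", Create.of(KV.of("Key2", 4L)));
    PCollection<KV<String, String>> right =
        p.apply("CreateRight", Create.of(KV.of("Key3", "bar")));

    // Per testJoinNoneToOneMapping above, the unmatched right row "Key3"
    // comes out padded with -1L on the left side.
    PCollection<KV<String, KV<Long, String>>> joined =
        Join.rightOuterJoin(left, right, -1L);

    PAssert.that(joined).containsInAnyOrder(KV.of("Key3", KV.of(-1L, "bar")));
    p.run();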

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java
----------------------------------------------------------------------
diff --git a/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java b/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java
new file mode 100644
index 0000000..c479a65
--- /dev/null
+++ b/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/InnerJoinTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.contrib.joinlibrary;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.testing.PAssert;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This tests the Inner Join functionality.
+ */
+public class InnerJoinTest {
+
+  Pipeline p;
+  List<KV<String, Long>> leftListOfKv;
+  List<KV<String, String>> listRightOfKv;
+  List<KV<String, KV<Long, String>>> expectedResult;
+
+  @Before
+  public void setup() {
+
+    p = TestPipeline.create();
+    leftListOfKv = new ArrayList<>();
+    listRightOfKv = new ArrayList<>();
+
+    expectedResult = new ArrayList<>();
+  }
+
+  @Test
+  public void testJoinOneToOneMapping() {
+    leftListOfKv.add(KV.of("Key1", 5L));
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection =
+        p.apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key1", "foo"));
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection =
+        p.apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
+      leftCollection, rightCollection);
+
+    expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinOneToManyMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    listRightOfKv.add(KV.of("Key2", "gazonk"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
+      leftCollection, rightCollection);
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinManyToOneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    leftListOfKv.add(KV.of("Key2", 6L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
+      leftCollection, rightCollection);
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinNoneToNoneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key3", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin(
+      leftCollection, rightCollection);
+
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+    p.run();
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinLeftCollectionNull() {
+    Join.innerJoin(null, p.apply(Create.of(listRightOfKv)));
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinRightCollectionNull() {
+    Join.innerJoin(p.apply(Create.of(leftListOfKv)), null);
+  }
+}
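
A matching sketch for the inner join test added above, using the same imports as the test file; only keys present on both sides are emitted, which is why testJoinNoneToNoneMapping expects empty output. Illustrative, not committed code.

    Pipeline p = TestPipeline.create();

    PCollection<KV<String, Long>> left =
        p.apply("CreateLeft", Create.of(KV.of("Key1", 5L), KV.of("Key2", 4L)));
    PCollection<KV<String, String>> right =
        p.apply("CreateRight", Create.of(KV.of("Key2", "bar"), KV.of("Key3", "baz")));

    // Only "Key2" appears on both sides, so it is the only key emitted.
    PCollection<KV<String, KV<Long, String>>> joined = Join.innerJoin(left, right);

    PAssert.that(joined).containsInAnyOrder(KV.of("Key2", KV.of(4L, "bar")));
    p.run();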

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java b/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java
new file mode 100644
index 0000000..35c8655
--- /dev/null
+++ b/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterLeftJoinTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.contrib.joinlibrary;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.testing.PAssert;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * This tests the Outer Left Join functionality.
+ */
+public class OuterLeftJoinTest {
+
+  Pipeline p;
+  List<KV<String, Long>> leftListOfKv;
+  List<KV<String, String>> listRightOfKv;
+  List<KV<String, KV<Long, String>>> expectedResult;
+
+  @Before
+  public void setup() {
+
+    p = TestPipeline.create();
+    leftListOfKv = new ArrayList<>();
+    listRightOfKv = new ArrayList<>();
+
+    expectedResult = new ArrayList<>();
+  }
+
+  @Test
+  public void testJoinOneToOneMapping() {
+    leftListOfKv.add(KV.of("Key1", 5L));
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key1", "foo"));
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
+      leftCollection, rightCollection, "");
+
+    expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinOneToManyMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    listRightOfKv.add(KV.of("Key2", "gazonk"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
+      leftCollection, rightCollection, "");
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinManyToOneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    leftListOfKv.add(KV.of("Key2", 6L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
+      leftCollection, rightCollection, "");
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinOneToNoneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key3", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.leftOuterJoin(
+      leftCollection, rightCollection, "");
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+    p.run();
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinLeftCollectionNull() {
+    Join.leftOuterJoin(null, p.apply(Create.of(listRightOfKv)), "");
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinRightCollectionNull() {
+    Join.leftOuterJoin(p.apply(Create.of(leftListOfKv)), null, "");
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinNullValueIsNull() {
+    Join.leftOuterJoin(
+        p.apply("CreateLeft", Create.of(leftListOfKv)),
+        p.apply("CreateRight", Create.of(listRightOfKv)),
+        null);
+  }
+}
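
Join helpers like these are commonly built on the SDK's CoGroupByKey; the sketch below shows how a left outer join with the output shape tested above could be expressed directly, with leftCollection and rightCollection as in the tests. This illustrates the general technique only and is an assumption, not the actual body of Join.leftOuterJoin.

    // Additional imports assumed: DoFn and ParDo from
    // com.google.cloud.dataflow.sdk.transforms; CoGbkResult, CoGroupByKey and
    // KeyedPCollectionTuple from com.google.cloud.dataflow.sdk.transforms.join;
    // TupleTag from com.google.cloud.dataflow.sdk.values.
    final TupleTag<Long> leftTag = new TupleTag<>();
    final TupleTag<String> rightTag = new TupleTag<>();

    PCollection<KV<String, CoGbkResult>> grouped =
        KeyedPCollectionTuple.of(leftTag, leftCollection)
            .and(rightTag, rightCollection)
            .apply(CoGroupByKey.<String>create());

    PCollection<KV<String, KV<Long, String>>> joined = grouped.apply(
        ParDo.of(new DoFn<KV<String, CoGbkResult>, KV<String, KV<Long, String>>>() {
          @Override
          public void processElement(ProcessContext c) {
            for (Long leftValue : c.element().getValue().getAll(leftTag)) {
              boolean matched = false;
              for (String rightValue : c.element().getValue().getAll(rightTag)) {
                matched = true;
                c.output(KV.of(c.element().getKey(), KV.of(leftValue, rightValue)));
              }
              if (!matched) {
                // Pad with the caller-supplied null value ("" in the tests above).
                c.output(KV.of(c.element().getKey(), KV.of(leftValue, "")));
              }
            }
          }
        }));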

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java
----------------------------------------------------------------------
diff --git a/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java b/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java
new file mode 100644
index 0000000..9cc6785
--- /dev/null
+++ b/contrib/join-library/src/test/java/org/apache/contrib/joinlibrary/OuterRightJoinTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.contrib.joinlibrary;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.testing.PAssert;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * This tests the Outer Right Join functionality.
+ */
+public class OuterRightJoinTest {
+
+  Pipeline p;
+  List<KV<String, Long>> leftListOfKv;
+  List<KV<String, String>> listRightOfKv;
+  List<KV<String, KV<Long, String>>> expectedResult;
+
+  @Before
+  public void setup() {
+
+    p = TestPipeline.create();
+    leftListOfKv = new ArrayList<>();
+    listRightOfKv = new ArrayList<>();
+
+    expectedResult = new ArrayList<>();
+  }
+
+  @Test
+  public void testJoinOneToOneMapping() {
+    leftListOfKv.add(KV.of("Key1", 5L));
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key1", "foo"));
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
+      leftCollection, rightCollection, -1L);
+
+    expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinOneToManyMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    listRightOfKv.add(KV.of("Key2", "gazonk"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
+      leftCollection, rightCollection, -1L);
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinManyToOneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    leftListOfKv.add(KV.of("Key2", 6L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key2", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
+      leftCollection, rightCollection, -1L);
+
+    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
+    expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+
+    p.run();
+  }
+
+  @Test
+  public void testJoinNoneToOneMapping() {
+    leftListOfKv.add(KV.of("Key2", 4L));
+    PCollection<KV<String, Long>> leftCollection = p
+        .apply("CreateLeft", Create.of(leftListOfKv));
+
+    listRightOfKv.add(KV.of("Key3", "bar"));
+    PCollection<KV<String, String>> rightCollection = p
+        .apply("CreateRight", Create.of(listRightOfKv));
+
+    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
+      leftCollection, rightCollection, -1L);
+
+    expectedResult.add(KV.of("Key3", KV.of(-1L, "bar")));
+    PAssert.that(output).containsInAnyOrder(expectedResult);
+    p.run();
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinLeftCollectionNull() {
+    Join.rightOuterJoin(null, p.apply(Create.of(listRightOfKv)), "");
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinRightCollectionNull() {
+    Join.rightOuterJoin(p.apply(Create.of(leftListOfKv)), null, -1L);
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testJoinNullValueIsNull() {
+    Join.rightOuterJoin(
+        p.apply("CreateLeft", Create.of(leftListOfKv)),
+        p.apply("CreateRight", Create.of(listRightOfKv)),
+        null);
+  }
+}
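
The three NullPointerException tests in each of these files pin down an argument contract; a guard along the following lines would satisfy it. Guava's Preconditions is an assumption here, and the committed Join implementation may check differently.

    // Sketch only; not the committed code. Assumes
    // com.google.common.base.Preconditions. For rightOuterJoin the null
    // value has the left value type V1, matching the -1L used above.
    static <K, V1, V2> void checkJoinArguments(
        PCollection<KV<K, V1>> leftCollection,
        PCollection<KV<K, V2>> rightCollection,
        V1 nullValue) {
      Preconditions.checkNotNull(leftCollection, "leftCollection must not be null");
      Preconditions.checkNotNull(rightCollection, "rightCollection must not be null");
      Preconditions.checkNotNull(nullValue, "nullValue must not be null");
    }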

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/com/google/cloud/dataflow/examples/DebuggingWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/DebuggingWordCount.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/DebuggingWordCount.java
deleted file mode 100644
index 7134bca..0000000
--- a/examples/java/src/main/java/com/google/cloud/dataflow/examples/DebuggingWordCount.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.examples;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.testing.PAssert;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.Sum;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.regex.Pattern;
-
-
-/**
- * An example that verifies word counts in Shakespeare and includes Dataflow best practices.
- *
- * <p>This class, {@link DebuggingWordCount}, is the third in a series of four successively more
- * detailed 'word count' examples. You may first want to take a look at {@link MinimalWordCount}
- * and {@link WordCount}. After you've looked at this example, then see the
- * {@link WindowedWordCount} pipeline, for introduction of additional concepts.
- *
- * <p>Basic concepts, also in the MinimalWordCount and WordCount examples:
- * Reading text files; counting a PCollection; executing a Pipeline both locally
- * and using the Dataflow service; defining DoFns.
- *
- * <p>New Concepts:
- * <pre>
- *   1. Logging to Cloud Logging
- *   2. Controlling Dataflow worker log levels
- *   3. Creating a custom aggregator
- *   4. Testing your Pipeline via PAssert
- * </pre>
- *
- * <p>To execute this pipeline locally, specify general pipeline configuration:
- * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- * }
- * </pre>
- *
- * <p>To execute this pipeline using the Dataflow service and the additional logging discussed
- * below, specify pipeline configuration:
- * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- *   --runner=BlockingDataflowPipelineRunner
- *   --workerLogLevelOverrides={"com.google.cloud.dataflow.examples":"DEBUG"}
- * }
- * </pre>
- *
- * <p>Note that when you run via <code>mvn exec</code>, you may need to escape
- * the quotations as appropriate for your shell. For example, in <code>bash</code>:
- * <pre>
- * mvn compile exec:java ... \
- *   -Dexec.args="... \
- *     --workerLogLevelOverrides={\\\"com.google.cloud.dataflow.examples\\\":\\\"DEBUG\\\"}"
- * </pre>
- *
- * <p>Concept #2: Dataflow workers which execute user code are configured to log to Cloud
- * Logging by default at "INFO" log level and higher. One may override log levels for specific
- * logging namespaces by specifying:
- * <pre><code>
- *   --workerLogLevelOverrides={"Name1":"Level1","Name2":"Level2",...}
- * </code></pre>
- * For example, by specifying:
- * <pre><code>
- *   --workerLogLevelOverrides={"com.google.cloud.dataflow.examples":"DEBUG"}
- * </code></pre>
- * when executing this pipeline using the Dataflow service, Cloud Logging would contain only
- * "DEBUG" or higher level logs for the {@code com.google.cloud.dataflow.examples} package in
- * addition to the default "INFO" or higher level logs. In addition, the default Dataflow worker
- * logging configuration can be overridden by specifying
- * {@code --defaultWorkerLogLevel=<one of TRACE, DEBUG, INFO, WARN, ERROR>}. For example,
- * by specifying {@code --defaultWorkerLogLevel=DEBUG} when executing this pipeline with
- * the Dataflow service, Cloud Logging would contain all "DEBUG" or higher level logs. Note
- * that changing the default worker log level to TRACE or DEBUG will significantly increase
- * the amount of logs output.
- *
- * <p>The input file defaults to {@code gs://dataflow-samples/shakespeare/kinglear.txt} and can be
- * overridden with {@code --inputFile}.
- */
-public class DebuggingWordCount {
-  /** A DoFn that filters for a specific key based upon a regular expression. */
-  public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
-    /**
-     * Concept #1: The logger below uses the fully qualified class name of FilterTextFn
-     * as the logger. All log statements emitted by this logger will be referenced by this name
-     * and will be visible in the Cloud Logging UI. Learn more at https://cloud.google.com/logging
-     * about the Cloud Logging UI.
-     */
-    private static final Logger LOG = LoggerFactory.getLogger(FilterTextFn.class);
-
-    private final Pattern filter;
-    public FilterTextFn(String pattern) {
-      filter = Pattern.compile(pattern);
-    }
-
-    /**
-     * Concept #3: A custom aggregator can track values in your pipeline as it runs. Those
-     * values will be displayed in the Dataflow Monitoring UI when this pipeline is run using the
-     * Dataflow service. These aggregators below track the number of matched and unmatched words.
-     * Learn more at https://cloud.google.com/dataflow/pipelines/dataflow-monitoring-intf about
-     * the Dataflow Monitoring UI.
-     */
-    private final Aggregator<Long, Long> matchedWords =
-        createAggregator("matchedWords", new Sum.SumLongFn());
-    private final Aggregator<Long, Long> unmatchedWords =
-        createAggregator("unmatchedWords", new Sum.SumLongFn());
-
-    @Override
-    public void processElement(ProcessContext c) {
-      if (filter.matcher(c.element().getKey()).matches()) {
-        // Log at the "DEBUG" level each element that we match. When executing this pipeline
-        // using the Dataflow service, these log lines will appear in the Cloud Logging UI
-        // only if the log level is set to "DEBUG" or lower.
-        LOG.debug("Matched: " + c.element().getKey());
-        matchedWords.addValue(1L);
-        c.output(c.element());
-      } else {
-        // Log at the "TRACE" level each element that is not matched. Different log levels
-        // can be used to control the verbosity of logging providing an effective mechanism
-        // to filter less important information.
-        LOG.trace("Did not match: " + c.element().getKey());
-        unmatchedWords.addValue(1L);
-      }
-    }
-  }
-
-  /**
-   * Options supported by {@link DebuggingWordCount}.
-   *
-   * <p>Inherits standard configuration options and all options defined in
-   * {@link WordCount.WordCountOptions}.
-   */
-  public static interface WordCountOptions extends WordCount.WordCountOptions {
-
-    @Description("Regex filter pattern to use in DebuggingWordCount. "
-        + "Only words matching this pattern will be counted.")
-    @Default.String("Flourish|stomach")
-    String getFilterPattern();
-    void setFilterPattern(String value);
-  }
-
-  public static void main(String[] args) {
-    WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
-      .as(WordCountOptions.class);
-    Pipeline p = Pipeline.create(options);
-
-    PCollection<KV<String, Long>> filteredWords =
-        p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile()))
-         .apply(new WordCount.CountWords())
-         .apply(ParDo.of(new FilterTextFn(options.getFilterPattern())));
-
-    /**
-     * Concept #4: PAssert is a set of convenient PTransforms in the style of
-     * Hamcrest's collection matchers that can be used when writing Pipeline level tests
-     * to validate the contents of PCollections. PAssert is best used in unit tests
-     * with small data sets but is demonstrated here as a teaching tool.
-     *
-     * <p>Below we verify that the set of filtered words matches our expected counts. Note
-     * that PAssert does not provide any output and that successful completion of the
-     * Pipeline implies that the expectations were met. Learn more at
-     * https://cloud.google.com/dataflow/pipelines/testing-your-pipeline on how to test
-     * your Pipeline and see {@link DebuggingWordCountTest} for an example unit test.
-     */
-    List<KV<String, Long>> expectedResults = Arrays.asList(
-        KV.of("Flourish", 3L),
-        KV.of("stomach", 1L));
-    PAssert.that(filteredWords).containsInAnyOrder(expectedResults);
-
-    p.run();
-  }
-}
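
Concept #4 above recommends pipeline-level unit tests; a minimal sketch of one against FilterTextFn follows. The inline input data and framing are illustrative (this is not the actual DebuggingWordCountTest); imports match the file above plus TestPipeline and Create.

    Pipeline p = TestPipeline.create();

    PCollection<KV<String, Long>> filtered = p
        .apply(Create.of(KV.of("Flourish", 3L), KV.of("banana", 2L)))
        .apply(ParDo.of(new DebuggingWordCount.FilterTextFn("Flourish|stomach")));

    // "banana" fails the filter pattern, so only "Flourish" should survive.
    PAssert.that(filtered).containsInAnyOrder(KV.of("Flourish", 3L));
    p.run();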

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/com/google/cloud/dataflow/examples/MinimalWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/MinimalWordCount.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/MinimalWordCount.java
deleted file mode 100644
index f3be5a3..0000000
--- a/examples/java/src/main/java/com/google/cloud/dataflow/examples/MinimalWordCount.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.examples;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.MapElements;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.SimpleFunction;
-import com.google.cloud.dataflow.sdk.values.KV;
-
-
-/**
- * An example that counts words in Shakespeare.
- *
- * <p>This class, {@link MinimalWordCount}, is the first in a series of four successively more
- * detailed 'word count' examples. Here, for simplicity, we don't show any error-checking or
- * argument processing, and focus on construction of the pipeline, which chains together the
- * application of core transforms.
- *
- * <p>Next, see the {@link WordCount} pipeline, then the {@link DebuggingWordCount}, and finally
- * the {@link WindowedWordCount} pipeline, for more detailed examples that introduce additional
- * concepts.
- *
- * <p>Concepts:
- * <pre>
- *   1. Reading data from text files
- *   2. Specifying 'inline' transforms
- *   3. Counting a PCollection
- *   4. Writing data to Cloud Storage as text files
- * </pre>
- *
- * <p>To execute this pipeline, first edit the code to set your project ID, the staging
- * location, and the output location. The specified GCS bucket(s) must already exist.
- *
- * <p>Then, run the pipeline as described in the README. It will be deployed and run using the
- * Dataflow service. No args are required to run the pipeline. You can see the results in your
- * output bucket in the GCS browser.
- */
-public class MinimalWordCount {
-
-  public static void main(String[] args) {
-    // Create a DataflowPipelineOptions object. This object lets us set various execution
-    // options for our pipeline, such as the associated Cloud Platform project and the location
-    // in Google Cloud Storage to stage files.
-    DataflowPipelineOptions options = PipelineOptionsFactory.create()
-      .as(DataflowPipelineOptions.class);
-    options.setRunner(BlockingDataflowPipelineRunner.class);
-    // CHANGE 1/3: Your project ID is required in order to run your pipeline on the Google Cloud.
-    options.setProject("SET_YOUR_PROJECT_ID_HERE");
-    // CHANGE 2/3: Your Google Cloud Storage path is required for staging local files.
-    options.setStagingLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_STAGING_DIRECTORY");
-
-    // Create the Pipeline object with the options we defined above.
-    Pipeline p = Pipeline.create(options);
-
-    // Apply the pipeline's transforms.
-
-    // Concept #1: Apply a root transform to the pipeline; in this case, TextIO.Read to read a set
-    // of input text files. TextIO.Read returns a PCollection where each element is one line from
-    // the input text (a set of Shakespeare's texts).
-    p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
-     // Concept #2: Apply a ParDo transform to our PCollection of text lines. This ParDo invokes a
-     // DoFn (defined in-line) on each element that tokenizes the text line into individual words.
-     // The ParDo returns a PCollection<String>, where each element is an individual word in
-     // Shakespeare's collected texts.
-     .apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() {
-                       @Override
-                       public void processElement(ProcessContext c) {
-                         for (String word : c.element().split("[^a-zA-Z']+")) {
-                           if (!word.isEmpty()) {
-                             c.output(word);
-                           }
-                         }
-                       }
-                     }))
-     // Concept #3: Apply the Count transform to our PCollection of individual words. The Count
-     // transform returns a new PCollection of key/value pairs, where each key represents a unique
-     // word in the text. The associated value is the occurrence count for that word.
-     .apply(Count.<String>perElement())
-     // Apply a MapElements transform that formats our PCollection of word counts into a printable
-     // string, suitable for writing to an output file.
-     .apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
-                       @Override
-                       public String apply(KV<String, Long> input) {
-                         return input.getKey() + ": " + input.getValue();
-                       }
-                     }))
-     // Concept #4: Apply a write transform, TextIO.Write, at the end of the pipeline.
-     // TextIO.Write writes the contents of a PCollection (in this case, our PCollection of
-     // formatted strings) to a series of text files in Google Cloud Storage.
-     // CHANGE 3/3: The Google Cloud Storage path where the results will be written.
-     .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
-
-    // Run the pipeline.
-    p.run();
-  }
-}
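
The three CHANGE comments above are the only service-specific pieces of this pipeline; as a sketch under that assumption, the same shape can be pointed at local files with the SDK's in-process DirectPipelineRunner. Paths are placeholders, the middle transforms are elided because they are identical to the ones above, and PipelineOptions plus DirectPipelineRunner are assumed imported from com.google.cloud.dataflow.sdk.options and com.google.cloud.dataflow.sdk.runners.

    // Local-execution sketch; not part of the commit.
    PipelineOptions options = PipelineOptionsFactory.create();
    options.setRunner(DirectPipelineRunner.class);  // in-process runner from the same SDK
    Pipeline p = Pipeline.create(options);

    p.apply(TextIO.Read.from("/tmp/kinglear.txt"))   // local input instead of GCS
     // ... same ExtractWords / Count / FormatResults transforms as above ...
     .apply(TextIO.Write.to("/tmp/wordcounts"));     // local output prefix
    p.run();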

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/com/google/cloud/dataflow/examples/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/WindowedWordCount.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/WindowedWordCount.java
deleted file mode 100644
index 8f49dd2..0000000
--- a/examples/java/src/main/java/com/google/cloud/dataflow/examples/WindowedWordCount.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.examples;
-
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.cloud.dataflow.examples.common.DataflowExampleOptions;
-import com.google.cloud.dataflow.examples.common.DataflowExampleUtils;
-import com.google.cloud.dataflow.examples.common.ExampleBigQueryTableOptions;
-import com.google.cloud.dataflow.examples.common.ExamplePubsubTopicOptions;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.PipelineResult;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO;
-import com.google.cloud.dataflow.sdk.io.PubsubIO;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- * An example that counts words in text, and can run over either unbounded or bounded input
- * collections.
- *
- * <p>This class, {@link WindowedWordCount}, is the last in a series of four successively more
- * detailed 'word count' examples. First take a look at {@link MinimalWordCount},
- * {@link WordCount}, and {@link DebuggingWordCount}.
- *
- * <p>Basic concepts, also in the MinimalWordCount, WordCount, and DebuggingWordCount examples:
- * Reading text files; counting a PCollection; writing to GCS; executing a Pipeline both locally
- * and using the Dataflow service; defining DoFns; creating a custom aggregator;
- * user-defined PTransforms; defining PipelineOptions.
- *
- * <p>New Concepts:
- * <pre>
- *   1. Unbounded and bounded pipeline input modes
- *   2. Adding timestamps to data
- *   3. PubSub topics as sources
- *   4. Windowing
- *   5. Re-using PTransforms over windowed PCollections
- *   6. Writing to BigQuery
- * </pre>
- *
- * <p>To execute this pipeline locally, specify general pipeline configuration:
- * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- * }
- * </pre>
- *
- * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
- * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- *   --runner=BlockingDataflowPipelineRunner
- * }
- * </pre>
- *
- * <p>Optionally specify the input file path via:
- * {@code --inputFile=gs://INPUT_PATH},
- * which defaults to {@code gs://dataflow-samples/shakespeare/kinglear.txt}.
- *
- * <p>Specify an output BigQuery dataset and optionally, a table for the output. If you don't
- * specify the table, one will be created for you using the job name. If you don't specify the
- * dataset, a dataset called {@code dataflow-examples} must already exist in your project.
- * {@code --bigQueryDataset=YOUR-DATASET --bigQueryTable=YOUR-NEW-TABLE-NAME}.
- *
- * <p>Decide whether you want your pipeline to run with 'bounded' (such as files in GCS) or
- * 'unbounded' input (such as a PubSub topic). To run with unbounded input, set
- * {@code --unbounded=true}. Then, optionally specify the Google Cloud PubSub topic to read from
- * via {@code --pubsubTopic=projects/PROJECT_ID/topics/YOUR_TOPIC_NAME}. If the topic does not
- * exist, the pipeline will create one for you. It will delete this topic when it terminates.
- * The pipeline will automatically launch an auxiliary batch pipeline to populate the given PubSub
- * topic with the contents of the {@code --inputFile}, in order to make the example easy to run.
- * If you want to use an independently-populated PubSub topic, indicate this by setting
- * {@code --inputFile=""}. In that case, the auxiliary pipeline will not be started.
- *
- * <p>By default, the pipeline will do fixed windowing, on 1-minute windows.  You can
- * change this interval by setting the {@code --windowSize} parameter, e.g. {@code --windowSize=10}
- * for 10-minute windows.
- */
-public class WindowedWordCount {
-    private static final Logger LOG = LoggerFactory.getLogger(WindowedWordCount.class);
-    static final int WINDOW_SIZE = 1;  // Default window duration in minutes
-
-  /**
-   * Concept #2: A DoFn that sets the data element timestamp. This is a silly method, just for
-   * this example, for the bounded data case.
-   *
-   * <p>Imagine that many ghosts of Shakespeare are all typing madly at the same time to recreate
-   * his masterworks. Each line of the corpus will get a random associated timestamp somewhere in a
-   * 2-hour period.
-   */
-  static class AddTimestampFn extends DoFn<String, String> {
-    private static final long RAND_RANGE = 7200000; // 2 hours in ms
-
-    @Override
-    public void processElement(ProcessContext c) {
-      // Generate a timestamp that falls somewhere in the past two hours.
-      long randomTimestamp = System.currentTimeMillis()
-        - (int) (Math.random() * RAND_RANGE);
-      /**
-       * Concept #2: Set the data element with that timestamp.
-       */
-      c.outputWithTimestamp(c.element(), new Instant(randomTimestamp));
-    }
-  }
-
-  /** A DoFn that converts a Word and Count into a BigQuery table row. */
-  static class FormatAsTableRowFn extends DoFn<KV<String, Long>, TableRow> {
-    @Override
-    public void processElement(ProcessContext c) {
-      TableRow row = new TableRow()
-          .set("word", c.element().getKey())
-          .set("count", c.element().getValue())
-          // include a field for the window timestamp
-         .set("window_timestamp", c.timestamp().toString());
-      c.output(row);
-    }
-  }
-
-  /**
-   * Helper method that defines the BigQuery schema used for the output.
-   */
-  private static TableSchema getSchema() {
-    List<TableFieldSchema> fields = new ArrayList<>();
-    fields.add(new TableFieldSchema().setName("word").setType("STRING"));
-    fields.add(new TableFieldSchema().setName("count").setType("INTEGER"));
-    fields.add(new TableFieldSchema().setName("window_timestamp").setType("TIMESTAMP"));
-    TableSchema schema = new TableSchema().setFields(fields);
-    return schema;
-  }
-
-  /**
-   * Concept #6: We'll stream the results to a BigQuery table. The BigQuery output source is one
-   * that supports both bounded and unbounded data. This is a helper method that creates a
-   * TableReference from input options, to tell the pipeline where to write its BigQuery results.
-   */
-  private static TableReference getTableReference(Options options) {
-    TableReference tableRef = new TableReference();
-    tableRef.setProjectId(options.getProject());
-    tableRef.setDatasetId(options.getBigQueryDataset());
-    tableRef.setTableId(options.getBigQueryTable());
-    return tableRef;
-  }
-
-  /**
-   * Options supported by {@link WindowedWordCount}.
-   *
-   * <p>Inherits standard example configuration options, which allow specification of the BigQuery
-   * table and the PubSub topic, as well as the {@link WordCount.WordCountOptions} support for
-   * specification of the input file.
-   */
-  public static interface Options extends WordCount.WordCountOptions,
-      DataflowExampleOptions, ExamplePubsubTopicOptions, ExampleBigQueryTableOptions {
-    @Description("Fixed window duration, in minutes")
-    @Default.Integer(WINDOW_SIZE)
-    Integer getWindowSize();
-    void setWindowSize(Integer value);
-
-    @Description("Whether to run the pipeline with unbounded input")
-    boolean isUnbounded();
-    void setUnbounded(boolean value);
-  }
-
-  public static void main(String[] args) throws IOException {
-    Options options = 
PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-    options.setBigQuerySchema(getSchema());
-    // DataflowExampleUtils creates the necessary input sources to simplify 
execution of this
-    // Pipeline.
-    DataflowExampleUtils exampleDataflowUtils = new 
DataflowExampleUtils(options,
-      options.isUnbounded());
-
-    Pipeline pipeline = Pipeline.create(options);
-
-    /**
-     * Concept #1: the Dataflow SDK lets us run the same pipeline with either 
a bounded or
-     * unbounded input source.
-     */
-    PCollection<String> input;
-    if (options.isUnbounded()) {
-      LOG.info("Reading from PubSub.");
-      /**
-       * Concept #3: Read from the PubSub topic. A topic will be created if it 
wasn't
-       * specified as an argument. The data elements' timestamps will come 
from the pubsub
-       * injection.
-       */
-      input = pipeline
-          .apply(PubsubIO.Read.topic(options.getPubsubTopic()));
-    } else {
-      /** Else, this is a bounded pipeline. Read from the GCS file. */
-      input = pipeline
-          .apply(TextIO.Read.from(options.getInputFile()))
-          // Concept #2: Add an element timestamp, using an artificial time 
just to show windowing.
-          // See AddTimestampFn for more detail on this.
-          .apply(ParDo.of(new AddTimestampFn()));
-    }
-
-    /**
-     * Concept #4: Window into fixed windows. The fixed window size for this 
example defaults to 1
-     * minute (you can change this with a command-line option). See the 
documentation for more
-     * information on how fixed windows work, and for information on the other 
types of windowing
-     * available (e.g., sliding windows).
-     */
-    PCollection<String> windowedWords = input
-      .apply(Window.<String>into(
-        FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))));
-
-    /**
-     * Concept #5: Re-use our existing CountWords transform that does not have knowledge of
-     * windows over a PCollection containing windowed values.
-     */
-    PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new WordCount.CountWords());
-
-    /**
-     * Concept #6: Format the results for a BigQuery table, then write to BigQuery.
-     * The BigQuery output source supports both bounded and unbounded data.
-     */
-    wordCounts.apply(ParDo.of(new FormatAsTableRowFn()))
-        .apply(BigQueryIO.Write
-          .to(getTableReference(options))
-          .withSchema(getSchema())
-          .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
-          .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
-
-    PipelineResult result = pipeline.run();
-
-    /**
-     * To mock unbounded input from PubSub, we'll now start an auxiliary 'injector' pipeline that
-     * runs for a limited time, and publishes to the input PubSub topic.
-     *
-     * With an unbounded input source, you will need to explicitly shut down this pipeline when you
-     * are done with it, so that you do not continue to be charged for the instances. You can do
-     * this via a ctrl-C from the command line, or from the developer's console UI for Dataflow
-     * pipelines. The PubSub topic will also be deleted at this time.
-     */
-    exampleDataflowUtils.mockUnboundedSource(options.getInputFile(), result);
-  }
-}
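
For reference, the windowing-plus-BigQuery pattern that the deleted WindowedWordCount demonstrated reduces to the following minimal sketch. It assumes the same com.google.cloud.dataflow.sdk classes used above and reuses the CountWords transform from the WordCount example later in this commit; the project, dataset, and table identifiers are placeholders, and element timestamps are left at the source defaults where the original attached artificial ones via AddTimestampFn.

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.examples.WordCount;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;

import org.joda.time.Duration;

import java.util.Arrays;

public class WindowedWriteSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Fixed one-minute windows over a bounded text source (Concepts #2 and #4 above).
    PCollection<String> windowed = p
        .apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/kinglear.txt"))
        .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));

    // Reuse CountWords, then format each pair as a BigQuery row (Concepts #5 and #6).
    PCollection<TableRow> rows = windowed
        .apply(new WordCount.CountWords())
        .apply(ParDo.of(new DoFn<KV<String, Long>, TableRow>() {
          @Override
          public void processElement(ProcessContext c) {
            c.output(new TableRow()
                .set("word", c.element().getKey())
                .set("count", c.element().getValue()));
          }
        }));

    // Schema and table reference mirror the getSchema/getTableReference helpers deleted above.
    TableSchema schema = new TableSchema().setFields(Arrays.asList(
        new TableFieldSchema().setName("word").setType("STRING"),
        new TableFieldSchema().setName("count").setType("INTEGER")));
    TableReference table = new TableReference()
        .setProjectId("my-project")         // placeholder
        .setDatasetId("my_dataset")         // placeholder
        .setTableId("windowed_wordcount");  // placeholder
    rows.apply(BigQueryIO.Write
        .to(table)
        .withSchema(schema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    p.run();
  }
}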

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/com/google/cloud/dataflow/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/WordCount.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/WordCount.java
deleted file mode 100644
index ad08d13..0000000
--- a/examples/java/src/main/java/com/google/cloud/dataflow/examples/WordCount.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.examples;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.io.TextIO;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.transforms.Aggregator;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.MapElements;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.SimpleFunction;
-import com.google.cloud.dataflow.sdk.transforms.Sum;
-import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-
-/**
- * An example that counts words in Shakespeare and includes Dataflow best practices.
- *
- * <p>This class, {@link WordCount}, is the second in a series of four successively more detailed
- * 'word count' examples. You may first want to take a look at {@link MinimalWordCount}.
- * After you've looked at this example, then see the {@link DebuggingWordCount}
- * pipeline, for introduction of additional concepts.
- *
- * <p>For a detailed walkthrough of this example, see
- *   <a href="https://cloud.google.com/dataflow/java-sdk/wordcount-example";>
- *   https://cloud.google.com/dataflow/java-sdk/wordcount-example
- *   </a>
- *
- * <p>Basic concepts, also in the MinimalWordCount example:
- * Reading text files; counting a PCollection; writing to GCS.
- *
- * <p>New Concepts:
- * <pre>
- *   1. Executing a Pipeline both locally and using the Dataflow service
- *   2. Using ParDo with static DoFns defined out-of-line
- *   3. Building a composite transform
- *   4. Defining your own pipeline options
- * </pre>
- *
- * <p>Concept #1: you can execute this pipeline either locally or using the Dataflow service.
- * These are now command-line options and not hard-coded as they were in the MinimalWordCount
- * example.
- * To execute this pipeline locally, specify general pipeline configuration:
- * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- * }
- * </pre>
- * and a local output file or output prefix on GCS:
- * <pre>{@code
- *   --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
- * }</pre>
- *
- * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration:
- * <pre>{@code
- *   --project=YOUR_PROJECT_ID
- *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- *   --runner=BlockingDataflowPipelineRunner
- * }
- * </pre>
- * and an output prefix on GCS:
- * <pre>{@code
- *   --output=gs://YOUR_OUTPUT_PREFIX
- * }</pre>
- *
- * <p>The input file defaults to {@code gs://dataflow-samples/shakespeare/kinglear.txt} and can be
- * overridden with {@code --inputFile}.
- */
-public class WordCount {
-
-  /**
-   * Concept #2: You can make your pipeline code less verbose by defining your DoFns statically out-
-   * of-line. This DoFn tokenizes lines of text into individual words; we pass it to a ParDo in the
-   * pipeline.
-   */
-  static class ExtractWordsFn extends DoFn<String, String> {
-    private final Aggregator<Long, Long> emptyLines =
-        createAggregator("emptyLines", new Sum.SumLongFn());
-
-    @Override
-    public void processElement(ProcessContext c) {
-      if (c.element().trim().isEmpty()) {
-        emptyLines.addValue(1L);
-      }
-
-      // Split the line into words.
-      String[] words = c.element().split("[^a-zA-Z']+");
-
-      // Output each word encountered into the output PCollection.
-      for (String word : words) {
-        if (!word.isEmpty()) {
-          c.output(word);
-        }
-      }
-    }
-  }
-
-  /** A SimpleFunction that converts a Word and Count into a printable string. */
-  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
-    @Override
-    public String apply(KV<String, Long> input) {
-      return input.getKey() + ": " + input.getValue();
-    }
-  }
-
-  /**
-   * A PTransform that converts a PCollection containing lines of text into a PCollection of
-   * formatted word counts.
-   *
-   * <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
-   * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
-   * modular testing, and an improved monitoring experience.
-   */
-  public static class CountWords extends PTransform<PCollection<String>,
-      PCollection<KV<String, Long>>> {
-    @Override
-    public PCollection<KV<String, Long>> apply(PCollection<String> lines) {
-
-      // Convert lines of text into individual words.
-      PCollection<String> words = lines.apply(
-          ParDo.of(new ExtractWordsFn()));
-
-      // Count the number of times each word occurs.
-      PCollection<KV<String, Long>> wordCounts =
-          words.apply(Count.<String>perElement());
-
-      return wordCounts;
-    }
-  }
-
-  /**
-   * Options supported by {@link WordCount}.
-   *
-   * <p>Concept #4: Defining your own configuration options. Here, you can add your own arguments
-   * to be processed by the command-line parser, and specify default values for them. You can then
-   * access the options values in your pipeline code.
-   *
-   * <p>Inherits standard configuration options.
-   */
-  public static interface WordCountOptions extends PipelineOptions {
-    @Description("Path of the file to read from")
-    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
-    String getInputFile();
-    void setInputFile(String value);
-
-    @Description("Path of the file to write to")
-    @Default.InstanceFactory(OutputFactory.class)
-    String getOutput();
-    void setOutput(String value);
-
-    /**
-     * Returns "gs://${YOUR_STAGING_DIRECTORY}/counts.txt" as the default 
destination.
-     */
-    public static class OutputFactory implements DefaultValueFactory<String> {
-      @Override
-      public String create(PipelineOptions options) {
-        DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
-        if (dataflowOptions.getStagingLocation() != null) {
-          return GcsPath.fromUri(dataflowOptions.getStagingLocation())
-              .resolve("counts.txt").toString();
-        } else {
-          throw new IllegalArgumentException("Must specify --output or --stagingLocation");
-        }
-      }
-    }
-
-  }
-
-  public static void main(String[] args) {
-    WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
-      .as(WordCountOptions.class);
-    Pipeline p = Pipeline.create(options);
-
-    // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the
-    // static FormatAsTextFn() to the ParDo transform.
-    p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile()))
-     .apply(new CountWords())
-     .apply(MapElements.via(new FormatAsTextFn()))
-     .apply(TextIO.Write.named("WriteCounts").to(options.getOutput()));
-
-    p.run();
-  }
-}
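
Because CountWords is assembled from an out-of-line DoFn (Concept #2), the tokenizer can be exercised in isolation. Below is a minimal sketch, assuming it sits in the same package as WordCount (ExtractWordsFn is package-private) and that the SDK's DoFnTester helper is available; ExtractWordsFnSketch is a hypothetical class name, not part of this commit.

package com.google.cloud.dataflow.examples;

import com.google.cloud.dataflow.sdk.transforms.DoFnTester;

import java.util.List;

public class ExtractWordsFnSketch {
  public static void main(String[] args) throws Exception {
    // Feed elements straight through the DoFn without constructing a pipeline.
    DoFnTester<String, String> tester = DoFnTester.of(new WordCount.ExtractWordsFn());
    List<String> words = tester.processBatch("hi there", "", "bob's");
    // Expected: [hi, there, bob's]; the empty line only increments the emptyLines aggregator.
    System.out.println(words);
  }
}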

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleOptions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleOptions.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleOptions.java
deleted file mode 100644
index 557e903..0000000
--- a/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleOptions.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.examples.common;
-
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-
-/**
- * Options that can be used to configure the Dataflow examples.
- */
-public interface DataflowExampleOptions extends DataflowPipelineOptions {
-  @Description("Whether to keep jobs running on the Dataflow service after 
local process exit")
-  @Default.Boolean(false)
-  boolean getKeepJobsRunning();
-  void setKeepJobsRunning(boolean keepJobsRunning);
-
-  @Description("Number of workers to use when executing the injector pipeline")
-  @Default.Integer(1)
-  int getInjectorNumWorkers();
-  void setInjectorNumWorkers(int numWorkers);
-}
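
Options interfaces like this are materialized through PipelineOptionsFactory, the same way the example mains above do it. A minimal sketch of reading them follows; OptionsSketch is a hypothetical class name, not part of this commit.

package com.google.cloud.dataflow.examples.common;

import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

public class OptionsSketch {
  public static void main(String[] args) {
    // Flags such as --keepJobsRunning=true map onto the getters by JavaBean naming;
    // absent flags fall back to the @Default values declared on the interface.
    DataflowExampleOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(DataflowExampleOptions.class);
    System.out.println("keepJobsRunning=" + options.getKeepJobsRunning());
    System.out.println("injectorNumWorkers=" + options.getInjectorNumWorkers());
  }
}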
