/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.google.cloud.dataflow.contrib.joinlibrary;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.testing.PAssert;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;

import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;


/**
 * Tests the left outer join functionality of the join library.
 *
 * <p>Each test builds a left {@code KV<String, Long>} collection and a right
 * {@code KV<String, String>} collection, joins them with
 * {@link Join#leftOuterJoin}, and verifies the joined output. The empty
 * string is the placeholder substituted when a left key has no right match
 * (see {@code testJoinOneToNoneMapping}).
 */
public class OuterLeftJoinTest {

  Pipeline p;
  List<KV<String, Long>> leftListOfKv;
  List<KV<String, String>> listRightOfKv;
  List<KV<String, KV<Long, String>>> expectedResult;

  @Before
  public void setup() {
    // Fresh pipeline and empty input/expectation lists for every test.
    p = TestPipeline.create();
    leftListOfKv = new ArrayList<>();
    listRightOfKv = new ArrayList<>();
    expectedResult = new ArrayList<>();
  }

  @Test
  public void testJoinOneToOneMapping() {
    // Every left key has exactly one right match.
    leftListOfKv.add(KV.of("Key1", 5L));
    leftListOfKv.add(KV.of("Key2", 4L));
    PCollection<KV<String, Long>> left = p.apply("CreateLeft", Create.of(leftListOfKv));

    listRightOfKv.add(KV.of("Key1", "foo"));
    listRightOfKv.add(KV.of("Key2", "bar"));
    PCollection<KV<String, String>> right = p.apply("CreateRight", Create.of(listRightOfKv));

    PCollection<KV<String, KV<Long, String>>> joined = Join.leftOuterJoin(left, right, "");

    expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
    PAssert.that(joined).containsInAnyOrder(expectedResult);

    p.run();
  }

  @Test
  public void testJoinOneToManyMapping() {
    // A single left row fans out across two right rows for the same key.
    leftListOfKv.add(KV.of("Key2", 4L));
    PCollection<KV<String, Long>> left = p.apply("CreateLeft", Create.of(leftListOfKv));

    listRightOfKv.add(KV.of("Key2", "bar"));
    listRightOfKv.add(KV.of("Key2", "gazonk"));
    PCollection<KV<String, String>> right = p.apply("CreateRight", Create.of(listRightOfKv));

    PCollection<KV<String, KV<Long, String>>> joined = Join.leftOuterJoin(left, right, "");

    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
    expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
    PAssert.that(joined).containsInAnyOrder(expectedResult);

    p.run();
  }

  @Test
  public void testJoinManyToOneMapping() {
    // Two left rows share a key that has a single right match.
    leftListOfKv.add(KV.of("Key2", 4L));
    leftListOfKv.add(KV.of("Key2", 6L));
    PCollection<KV<String, Long>> left = p.apply("CreateLeft", Create.of(leftListOfKv));

    listRightOfKv.add(KV.of("Key2", "bar"));
    PCollection<KV<String, String>> right = p.apply("CreateRight", Create.of(listRightOfKv));

    PCollection<KV<String, KV<Long, String>>> joined = Join.leftOuterJoin(left, right, "");

    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
    expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
    PAssert.that(joined).containsInAnyOrder(expectedResult);

    p.run();
  }

  @Test
  public void testJoinOneToNoneMapping() {
    // Unmatched left key: the supplied null value "" stands in on the right.
    leftListOfKv.add(KV.of("Key2", 4L));
    PCollection<KV<String, Long>> left = p.apply("CreateLeft", Create.of(leftListOfKv));

    listRightOfKv.add(KV.of("Key3", "bar"));
    PCollection<KV<String, String>> right = p.apply("CreateRight", Create.of(listRightOfKv));

    PCollection<KV<String, KV<Long, String>>> joined = Join.leftOuterJoin(left, right, "");

    expectedResult.add(KV.of("Key2", KV.of(4L, "")));
    PAssert.that(joined).containsInAnyOrder(expectedResult);
    p.run();
  }

  @Test(expected = NullPointerException.class)
  public void testJoinLeftCollectionNull() {
    Join.leftOuterJoin(null, p.apply(Create.of(listRightOfKv)), "");
  }

  @Test(expected = NullPointerException.class)
  public void testJoinRightCollectionNull() {
    Join.leftOuterJoin(p.apply(Create.of(leftListOfKv)), null, "");
  }

  @Test(expected = NullPointerException.class)
  public void testJoinNullValueIsNull() {
    Join.leftOuterJoin(
        p.apply("CreateLeft", Create.of(leftListOfKv)),
        p.apply("CreateRight", Create.of(listRightOfKv)),
        null);
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.google.cloud.dataflow.contrib.joinlibrary;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.testing.PAssert;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;

import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;


/**
 * Tests the right outer join functionality of the join library.
 *
 * <p>The null value passed to {@link Join#rightOuterJoin} substitutes for the
 * LEFT side when a right key has no left match, so it has the left value type
 * ({@code Long}); {@code -1L} is used throughout (see
 * {@code testJoinNoneToOneMapping}).
 */
public class OuterRightJoinTest {

  Pipeline p;
  List<KV<String, Long>> leftListOfKv;
  List<KV<String, String>> listRightOfKv;
  List<KV<String, KV<Long, String>>> expectedResult;

  @Before
  public void setup() {

    p = TestPipeline.create();
    leftListOfKv = new ArrayList<>();
    listRightOfKv = new ArrayList<>();

    expectedResult = new ArrayList<>();
  }

  @Test
  public void testJoinOneToOneMapping() {
    leftListOfKv.add(KV.of("Key1", 5L));
    leftListOfKv.add(KV.of("Key2", 4L));
    PCollection<KV<String, Long>> leftCollection = p
        .apply("CreateLeft", Create.of(leftListOfKv));

    listRightOfKv.add(KV.of("Key1", "foo"));
    listRightOfKv.add(KV.of("Key2", "bar"));
    PCollection<KV<String, String>> rightCollection = p
        .apply("CreateRight", Create.of(listRightOfKv));

    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
        leftCollection, rightCollection, -1L);

    expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
    PAssert.that(output).containsInAnyOrder(expectedResult);

    p.run();
  }

  @Test
  public void testJoinOneToManyMapping() {
    leftListOfKv.add(KV.of("Key2", 4L));
    PCollection<KV<String, Long>> leftCollection = p
        .apply("CreateLeft", Create.of(leftListOfKv));

    listRightOfKv.add(KV.of("Key2", "bar"));
    listRightOfKv.add(KV.of("Key2", "gazonk"));
    PCollection<KV<String, String>> rightCollection = p
        .apply("CreateRight", Create.of(listRightOfKv));

    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
        leftCollection, rightCollection, -1L);

    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
    expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
    PAssert.that(output).containsInAnyOrder(expectedResult);

    p.run();
  }

  @Test
  public void testJoinManyToOneMapping() {
    leftListOfKv.add(KV.of("Key2", 4L));
    leftListOfKv.add(KV.of("Key2", 6L));
    PCollection<KV<String, Long>> leftCollection = p
        .apply("CreateLeft", Create.of(leftListOfKv));

    listRightOfKv.add(KV.of("Key2", "bar"));
    PCollection<KV<String, String>> rightCollection = p
        .apply("CreateRight", Create.of(listRightOfKv));

    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
        leftCollection, rightCollection, -1L);

    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
    expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
    PAssert.that(output).containsInAnyOrder(expectedResult);

    p.run();
  }

  @Test
  public void testJoinNoneToOneMapping() {
    // Unmatched right key: the null value -1L stands in for the missing left.
    leftListOfKv.add(KV.of("Key2", 4L));
    PCollection<KV<String, Long>> leftCollection = p
        .apply("CreateLeft", Create.of(leftListOfKv));

    listRightOfKv.add(KV.of("Key3", "bar"));
    PCollection<KV<String, String>> rightCollection = p
        .apply("CreateRight", Create.of(listRightOfKv));

    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
        leftCollection, rightCollection, -1L);

    expectedResult.add(KV.of("Key3", KV.of(-1L, "bar")));
    PAssert.that(output).containsInAnyOrder(expectedResult);
    p.run();
  }

  @Test(expected = NullPointerException.class)
  public void testJoinLeftCollectionNull() {
    // Fixed: the null value of a right outer join is of the LEFT value type
    // (Long), so use -1L as everywhere else in this class, not "".
    Join.rightOuterJoin(null, p.apply(Create.of(listRightOfKv)), -1L);
  }

  @Test(expected = NullPointerException.class)
  public void testJoinRightCollectionNull() {
    Join.rightOuterJoin(p.apply(Create.of(leftListOfKv)), null, -1L);
  }

  @Test(expected = NullPointerException.class)
  public void testJoinNullValueIsNull() {
    Join.rightOuterJoin(
        p.apply("CreateLeft", Create.of(leftListOfKv)),
        p.apply("CreateRight", Create.of(listRightOfKv)),
        null);
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.google.cloud.dataflow.contrib.joinlibrary;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.testing.PAssert;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;

import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

/**
 * Tests the inner join functionality of the join library.
 *
 * <p>Each test joins a left {@code KV<String, Long>} collection with a right
 * {@code KV<String, String>} collection via {@link Join#innerJoin} and checks
 * the resulting key/value pairs; keys without a match on both sides are
 * dropped (see {@code testJoinNoneToNoneMapping}).
 */
public class InnerJoinTest {

  Pipeline p;
  List<KV<String, Long>> leftListOfKv;
  List<KV<String, String>> listRightOfKv;
  List<KV<String, KV<Long, String>>> expectedResult;

  @Before
  public void setup() {
    // Fresh pipeline and empty input/expectation lists for every test.
    p = TestPipeline.create();
    leftListOfKv = new ArrayList<>();
    listRightOfKv = new ArrayList<>();
    expectedResult = new ArrayList<>();
  }

  @Test
  public void testJoinOneToOneMapping() {
    // Every key appears once on each side.
    leftListOfKv.add(KV.of("Key1", 5L));
    leftListOfKv.add(KV.of("Key2", 4L));
    PCollection<KV<String, Long>> left = p.apply("CreateLeft", Create.of(leftListOfKv));

    listRightOfKv.add(KV.of("Key1", "foo"));
    listRightOfKv.add(KV.of("Key2", "bar"));
    PCollection<KV<String, String>> right = p.apply("CreateRight", Create.of(listRightOfKv));

    PCollection<KV<String, KV<Long, String>>> joined = Join.innerJoin(left, right);

    expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
    PAssert.that(joined).containsInAnyOrder(expectedResult);

    p.run();
  }

  @Test
  public void testJoinOneToManyMapping() {
    // One left row joined against two right rows with the same key.
    leftListOfKv.add(KV.of("Key2", 4L));
    PCollection<KV<String, Long>> left = p.apply("CreateLeft", Create.of(leftListOfKv));

    listRightOfKv.add(KV.of("Key2", "bar"));
    listRightOfKv.add(KV.of("Key2", "gazonk"));
    PCollection<KV<String, String>> right = p.apply("CreateRight", Create.of(listRightOfKv));

    PCollection<KV<String, KV<Long, String>>> joined = Join.innerJoin(left, right);

    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
    expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
    PAssert.that(joined).containsInAnyOrder(expectedResult);

    p.run();
  }

  @Test
  public void testJoinManyToOneMapping() {
    // Two left rows joined against a single right row with the same key.
    leftListOfKv.add(KV.of("Key2", 4L));
    leftListOfKv.add(KV.of("Key2", 6L));
    PCollection<KV<String, Long>> left = p.apply("CreateLeft", Create.of(leftListOfKv));

    listRightOfKv.add(KV.of("Key2", "bar"));
    PCollection<KV<String, String>> right = p.apply("CreateRight", Create.of(listRightOfKv));

    PCollection<KV<String, KV<Long, String>>> joined = Join.innerJoin(left, right);

    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
    expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
    PAssert.that(joined).containsInAnyOrder(expectedResult);

    p.run();
  }

  @Test
  public void testJoinNoneToNoneMapping() {
    // No common keys: the inner join yields nothing (expectedResult is empty).
    leftListOfKv.add(KV.of("Key2", 4L));
    PCollection<KV<String, Long>> left = p.apply("CreateLeft", Create.of(leftListOfKv));

    listRightOfKv.add(KV.of("Key3", "bar"));
    PCollection<KV<String, String>> right = p.apply("CreateRight", Create.of(listRightOfKv));

    PCollection<KV<String, KV<Long, String>>> joined = Join.innerJoin(left, right);

    PAssert.that(joined).containsInAnyOrder(expectedResult);
    p.run();
  }

  @Test(expected = NullPointerException.class)
  public void testJoinLeftCollectionNull() {
    Join.innerJoin(null, p.apply(Create.of(listRightOfKv)));
  }

  @Test(expected = NullPointerException.class)
  public void testJoinRightCollectionNull() {
    Join.innerJoin(p.apply(Create.of(leftListOfKv)), null);
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.google.cloud.dataflow.contrib.joinlibrary;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.testing.PAssert;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;

import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;


/**
 * Tests the left outer join functionality of the join library.
 *
 * <p>Each test joins a left {@code KV<String, Long>} collection with a right
 * {@code KV<String, String>} collection via {@link Join#leftOuterJoin} and
 * verifies the output. The empty string is the value substituted when a left
 * key has no right-side match (see {@code testJoinOneToNoneMapping}).
 */
public class OuterLeftJoinTest {

  Pipeline p;
  List<KV<String, Long>> leftListOfKv;
  List<KV<String, String>> listRightOfKv;
  List<KV<String, KV<Long, String>>> expectedResult;

  @Before
  public void setup() {
    // Each test starts from a fresh pipeline and empty lists.
    p = TestPipeline.create();
    leftListOfKv = new ArrayList<>();
    listRightOfKv = new ArrayList<>();
    expectedResult = new ArrayList<>();
  }

  @Test
  public void testJoinOneToOneMapping() {
    // Each left key matches exactly one right row.
    leftListOfKv.add(KV.of("Key1", 5L));
    leftListOfKv.add(KV.of("Key2", 4L));
    PCollection<KV<String, Long>> left = p.apply("CreateLeft", Create.of(leftListOfKv));

    listRightOfKv.add(KV.of("Key1", "foo"));
    listRightOfKv.add(KV.of("Key2", "bar"));
    PCollection<KV<String, String>> right = p.apply("CreateRight", Create.of(listRightOfKv));

    PCollection<KV<String, KV<Long, String>>> joined = Join.leftOuterJoin(left, right, "");

    expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
    PAssert.that(joined).containsInAnyOrder(expectedResult);

    p.run();
  }

  @Test
  public void testJoinOneToManyMapping() {
    // One left row matched by two right rows with the same key.
    leftListOfKv.add(KV.of("Key2", 4L));
    PCollection<KV<String, Long>> left = p.apply("CreateLeft", Create.of(leftListOfKv));

    listRightOfKv.add(KV.of("Key2", "bar"));
    listRightOfKv.add(KV.of("Key2", "gazonk"));
    PCollection<KV<String, String>> right = p.apply("CreateRight", Create.of(listRightOfKv));

    PCollection<KV<String, KV<Long, String>>> joined = Join.leftOuterJoin(left, right, "");

    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
    expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
    PAssert.that(joined).containsInAnyOrder(expectedResult);

    p.run();
  }

  @Test
  public void testJoinManyToOneMapping() {
    // Two left rows share a key matched by a single right row.
    leftListOfKv.add(KV.of("Key2", 4L));
    leftListOfKv.add(KV.of("Key2", 6L));
    PCollection<KV<String, Long>> left = p.apply("CreateLeft", Create.of(leftListOfKv));

    listRightOfKv.add(KV.of("Key2", "bar"));
    PCollection<KV<String, String>> right = p.apply("CreateRight", Create.of(listRightOfKv));

    PCollection<KV<String, KV<Long, String>>> joined = Join.leftOuterJoin(left, right, "");

    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
    expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
    PAssert.that(joined).containsInAnyOrder(expectedResult);

    p.run();
  }

  @Test
  public void testJoinOneToNoneMapping() {
    // Left key with no right match: "" substitutes for the missing right value.
    leftListOfKv.add(KV.of("Key2", 4L));
    PCollection<KV<String, Long>> left = p.apply("CreateLeft", Create.of(leftListOfKv));

    listRightOfKv.add(KV.of("Key3", "bar"));
    PCollection<KV<String, String>> right = p.apply("CreateRight", Create.of(listRightOfKv));

    PCollection<KV<String, KV<Long, String>>> joined = Join.leftOuterJoin(left, right, "");

    expectedResult.add(KV.of("Key2", KV.of(4L, "")));
    PAssert.that(joined).containsInAnyOrder(expectedResult);
    p.run();
  }

  @Test(expected = NullPointerException.class)
  public void testJoinLeftCollectionNull() {
    Join.leftOuterJoin(null, p.apply(Create.of(listRightOfKv)), "");
  }

  @Test(expected = NullPointerException.class)
  public void testJoinRightCollectionNull() {
    Join.leftOuterJoin(p.apply(Create.of(leftListOfKv)), null, "");
  }

  @Test(expected = NullPointerException.class)
  public void testJoinNullValueIsNull() {
    Join.leftOuterJoin(
        p.apply("CreateLeft", Create.of(leftListOfKv)),
        p.apply("CreateRight", Create.of(listRightOfKv)),
        null);
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.google.cloud.dataflow.contrib.joinlibrary;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.testing.PAssert;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;

import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;


/**
 * Tests the right outer join functionality of the join library.
 *
 * <p>The null value passed to {@link Join#rightOuterJoin} substitutes for the
 * LEFT side when a right key has no left match, so it has the left value type
 * ({@code Long}); {@code -1L} is used throughout (see
 * {@code testJoinNoneToOneMapping}).
 */
public class OuterRightJoinTest {

  Pipeline p;
  List<KV<String, Long>> leftListOfKv;
  List<KV<String, String>> listRightOfKv;
  List<KV<String, KV<Long, String>>> expectedResult;

  @Before
  public void setup() {

    p = TestPipeline.create();
    leftListOfKv = new ArrayList<>();
    listRightOfKv = new ArrayList<>();

    expectedResult = new ArrayList<>();
  }

  @Test
  public void testJoinOneToOneMapping() {
    leftListOfKv.add(KV.of("Key1", 5L));
    leftListOfKv.add(KV.of("Key2", 4L));
    PCollection<KV<String, Long>> leftCollection = p
        .apply("CreateLeft", Create.of(leftListOfKv));

    listRightOfKv.add(KV.of("Key1", "foo"));
    listRightOfKv.add(KV.of("Key2", "bar"));
    PCollection<KV<String, String>> rightCollection = p
        .apply("CreateRight", Create.of(listRightOfKv));

    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
        leftCollection, rightCollection, -1L);

    expectedResult.add(KV.of("Key1", KV.of(5L, "foo")));
    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
    PAssert.that(output).containsInAnyOrder(expectedResult);

    p.run();
  }

  @Test
  public void testJoinOneToManyMapping() {
    leftListOfKv.add(KV.of("Key2", 4L));
    PCollection<KV<String, Long>> leftCollection = p
        .apply("CreateLeft", Create.of(leftListOfKv));

    listRightOfKv.add(KV.of("Key2", "bar"));
    listRightOfKv.add(KV.of("Key2", "gazonk"));
    PCollection<KV<String, String>> rightCollection = p
        .apply("CreateRight", Create.of(listRightOfKv));

    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
        leftCollection, rightCollection, -1L);

    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
    expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk")));
    PAssert.that(output).containsInAnyOrder(expectedResult);

    p.run();
  }

  @Test
  public void testJoinManyToOneMapping() {
    leftListOfKv.add(KV.of("Key2", 4L));
    leftListOfKv.add(KV.of("Key2", 6L));
    PCollection<KV<String, Long>> leftCollection = p
        .apply("CreateLeft", Create.of(leftListOfKv));

    listRightOfKv.add(KV.of("Key2", "bar"));
    PCollection<KV<String, String>> rightCollection = p
        .apply("CreateRight", Create.of(listRightOfKv));

    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
        leftCollection, rightCollection, -1L);

    expectedResult.add(KV.of("Key2", KV.of(4L, "bar")));
    expectedResult.add(KV.of("Key2", KV.of(6L, "bar")));
    PAssert.that(output).containsInAnyOrder(expectedResult);

    p.run();
  }

  @Test
  public void testJoinNoneToOneMapping() {
    // Unmatched right key: the null value -1L stands in for the missing left.
    leftListOfKv.add(KV.of("Key2", 4L));
    PCollection<KV<String, Long>> leftCollection = p
        .apply("CreateLeft", Create.of(leftListOfKv));

    listRightOfKv.add(KV.of("Key3", "bar"));
    PCollection<KV<String, String>> rightCollection = p
        .apply("CreateRight", Create.of(listRightOfKv));

    PCollection<KV<String, KV<Long, String>>> output = Join.rightOuterJoin(
        leftCollection, rightCollection, -1L);

    expectedResult.add(KV.of("Key3", KV.of(-1L, "bar")));
    PAssert.that(output).containsInAnyOrder(expectedResult);
    p.run();
  }

  @Test(expected = NullPointerException.class)
  public void testJoinLeftCollectionNull() {
    // Fixed: the null value of a right outer join is of the LEFT value type
    // (Long), so use -1L as everywhere else in this class, not "".
    Join.rightOuterJoin(null, p.apply(Create.of(listRightOfKv)), -1L);
  }

  @Test(expected = NullPointerException.class)
  public void testJoinRightCollectionNull() {
    Join.rightOuterJoin(p.apply(Create.of(leftListOfKv)), null, -1L);
  }

  @Test(expected = NullPointerException.class)
  public void testJoinNullValueIsNull() {
    Join.rightOuterJoin(
        p.apply("CreateLeft", Create.of(leftListOfKv)),
        p.apply("CreateRight", Create.of(listRightOfKv)),
        null);
  }
}
b/examples/java/src/main/java/com/google/cloud/dataflow/examples/DebuggingWordCount.java deleted file mode 100644 index 7134bca..0000000 --- a/examples/java/src/main/java/com/google/cloud/dataflow/examples/DebuggingWordCount.java +++ /dev/null @@ -1,199 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.google.cloud.dataflow.examples; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.options.Default; -import com.google.cloud.dataflow.sdk.options.Description; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.testing.PAssert; -import com.google.cloud.dataflow.sdk.transforms.Aggregator; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.Sum; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Arrays; -import java.util.List; -import java.util.regex.Pattern; - - -/** - * An example that verifies word counts in Shakespeare and includes Dataflow best practices. - * - * <p>This class, {@link DebuggingWordCount}, is the third in a series of four successively more - * detailed 'word count' examples. You may first want to take a look at {@link MinimalWordCount} - * and {@link WordCount}. After you've looked at this example, then see the - * {@link WindowedWordCount} pipeline, for introduction of additional concepts. - * - * <p>Basic concepts, also in the MinimalWordCount and WordCount examples: - * Reading text files; counting a PCollection; executing a Pipeline both locally - * and using the Dataflow service; defining DoFns. - * - * <p>New Concepts: - * <pre> - * 1. Logging to Cloud Logging - * 2. Controlling Dataflow worker log levels - * 3. Creating a custom aggregator - * 4. 
Testing your Pipeline via PAssert - * </pre> - * - * <p>To execute this pipeline locally, specify general pipeline configuration: - * <pre>{@code - * --project=YOUR_PROJECT_ID - * } - * </pre> - * - * <p>To execute this pipeline using the Dataflow service and the additional logging discussed - * below, specify pipeline configuration: - * <pre>{@code - * --project=YOUR_PROJECT_ID - * --stagingLocation=gs://YOUR_STAGING_DIRECTORY - * --runner=BlockingDataflowPipelineRunner - * --workerLogLevelOverrides={"com.google.cloud.dataflow.examples":"DEBUG"} - * } - * </pre> - * - * <p>Note that when you run via <code>mvn exec</code>, you may need to escape - * the quotations as appropriate for your shell. For example, in <code>bash</code>: - * <pre> - * mvn compile exec:java ... \ - * -Dexec.args="... \ - * --workerLogLevelOverrides={\\\"com.google.cloud.dataflow.examples\\\":\\\"DEBUG\\\"}" - * </pre> - * - * <p>Concept #2: Dataflow workers which execute user code are configured to log to Cloud - * Logging by default at "INFO" log level and higher. One may override log levels for specific - * logging namespaces by specifying: - * <pre><code> - * --workerLogLevelOverrides={"Name1":"Level1","Name2":"Level2",...} - * </code></pre> - * For example, by specifying: - * <pre><code> - * --workerLogLevelOverrides={"com.google.cloud.dataflow.examples":"DEBUG"} - * </code></pre> - * when executing this pipeline using the Dataflow service, Cloud Logging would contain only - * "DEBUG" or higher level logs for the {@code com.google.cloud.dataflow.examples} package in - * addition to the default "INFO" or higher level logs. In addition, the default Dataflow worker - * logging configuration can be overridden by specifying - * {@code --defaultWorkerLogLevel=<one of TRACE, DEBUG, INFO, WARN, ERROR>}. For example, - * by specifying {@code --defaultWorkerLogLevel=DEBUG} when executing this pipeline with - * the Dataflow service, Cloud Logging would contain all "DEBUG" or higher level logs. 
Note - * that changing the default worker log level to TRACE or DEBUG will significantly increase - * the amount of logs output. - * - * <p>The input file defaults to {@code gs://dataflow-samples/shakespeare/kinglear.txt} and can be - * overridden with {@code --inputFile}. - */ -public class DebuggingWordCount { - /** A DoFn that filters for a specific key based upon a regular expression. */ - public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> { - /** - * Concept #1: The logger below uses the fully qualified class name of FilterTextFn - * as the logger. All log statements emitted by this logger will be referenced by this name - * and will be visible in the Cloud Logging UI. Learn more at https://cloud.google.com/logging - * about the Cloud Logging UI. - */ - private static final Logger LOG = LoggerFactory.getLogger(FilterTextFn.class); - - private final Pattern filter; - public FilterTextFn(String pattern) { - filter = Pattern.compile(pattern); - } - - /** - * Concept #3: A custom aggregator can track values in your pipeline as it runs. Those - * values will be displayed in the Dataflow Monitoring UI when this pipeline is run using the - * Dataflow service. These aggregators below track the number of matched and unmatched words. - * Learn more at https://cloud.google.com/dataflow/pipelines/dataflow-monitoring-intf about - * the Dataflow Monitoring UI. - */ - private final Aggregator<Long, Long> matchedWords = - createAggregator("matchedWords", new Sum.SumLongFn()); - private final Aggregator<Long, Long> unmatchedWords = - createAggregator("umatchedWords", new Sum.SumLongFn()); - - @Override - public void processElement(ProcessContext c) { - if (filter.matcher(c.element().getKey()).matches()) { - // Log at the "DEBUG" level each element that we match. When executing this pipeline - // using the Dataflow service, these log lines will appear in the Cloud Logging UI - // only if the log level is set to "DEBUG" or lower. 
- LOG.debug("Matched: " + c.element().getKey()); - matchedWords.addValue(1L); - c.output(c.element()); - } else { - // Log at the "TRACE" level each element that is not matched. Different log levels - // can be used to control the verbosity of logging providing an effective mechanism - // to filter less important information. - LOG.trace("Did not match: " + c.element().getKey()); - unmatchedWords.addValue(1L); - } - } - } - - /** - * Options supported by {@link DebuggingWordCount}. - * - * <p>Inherits standard configuration options and all options defined in - * {@link WordCount.WordCountOptions}. - */ - public static interface WordCountOptions extends WordCount.WordCountOptions { - - @Description("Regex filter pattern to use in DebuggingWordCount. " - + "Only words matching this pattern will be counted.") - @Default.String("Flourish|stomach") - String getFilterPattern(); - void setFilterPattern(String value); - } - - public static void main(String[] args) { - WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() - .as(WordCountOptions.class); - Pipeline p = Pipeline.create(options); - - PCollection<KV<String, Long>> filteredWords = - p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile())) - .apply(new WordCount.CountWords()) - .apply(ParDo.of(new FilterTextFn(options.getFilterPattern()))); - - /** - * Concept #4: PAssert is a set of convenient PTransforms in the style of - * Hamcrest's collection matchers that can be used when writing Pipeline level tests - * to validate the contents of PCollections. PAssert is best used in unit tests - * with small data sets but is demonstrated here as a teaching tool. - * - * <p>Below we verify that the set of filtered words matches our expected counts. Note - * that PAssert does not provide any output and that successful completion of the - * Pipeline implies that the expectations were met. 
Learn more at - * https://cloud.google.com/dataflow/pipelines/testing-your-pipeline on how to test - * your Pipeline and see {@link DebuggingWordCountTest} for an example unit test. - */ - List<KV<String, Long>> expectedResults = Arrays.asList( - KV.of("Flourish", 3L), - KV.of("stomach", 1L)); - PAssert.that(filteredWords).containsInAnyOrder(expectedResults); - - p.run(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/com/google/cloud/dataflow/examples/MinimalWordCount.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/MinimalWordCount.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/MinimalWordCount.java deleted file mode 100644 index f3be5a3..0000000 --- a/examples/java/src/main/java/com/google/cloud/dataflow/examples/MinimalWordCount.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.google.cloud.dataflow.examples; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner; -import com.google.cloud.dataflow.sdk.transforms.Count; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.MapElements; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.SimpleFunction; -import com.google.cloud.dataflow.sdk.values.KV; - - -/** - * An example that counts words in Shakespeare. - * - * <p>This class, {@link MinimalWordCount}, is the first in a series of four successively more - * detailed 'word count' examples. Here, for simplicity, we don't show any error-checking or - * argument processing, and focus on construction of the pipeline, which chains together the - * application of core transforms. - * - * <p>Next, see the {@link WordCount} pipeline, then the {@link DebuggingWordCount}, and finally - * the {@link WindowedWordCount} pipeline, for more detailed examples that introduce additional - * concepts. - * - * <p>Concepts: - * <pre> - * 1. Reading data from text files - * 2. Specifying 'inline' transforms - * 3. Counting a PCollection - * 4. Writing data to Cloud Storage as text files - * </pre> - * - * <p>To execute this pipeline, first edit the code to set your project ID, the staging - * location, and the output location. The specified GCS bucket(s) must already exist. - * - * <p>Then, run the pipeline as described in the README. It will be deployed and run using the - * Dataflow service. No args are required to run the pipeline. You can see the results in your - * output bucket in the GCS browser. 
- */ -public class MinimalWordCount { - - public static void main(String[] args) { - // Create a DataflowPipelineOptions object. This object lets us set various execution - // options for our pipeline, such as the associated Cloud Platform project and the location - // in Google Cloud Storage to stage files. - DataflowPipelineOptions options = PipelineOptionsFactory.create() - .as(DataflowPipelineOptions.class); - options.setRunner(BlockingDataflowPipelineRunner.class); - // CHANGE 1/3: Your project ID is required in order to run your pipeline on the Google Cloud. - options.setProject("SET_YOUR_PROJECT_ID_HERE"); - // CHANGE 2/3: Your Google Cloud Storage path is required for staging local files. - options.setStagingLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_STAGING_DIRECTORY"); - - // Create the Pipeline object with the options we defined above. - Pipeline p = Pipeline.create(options); - - // Apply the pipeline's transforms. - - // Concept #1: Apply a root transform to the pipeline; in this case, TextIO.Read to read a set - // of input text files. TextIO.Read returns a PCollection where each element is one line from - // the input text (a set of Shakespeare's texts). - p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*")) - // Concept #2: Apply a ParDo transform to our PCollection of text lines. This ParDo invokes a - // DoFn (defined in-line) on each element that tokenizes the text line into individual words. - // The ParDo returns a PCollection<String>, where each element is an individual word in - // Shakespeare's collected texts. - .apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() { - @Override - public void processElement(ProcessContext c) { - for (String word : c.element().split("[^a-zA-Z']+")) { - if (!word.isEmpty()) { - c.output(word); - } - } - } - })) - // Concept #3: Apply the Count transform to our PCollection of individual words. 
The Count - // transform returns a new PCollection of key/value pairs, where each key represents a unique - // word in the text. The associated value is the occurrence count for that word. - .apply(Count.<String>perElement()) - // Apply a MapElements transform that formats our PCollection of word counts into a printable - // string, suitable for writing to an output file. - .apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Long>, String>() { - @Override - public String apply(KV<String, Long> input) { - return input.getKey() + ": " + input.getValue(); - } - })) - // Concept #4: Apply a write transform, TextIO.Write, at the end of the pipeline. - // TextIO.Write writes the contents of a PCollection (in this case, our PCollection of - // formatted strings) to a series of text files in Google Cloud Storage. - // CHANGE 3/3: The Google Cloud Storage path is required for outputting the results to. - .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX")); - - // Run the pipeline. - p.run(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/com/google/cloud/dataflow/examples/WindowedWordCount.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/WindowedWordCount.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/WindowedWordCount.java deleted file mode 100644 index 8f49dd2..0000000 --- a/examples/java/src/main/java/com/google/cloud/dataflow/examples/WindowedWordCount.java +++ /dev/null @@ -1,270 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.examples; - -import com.google.api.services.bigquery.model.TableFieldSchema; -import com.google.api.services.bigquery.model.TableReference; -import com.google.api.services.bigquery.model.TableRow; -import com.google.api.services.bigquery.model.TableSchema; -import com.google.cloud.dataflow.examples.common.DataflowExampleOptions; -import com.google.cloud.dataflow.examples.common.DataflowExampleUtils; -import com.google.cloud.dataflow.examples.common.ExampleBigQueryTableOptions; -import com.google.cloud.dataflow.examples.common.ExamplePubsubTopicOptions; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.PipelineResult; -import com.google.cloud.dataflow.sdk.io.BigQueryIO; -import com.google.cloud.dataflow.sdk.io.PubsubIO; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.options.Default; -import com.google.cloud.dataflow.sdk.options.Description; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; -import com.google.cloud.dataflow.sdk.transforms.windowing.Window; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; - -import 
org.joda.time.Duration; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - - -/** - * An example that counts words in text, and can run over either unbounded or bounded input - * collections. - * - * <p>This class, {@link WindowedWordCount}, is the last in a series of four successively more - * detailed 'word count' examples. First take a look at {@link MinimalWordCount}, - * {@link WordCount}, and {@link DebuggingWordCount}. - * - * <p>Basic concepts, also in the MinimalWordCount, WordCount, and DebuggingWordCount examples: - * Reading text files; counting a PCollection; writing to GCS; executing a Pipeline both locally - * and using the Dataflow service; defining DoFns; creating a custom aggregator; - * user-defined PTransforms; defining PipelineOptions. - * - * <p>New Concepts: - * <pre> - * 1. Unbounded and bounded pipeline input modes - * 2. Adding timestamps to data - * 3. PubSub topics as sources - * 4. Windowing - * 5. Re-using PTransforms over windowed PCollections - * 6. Writing to BigQuery - * </pre> - * - * <p>To execute this pipeline locally, specify general pipeline configuration: - * <pre>{@code - * --project=YOUR_PROJECT_ID - * } - * </pre> - * - * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration: - * <pre>{@code - * --project=YOUR_PROJECT_ID - * --stagingLocation=gs://YOUR_STAGING_DIRECTORY - * --runner=BlockingDataflowPipelineRunner - * } - * </pre> - * - * <p>Optionally specify the input file path via: - * {@code --inputFile=gs://INPUT_PATH}, - * which defaults to {@code gs://dataflow-samples/shakespeare/kinglear.txt}. - * - * <p>Specify an output BigQuery dataset and optionally, a table for the output. If you don't - * specify the table, one will be created for you using the job name. 
If you don't specify the - * dataset, a dataset called {@code dataflow-examples} must already exist in your project. - * {@code --bigQueryDataset=YOUR-DATASET --bigQueryTable=YOUR-NEW-TABLE-NAME}. - * - * <p>Decide whether you want your pipeline to run with 'bounded' (such as files in GCS) or - * 'unbounded' input (such as a PubSub topic). To run with unbounded input, set - * {@code --unbounded=true}. Then, optionally specify the Google Cloud PubSub topic to read from - * via {@code --pubsubTopic=projects/PROJECT_ID/topics/YOUR_TOPIC_NAME}. If the topic does not - * exist, the pipeline will create one for you. It will delete this topic when it terminates. - * The pipeline will automatically launch an auxiliary batch pipeline to populate the given PubSub - * topic with the contents of the {@code --inputFile}, in order to make the example easy to run. - * If you want to use an independently-populated PubSub topic, indicate this by setting - * {@code --inputFile=""}. In that case, the auxiliary pipeline will not be started. - * - * <p>By default, the pipeline will do fixed windowing, on 1-minute windows. You can - * change this interval by setting the {@code --windowSize} parameter, e.g. {@code --windowSize=10} - * for 10-minute windows. - */ -public class WindowedWordCount { - private static final Logger LOG = LoggerFactory.getLogger(WindowedWordCount.class); - static final int WINDOW_SIZE = 1; // Default window duration in minutes - - /** - * Concept #2: A DoFn that sets the data element timestamp. This is a silly method, just for - * this example, for the bounded data case. - * - * <p>Imagine that many ghosts of Shakespeare are all typing madly at the same time to recreate - * his masterworks. Each line of the corpus will get a random associated timestamp somewhere in a - * 2-hour period. 
- */ - static class AddTimestampFn extends DoFn<String, String> { - private static final long RAND_RANGE = 7200000; // 2 hours in ms - - @Override - public void processElement(ProcessContext c) { - // Generate a timestamp that falls somewhere in the past two hours. - long randomTimestamp = System.currentTimeMillis() - - (int) (Math.random() * RAND_RANGE); - /** - * Concept #2: Set the data element with that timestamp. - */ - c.outputWithTimestamp(c.element(), new Instant(randomTimestamp)); - } - } - - /** A DoFn that converts a Word and Count into a BigQuery table row. */ - static class FormatAsTableRowFn extends DoFn<KV<String, Long>, TableRow> { - @Override - public void processElement(ProcessContext c) { - TableRow row = new TableRow() - .set("word", c.element().getKey()) - .set("count", c.element().getValue()) - // include a field for the window timestamp - .set("window_timestamp", c.timestamp().toString()); - c.output(row); - } - } - - /** - * Helper method that defines the BigQuery schema used for the output. - */ - private static TableSchema getSchema() { - List<TableFieldSchema> fields = new ArrayList<>(); - fields.add(new TableFieldSchema().setName("word").setType("STRING")); - fields.add(new TableFieldSchema().setName("count").setType("INTEGER")); - fields.add(new TableFieldSchema().setName("window_timestamp").setType("TIMESTAMP")); - TableSchema schema = new TableSchema().setFields(fields); - return schema; - } - - /** - * Concept #6: We'll stream the results to a BigQuery table. The BigQuery output source is one - * that supports both bounded and unbounded data. This is a helper method that creates a - * TableReference from input options, to tell the pipeline where to write its BigQuery results. 
- */ - private static TableReference getTableReference(Options options) { - TableReference tableRef = new TableReference(); - tableRef.setProjectId(options.getProject()); - tableRef.setDatasetId(options.getBigQueryDataset()); - tableRef.setTableId(options.getBigQueryTable()); - return tableRef; - } - - /** - * Options supported by {@link WindowedWordCount}. - * - * <p>Inherits standard example configuration options, which allow specification of the BigQuery - * table and the PubSub topic, as well as the {@link WordCount.WordCountOptions} support for - * specification of the input file. - */ - public static interface Options extends WordCount.WordCountOptions, - DataflowExampleOptions, ExamplePubsubTopicOptions, ExampleBigQueryTableOptions { - @Description("Fixed window duration, in minutes") - @Default.Integer(WINDOW_SIZE) - Integer getWindowSize(); - void setWindowSize(Integer value); - - @Description("Whether to run the pipeline with unbounded input") - boolean isUnbounded(); - void setUnbounded(boolean value); - } - - public static void main(String[] args) throws IOException { - Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); - options.setBigQuerySchema(getSchema()); - // DataflowExampleUtils creates the necessary input sources to simplify execution of this - // Pipeline. - DataflowExampleUtils exampleDataflowUtils = new DataflowExampleUtils(options, - options.isUnbounded()); - - Pipeline pipeline = Pipeline.create(options); - - /** - * Concept #1: the Dataflow SDK lets us run the same pipeline with either a bounded or - * unbounded input source. - */ - PCollection<String> input; - if (options.isUnbounded()) { - LOG.info("Reading from PubSub."); - /** - * Concept #3: Read from the PubSub topic. A topic will be created if it wasn't - * specified as an argument. The data elements' timestamps will come from the pubsub - * injection. 
- */ - input = pipeline - .apply(PubsubIO.Read.topic(options.getPubsubTopic())); - } else { - /** Else, this is a bounded pipeline. Read from the GCS file. */ - input = pipeline - .apply(TextIO.Read.from(options.getInputFile())) - // Concept #2: Add an element timestamp, using an artificial time just to show windowing. - // See AddTimestampFn for more detail on this. - .apply(ParDo.of(new AddTimestampFn())); - } - - /** - * Concept #4: Window into fixed windows. The fixed window size for this example defaults to 1 - * minute (you can change this with a command-line option). See the documentation for more - * information on how fixed windows work, and for information on the other types of windowing - * available (e.g., sliding windows). - */ - PCollection<String> windowedWords = input - .apply(Window.<String>into( - FixedWindows.of(Duration.standardMinutes(options.getWindowSize())))); - - /** - * Concept #5: Re-use our existing CountWords transform that does not have knowledge of - * windows over a PCollection containing windowed values. - */ - PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new WordCount.CountWords()); - - /** - * Concept #6: Format the results for a BigQuery table, then write to BigQuery. - * The BigQuery output source supports both bounded and unbounded data. - */ - wordCounts.apply(ParDo.of(new FormatAsTableRowFn())) - .apply(BigQueryIO.Write - .to(getTableReference(options)) - .withSchema(getSchema()) - .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) - .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)); - - PipelineResult result = pipeline.run(); - - /** - * To mock unbounded input from PubSub, we'll now start an auxiliary 'injector' pipeline that - * runs for a limited time, and publishes to the input PubSub topic. 
- * - * With an unbounded input source, you will need to explicitly shut down this pipeline when you - * are done with it, so that you do not continue to be charged for the instances. You can do - * this via a ctrl-C from the command line, or from the developer's console UI for Dataflow - * pipelines. The PubSub topic will also be deleted at this time. - */ - exampleDataflowUtils.mockUnboundedSource(options.getInputFile(), result); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/com/google/cloud/dataflow/examples/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/WordCount.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/WordCount.java deleted file mode 100644 index ad08d13..0000000 --- a/examples/java/src/main/java/com/google/cloud/dataflow/examples/WordCount.java +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.google.cloud.dataflow.examples; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.TextIO; -import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; -import com.google.cloud.dataflow.sdk.options.Default; -import com.google.cloud.dataflow.sdk.options.DefaultValueFactory; -import com.google.cloud.dataflow.sdk.options.Description; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.transforms.Aggregator; -import com.google.cloud.dataflow.sdk.transforms.Count; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.MapElements; -import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.SimpleFunction; -import com.google.cloud.dataflow.sdk.transforms.Sum; -import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; - - -/** - * An example that counts words in Shakespeare and includes Dataflow best practices. - * - * <p>This class, {@link WordCount}, is the second in a series of four successively more detailed - * 'word count' examples. You may first want to take a look at {@link MinimalWordCount}. - * After you've looked at this example, then see the {@link DebuggingWordCount} - * pipeline, for introduction of additional concepts. - * - * <p>For a detailed walkthrough of this example, see - * <a href="https://cloud.google.com/dataflow/java-sdk/wordcount-example"> - * https://cloud.google.com/dataflow/java-sdk/wordcount-example - * </a> - * - * <p>Basic concepts, also in the MinimalWordCount example: - * Reading text files; counting a PCollection; writing to GCS. - * - * <p>New Concepts: - * <pre> - * 1. 
Executing a Pipeline both locally and using the Dataflow service - * 2. Using ParDo with static DoFns defined out-of-line - * 3. Building a composite transform - * 4. Defining your own pipeline options - * </pre> - * - * <p>Concept #1: you can execute this pipeline either locally or using the Dataflow service. - * These are now command-line options and not hard-coded as they were in the MinimalWordCount - * example. - * To execute this pipeline locally, specify general pipeline configuration: - * <pre>{@code - * --project=YOUR_PROJECT_ID - * } - * </pre> - * and a local output file or output prefix on GCS: - * <pre>{@code - * --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX] - * }</pre> - * - * <p>To execute this pipeline using the Dataflow service, specify pipeline configuration: - * <pre>{@code - * --project=YOUR_PROJECT_ID - * --stagingLocation=gs://YOUR_STAGING_DIRECTORY - * --runner=BlockingDataflowPipelineRunner - * } - * </pre> - * and an output prefix on GCS: - * <pre>{@code - * --output=gs://YOUR_OUTPUT_PREFIX - * }</pre> - * - * <p>The input file defaults to {@code gs://dataflow-samples/shakespeare/kinglear.txt} and can be - * overridden with {@code --inputFile}. - */ -public class WordCount { - - /** - * Concept #2: You can make your pipeline code less verbose by defining your DoFns statically out- - * of-line. This DoFn tokenizes lines of text into individual words; we pass it to a ParDo in the - * pipeline. - */ - static class ExtractWordsFn extends DoFn<String, String> { - private final Aggregator<Long, Long> emptyLines = - createAggregator("emptyLines", new Sum.SumLongFn()); - - @Override - public void processElement(ProcessContext c) { - if (c.element().trim().isEmpty()) { - emptyLines.addValue(1L); - } - - // Split the line into words. - String[] words = c.element().split("[^a-zA-Z']+"); - - // Output each word encountered into the output PCollection. 
- for (String word : words) { - if (!word.isEmpty()) { - c.output(word); - } - } - } - } - - /** A SimpleFunction that converts a Word and Count into a printable string. */ - public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> { - @Override - public String apply(KV<String, Long> input) { - return input.getKey() + ": " + input.getValue(); - } - } - - /** - * A PTransform that converts a PCollection containing lines of text into a PCollection of - * formatted word counts. - * - * <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and - * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse, - * modular testing, and an improved monitoring experience. - */ - public static class CountWords extends PTransform<PCollection<String>, - PCollection<KV<String, Long>>> { - @Override - public PCollection<KV<String, Long>> apply(PCollection<String> lines) { - - // Convert lines of text into individual words. - PCollection<String> words = lines.apply( - ParDo.of(new ExtractWordsFn())); - - // Count the number of times each word occurs. - PCollection<KV<String, Long>> wordCounts = - words.apply(Count.<String>perElement()); - - return wordCounts; - } - } - - /** - * Options supported by {@link WordCount}. - * - * <p>Concept #4: Defining your own configuration options. Here, you can add your own arguments - * to be processed by the command-line parser, and specify default values for them. You can then - * access the options values in your pipeline code. - * - * <p>Inherits standard configuration options. 
- */ - public static interface WordCountOptions extends PipelineOptions { - @Description("Path of the file to read from") - @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") - String getInputFile(); - void setInputFile(String value); - - @Description("Path of the file to write to") - @Default.InstanceFactory(OutputFactory.class) - String getOutput(); - void setOutput(String value); - - /** - * Returns "gs://${YOUR_STAGING_DIRECTORY}/counts.txt" as the default destination. - */ - public static class OutputFactory implements DefaultValueFactory<String> { - @Override - public String create(PipelineOptions options) { - DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); - if (dataflowOptions.getStagingLocation() != null) { - return GcsPath.fromUri(dataflowOptions.getStagingLocation()) - .resolve("counts.txt").toString(); - } else { - throw new IllegalArgumentException("Must specify --output or --stagingLocation"); - } - } - } - - } - - public static void main(String[] args) { - WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() - .as(WordCountOptions.class); - Pipeline p = Pipeline.create(options); - - // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the - // static FormatAsTextFn() to the ParDo transform. 
- p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile())) - .apply(new CountWords()) - .apply(MapElements.via(new FormatAsTextFn())) - .apply(TextIO.Write.named("WriteCounts").to(options.getOutput())); - - p.run(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleOptions.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleOptions.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleOptions.java deleted file mode 100644 index 557e903..0000000 --- a/examples/java/src/main/java/com/google/cloud/dataflow/examples/common/DataflowExampleOptions.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.examples.common; - -import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; -import com.google.cloud.dataflow.sdk.options.Default; -import com.google.cloud.dataflow.sdk.options.Description; - -/** - * Options that can be used to configure the Dataflow examples. 
- */ -public interface DataflowExampleOptions extends DataflowPipelineOptions { - @Description("Whether to keep jobs running on the Dataflow service after local process exit") - @Default.Boolean(false) - boolean getKeepJobsRunning(); - void setKeepJobsRunning(boolean keepJobsRunning); - - @Description("Number of workers to use when executing the injector pipeline") - @Default.Integer(1) - int getInjectorNumWorkers(); - void setInjectorNumWorkers(int numWorkers); -}