Repository: beam Updated Branches: refs/heads/master 3360b1f68 -> 0f096b12e
Created Java snippets file Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9f1db3f1 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9f1db3f1 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9f1db3f1 Branch: refs/heads/master Commit: 9f1db3f11680c08407bc8dd0101f8b047c090620 Parents: 3360b1f Author: David Cavazos <[email protected]> Authored: Tue Sep 26 10:38:38 2017 -0700 Committer: Ahmet Altay <[email protected]> Committed: Thu Oct 19 12:10:45 2017 -0700 ---------------------------------------------------------------------- .../examples/website_snippets/Snippets.java | 87 ++++++++++++++ .../examples/website_snippets/SnippetsTest.java | 114 +++++++++++++++++++ .../apache_beam/examples/snippets/snippets.py | 45 +++----- .../examples/snippets/snippets_test.py | 38 ++++--- 4 files changed, 239 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/9f1db3f1/examples/java8/src/main/java/org/apache/beam/examples/website_snippets/Snippets.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/website_snippets/Snippets.java b/examples/java8/src/main/java/org/apache/beam/examples/website_snippets/Snippets.java new file mode 100644 index 0000000..f17171e --- /dev/null +++ b/examples/java8/src/main/java/org/apache/beam/examples/website_snippets/Snippets.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.transforms.join.CoGroupByKey; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TupleTag; + +/** + * Code snippets used in webdocs. + */ +public class Snippets { + + /* Helper function to format results in coGroupByKeyTuple */ + public static String formatCoGbkResults( + String name, Iterable<String> emails, Iterable<String> phones) { + + List<String> emailsList = new ArrayList<>(); + for (String elem : emails) { + emailsList.add("'" + elem + "'"); + } + Collections.<String>sort(emailsList); + String emailsStr = "[" + String.join(", ", emailsList) + "]"; + + List<String> phonesList = new ArrayList<>(); + for (String elem : phones) { + phonesList.add("'" + elem + "'"); + } + Collections.<String>sort(phonesList); + String phonesStr = "[" + String.join(", ", phonesList) + "]"; + + return name + "; " + emailsStr + "; " + phonesStr; + } + + public static PCollection<String> coGroupByKeyTuple( + TupleTag<String> emailsTag, + TupleTag<String> phonesTag, + PCollection<KV<String, String>> emails, + PCollection<KV<String, String>> phones) { + + // [START CoGroupByKeyTuple] + PCollection<KV<String, CoGbkResult>> results = + 
KeyedPCollectionTuple + .of(emailsTag, emails) + .and(phonesTag, phones) + .apply(CoGroupByKey.<String>create()); + + PCollection<String> contactLines = results.apply(ParDo.of( + new DoFn<KV<String, CoGbkResult>, String>() { + @ProcessElement + public void processElement(ProcessContext c) { + KV<String, CoGbkResult> e = c.element(); + String name = e.getKey(); + Iterable<String> emailsIter = e.getValue().getAll(emailsTag); + Iterable<String> phonesIter = e.getValue().getAll(phonesTag); + String formattedResult = Snippets.formatCoGbkResults(name, emailsIter, phonesIter); + c.output(formattedResult); + } + } + )); + // [END CoGroupByKeyTuple] + return contactLines; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/9f1db3f1/examples/java8/src/test/java/org/apache/beam/examples/website_snippets/SnippetsTest.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/test/java/org/apache/beam/examples/website_snippets/SnippetsTest.java b/examples/java8/src/test/java/org/apache/beam/examples/website_snippets/SnippetsTest.java new file mode 100644 index 0000000..3ca6c9a --- /dev/null +++ b/examples/java8/src/test/java/org/apache/beam/examples/website_snippets/SnippetsTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TupleTag; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + + +/** + * Tests for Snippets. + */ +@RunWith(JUnit4.class) +public class SnippetsTest implements Serializable { + + @Rule + public transient TestPipeline p = TestPipeline.create(); + + /* Tests CoGroupByKeyTuple */ + @Test + public void testCoGroupByKeyTuple() throws IOException { + // [START CoGroupByKeyTupleInputs] + final List<KV<String, String>> emailsList = Arrays.asList( + KV.of("amy", "amy@example.com"), + KV.of("carl", "carl@example.com"), + KV.of("julia", "julia@example.com"), + KV.of("carl", "carl@email.com")); + + final List<KV<String, String>> phonesList = Arrays.asList( + KV.of("amy", "111-222-3333"), + KV.of("james", "222-333-4444"), + KV.of("amy", "333-444-5555"), + KV.of("carl", "444-555-6666")); + + PCollection<KV<String, String>> emails = p.apply("CreateEmails", Create.of(emailsList)); + PCollection<KV<String, String>> phones = p.apply("CreatePhones", Create.of(phonesList)); + // [END CoGroupByKeyTupleInputs] + + // [START CoGroupByKeyTupleOutputs] + final TupleTag<String> emailsTag = new TupleTag(); + final TupleTag<String> phonesTag = new TupleTag(); + + final List<KV<String, CoGbkResult>> expectedResults = Arrays.asList( + KV.of("amy", CoGbkResult + .of(emailsTag, Arrays.asList("amy@example.com"))
.and(phonesTag, Arrays.asList("111-222-3333", "333-444-5555"))), + KV.of("carl", CoGbkResult + .of(emailsTag, Arrays.asList("carl@email.com", "carl@example.com")) + .and(phonesTag, Arrays.asList("444-555-6666"))), + KV.of("james", CoGbkResult + .of(emailsTag, Arrays.asList()) + .and(phonesTag, Arrays.asList("222-333-4444"))), + KV.of("julia", CoGbkResult + .of(emailsTag, Arrays.asList("julia@example.com")) + .and(phonesTag, Arrays.asList()))); + // [END CoGroupByKeyTupleOutputs] + + PCollection<String> actualFormattedResults = + Snippets.coGroupByKeyTuple(emailsTag, phonesTag, emails, phones); + + // [START CoGroupByKeyTupleFormattedOutputs] + final List<String> formattedResults = Arrays.asList( + "amy; ['amy@example.com']; ['111-222-3333', '333-444-5555']", + "carl; ['carl@email.com', 'carl@example.com']; ['444-555-6666']", + "james; []; ['222-333-4444']", + "julia; ['julia@example.com']; []"); + // [END CoGroupByKeyTupleFormattedOutputs] + + // Make sure that both 'expectedResults' and 'actualFormattedResults' match with the + // 'formattedResults'.
'expectedResults' will have to be formatted before comparing + List<String> expectedFormattedResultsList = new ArrayList<String>(expectedResults.size()); + for (KV<String, CoGbkResult> e : expectedResults) { + String name = e.getKey(); + Iterable<String> emailsIter = e.getValue().getAll(emailsTag); + Iterable<String> phonesIter = e.getValue().getAll(phonesTag); + String formattedResult = Snippets.formatCoGbkResults(name, emailsIter, phonesIter); + expectedFormattedResultsList.add(formattedResult); + } + PCollection<String> expectedFormattedResultsPColl = + p.apply(Create.of(expectedFormattedResultsList)); + PAssert.that(expectedFormattedResultsPColl).containsInAnyOrder(formattedResults); + PAssert.that(actualFormattedResults).containsInAnyOrder(formattedResults); + + p.run(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/9f1db3f1/sdks/python/apache_beam/examples/snippets/snippets.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index a7751a7..6cc96ef 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -1149,39 +1149,26 @@ def model_group_by_key(contents, output_path): | beam.io.WriteToText(output_path)) -def model_co_group_by_key_tuple(email_list, phone_list, output_path): +def model_co_group_by_key_tuple(emails, phones, output_path): """Applying a CoGroupByKey Transform to a tuple.""" import apache_beam as beam - with TestPipeline() as p: # Use TestPipeline for testing. - # [START model_group_by_key_cogroupbykey_tuple] - # Each data set is represented by key-value pairs in separate PCollections. - # Both data sets share a common key type (in this example str). - # The email_list contains values such as: ('joe', 'joe@example.com') with - # multiple possible values for each key.
- # The phone_list contains values such as: ('mary': '111-222-3333') with - # multiple possible values for each key. - emails_pcoll = p | 'create emails' >> beam.Create(email_list) - phones_pcoll = p | 'create phones' >> beam.Create(phone_list) - - # The result PCollection contains one key-value element for each key in the - # input PCollections. The key of the pair will be the key from the input and - # the value will be a dictionary with two entries: 'emails' - an iterable of - # all values for the current key in the emails PCollection and 'phones': an - # iterable of all values for the current key in the phones PCollection. - # For instance, if 'emails' contained ('joe', 'joe@example.com') and - # ('joe', 'joe@gmail.com'), then 'result' will contain the element: - # ('joe', {'emails': ['joe@example.com', 'joe@gmail.com'], 'phones': ...}) - results = ({'emails': emails_pcoll, 'phones': phones_pcoll} - | beam.CoGroupByKey()) - - def join_info(name_info): - (name, info) = name_info - return '%s; %s; %s' %\ + # [START model_group_by_key_cogroupbykey_tuple] + # The result PCollection contains one key-value element for each key in the + # input PCollections. The key of the pair will be the key from the input and + # the value will be a dictionary with two entries: 'emails' - an iterable of + # all values for the current key in the emails PCollection and 'phones': an + # iterable of all values for the current key in the phones PCollection.
+ results = ({'emails': emails, 'phones': phones} + | beam.CoGroupByKey()) + + def join_info(name_info): + (name, info) = name_info + return '%s; %s; %s' %\ (name, sorted(info['emails']), sorted(info['phones'])) - contact_lines = results | beam.Map(join_info) - # [END model_group_by_key_cogroupbykey_tuple] - contact_lines | beam.io.WriteToText(output_path) + contact_lines = results | beam.Map(join_info) + # [END model_group_by_key_cogroupbykey_tuple] + contact_lines | beam.io.WriteToText(output_path) def model_join_using_side_inputs( http://git-wip-us.apache.org/repos/asf/beam/blob/9f1db3f1/sdks/python/apache_beam/examples/snippets/snippets_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index 8f88ab9..505858a 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -694,22 +694,28 @@ class SnippetsTest(unittest.TestCase): self.assertEqual([str(s) for s in expected], self.get_output(result_path)) def test_model_co_group_by_key_tuple(self): - # [START model_group_by_key_cogroupbykey_tuple_inputs] - email_list = [ - ('amy', 'amy@example.com'), - ('carl', 'carl@example.com'), - ('julia', 'julia@example.com'), - ('carl', 'carl@email.com'), - ] - phone_list = [ - ('amy', '111-222-3333'), - ('james', '222-333-4444'), - ('amy', '333-444-5555'), - ('carl', '444-555-6666'), - ] - # [END model_group_by_key_cogroupbykey_tuple_inputs] - result_path = self.create_temp_file() - snippets.model_co_group_by_key_tuple(email_list, phone_list, result_path) + with TestPipeline() as p: + # [START model_group_by_key_cogroupbykey_tuple_inputs] + emails_list = [ + ('amy', 'amy@example.com'), + ('carl', 'carl@example.com'), + ('julia', 'julia@example.com'), + ('carl', 'carl@email.com'), + ] + phones_list = [ + ('amy', '111-222-3333'),
('james', '222-333-4444'), + ('amy', '333-444-5555'), + ('carl', '444-555-6666'), + ] + + emails = p | 'CreateEmails' >> beam.Create(emails_list) + phones = p | 'CreatePhones' >> beam.Create(phones_list) + # [END model_group_by_key_cogroupbykey_tuple_inputs] + + result_path = self.create_temp_file() + snippets.model_co_group_by_key_tuple(emails, phones, result_path) + # [START model_group_by_key_cogroupbykey_tuple_outputs] results = [ ('amy', {
