Repository: beam Updated Branches: refs/heads/master f8e119292 -> e9d3a4a79
Added concrete example for CoGroupByKey snippet Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/92676a5d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/92676a5d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/92676a5d Branch: refs/heads/master Commit: 92676a5d8b79dc99cd805a8a36b14fa1ef3007f5 Parents: f8e1192 Author: David Cavazos <[email protected]> Authored: Thu Aug 31 12:24:46 2017 -0700 Committer: Ahmet Altay <[email protected]> Committed: Fri Sep 8 11:26:46 2017 -0700 ---------------------------------------------------------------------- .../apache_beam/examples/snippets/snippets.py | 20 +++++++-------- .../examples/snippets/snippets_test.py | 27 +++++++++++++++++--- 2 files changed, 33 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/92676a5d/sdks/python/apache_beam/examples/snippets/snippets.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index 46696f4..eac87a2 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -1148,24 +1148,24 @@ def model_co_group_by_key_tuple(email_list, phone_list, output_path): # multiple possible values for each key. # The phone_list contains values such as: ('mary': '111-222-3333') with # multiple possible values for each key. - emails = p | 'email' >> beam.Create(email_list) - phones = p | 'phone' >> beam.Create(phone_list) + emails_pcoll = p | 'create emails' >> beam.Create(email_list) + phones_pcoll = p | 'create phones' >> beam.Create(phone_list) + # The result PCollection contains one key-value element for each key in the # input PCollections. The key of the pair will be the key from the input and # the value will be a dictionary with two entries: 'emails' - an iterable of # all values for the current key in the emails PCollection and 'phones': an # iterable of all values for the current key in the phones PCollection. # For instance, if 'emails' contained ('joe', '[email protected]') and - # ('joe', '[email protected]'), then 'result' will contain the element + # ('joe', '[email protected]'), then 'result' will contain the element: # ('joe', {'emails': ['[email protected]', '[email protected]'], 'phones': ...}) - result = {'emails': emails, 'phones': phones} | beam.CoGroupByKey() - - def join_info((name, info)): - return '; '.join(['%s' % name, - '%s' % ','.join(info['emails']), - '%s' % ','.join(info['phones'])]) + result = ({'emails': emails_pcoll, 'phones': phones_pcoll} + | beam.CoGroupByKey()) - contact_lines = result | beam.Map(join_info) + contact_lines = result | beam.Map( + lambda (name, info):\ + '%s; %s; %s' %\ + (name, sorted(info['emails']), sorted(info['phones']))) # [END model_group_by_key_cogroupbykey_tuple] contact_lines | beam.io.WriteToText(output_path) http://git-wip-us.apache.org/repos/asf/beam/blob/92676a5d/sdks/python/apache_beam/examples/snippets/snippets_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index ee1e50e..a700ba5 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -694,12 +694,31 @@ class SnippetsTest(unittest.TestCase): self.assertEqual([str(s) for s in expected], self.get_output(result_path)) def test_model_co_group_by_key_tuple(self): - email_list = [['a', '[email protected]'], ['b', '[email protected]']] - phone_list = [['a', 'x4312'], ['b', 'x8452']] + # [START model_group_by_key_cogroupbykey_tuple_inputs] + email_list = [ + ('amy', '[email protected]'), + ('carl', '[email protected]'), + ('julia', '[email protected]'), + ('carl', '[email protected]'), + ] + phone_list = [ + ('amy', '111-222-3333'), + ('james', '222-333-4444'), + ('amy', '333-444-5555'), + ('carl', '444-555-6666'), + ] + # [END model_group_by_key_cogroupbykey_tuple_inputs] result_path = self.create_temp_file() snippets.model_co_group_by_key_tuple(email_list, phone_list, result_path) - expect = ['a; [email protected]; x4312', 'b; [email protected]; x8452'] - self.assertEqual(expect, self.get_output(result_path)) + # [START model_group_by_key_cogroupbykey_tuple_outputs] + contact_lines = [ + "amy; ['[email protected]']; ['111-222-3333', '333-444-5555']", + "carl; ['[email protected]', '[email protected]']; ['444-555-6666']", + "james; []; ['222-333-4444']", + "julia; ['[email protected]']; []", + ] + # [END model_group_by_key_cogroupbykey_tuple_outputs] + self.assertEqual(contact_lines, self.get_output(result_path)) def test_model_use_and_query_metrics(self): """DebuggingWordCount example snippets."""
