Repository: beam
Updated Branches:
  refs/heads/master f8e119292 -> e9d3a4a79


Added concrete example for CoGroupByKey snippet


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/92676a5d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/92676a5d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/92676a5d

Branch: refs/heads/master
Commit: 92676a5d8b79dc99cd805a8a36b14fa1ef3007f5
Parents: f8e1192
Author: David Cavazos <[email protected]>
Authored: Thu Aug 31 12:24:46 2017 -0700
Committer: Ahmet Altay <[email protected]>
Committed: Fri Sep 8 11:26:46 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/examples/snippets/snippets.py   | 20 +++++++--------
 .../examples/snippets/snippets_test.py          | 27 +++++++++++++++++---
 2 files changed, 33 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/92676a5d/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 46696f4..eac87a2 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -1148,24 +1148,24 @@ def model_co_group_by_key_tuple(email_list, phone_list, output_path):
     # multiple possible values for each key.
     # The phone_list contains values such as: ('mary': '111-222-3333') with
     # multiple possible values for each key.
-    emails = p | 'email' >> beam.Create(email_list)
-    phones = p | 'phone' >> beam.Create(phone_list)
+    emails_pcoll = p | 'create emails' >> beam.Create(email_list)
+    phones_pcoll = p | 'create phones' >> beam.Create(phone_list)
+
     # The result PCollection contains one key-value element for each key in the
     # input PCollections. The key of the pair will be the key from the input and
     # the value will be a dictionary with two entries: 'emails' - an iterable of
     # all values for the current key in the emails PCollection and 'phones': an
     # iterable of all values for the current key in the phones PCollection.
     # For instance, if 'emails' contained ('joe', '[email protected]') and
-    # ('joe', '[email protected]'), then 'result' will contain the element
+    # ('joe', '[email protected]'), then 'result' will contain the element:
     # ('joe', {'emails': ['[email protected]', '[email protected]'], 'phones': ...})
-    result = {'emails': emails, 'phones': phones} | beam.CoGroupByKey()
-
-    def join_info((name, info)):
-      return '; '.join(['%s' % name,
-                        '%s' % ','.join(info['emails']),
-                        '%s' % ','.join(info['phones'])])
+    result = ({'emails': emails_pcoll, 'phones': phones_pcoll}
+              | beam.CoGroupByKey())
 
-    contact_lines = result | beam.Map(join_info)
+    contact_lines = result | beam.Map(
+        lambda (name, info):\
+           '%s; %s; %s' %\
+           (name, sorted(info['emails']), sorted(info['phones'])))
     # [END model_group_by_key_cogroupbykey_tuple]
     contact_lines | beam.io.WriteToText(output_path)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/92676a5d/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index ee1e50e..a700ba5 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -694,12 +694,31 @@ class SnippetsTest(unittest.TestCase):
     self.assertEqual([str(s) for s in expected], self.get_output(result_path))
 
   def test_model_co_group_by_key_tuple(self):
-    email_list = [['a', '[email protected]'], ['b', '[email protected]']]
-    phone_list = [['a', 'x4312'], ['b', 'x8452']]
+    # [START model_group_by_key_cogroupbykey_tuple_inputs]
+    email_list = [
+        ('amy', '[email protected]'),
+        ('carl', '[email protected]'),
+        ('julia', '[email protected]'),
+        ('carl', '[email protected]'),
+    ]
+    phone_list = [
+        ('amy', '111-222-3333'),
+        ('james', '222-333-4444'),
+        ('amy', '333-444-5555'),
+        ('carl', '444-555-6666'),
+    ]
+    # [END model_group_by_key_cogroupbykey_tuple_inputs]
     result_path = self.create_temp_file()
     snippets.model_co_group_by_key_tuple(email_list, phone_list, result_path)
-    expect = ['a; [email protected]; x4312', 'b; [email protected]; x8452']
-    self.assertEqual(expect, self.get_output(result_path))
+    # [START model_group_by_key_cogroupbykey_tuple_outputs]
+    contact_lines = [
+        "amy; ['[email protected]']; ['111-222-3333', '333-444-5555']",
+        "carl; ['[email protected]', '[email protected]']; ['444-555-6666']",
+        "james; []; ['222-333-4444']",
+        "julia; ['[email protected]']; []",
+    ]
+    # [END model_group_by_key_cogroupbykey_tuple_outputs]
+    self.assertEqual(contact_lines, self.get_output(result_path))
 
   def test_model_use_and_query_metrics(self):
     """DebuggingWordCount example snippets."""

Reply via email to