udim commented on a change in pull request #15535:
URL: https://github.com/apache/beam/pull/15535#discussion_r712429108



##########
File path: sdks/python/apache_beam/transforms/util_test.py
##########
@@ -72,6 +73,123 @@
     'ignore', category=FutureWarning, module='apache_beam.transform.util_test')
 
 
+class CoGroupByKeyTest(unittest.TestCase):
+  def test_co_group_by_key_on_tuple(self):
+    with TestPipeline() as pipeline:
+      pcoll_1 = pipeline | 'Start 1' >> beam.Create([('a', 1), ('a', 2),
+                                                     ('b', 3), ('c', 4)])
+      pcoll_2 = pipeline | 'Start 2' >> beam.Create([('a', 5), ('a', 6),
+                                                     ('c', 7), ('c', 8)])
+      result = (pcoll_1, pcoll_2) | beam.CoGroupByKey() | SortLists
+      assert_that(
+          result,
+          equal_to([('a', ([1, 2], [5, 6])), ('b', ([3], [])),
+                    ('c', ([4], [7, 8]))]))
+
+  def test_co_group_by_key_on_iterable(self):
+    with TestPipeline() as pipeline:
+      pcoll_1 = pipeline | 'Start 1' >> beam.Create([('a', 1), ('a', 2),
+                                                     ('b', 3), ('c', 4)])
+      pcoll_2 = pipeline | 'Start 2' >> beam.Create([('a', 5), ('a', 6),
+                                                     ('c', 7), ('c', 8)])
+      result = iter([pcoll_1, pcoll_2]) | beam.CoGroupByKey() | SortLists
+      assert_that(
+          result,
+          equal_to([('a', ([1, 2], [5, 6])), ('b', ([3], [])),
+                    ('c', ([4], [7, 8]))]))
+
+  def test_co_group_by_key_on_list(self):
+    with TestPipeline() as pipeline:
+      pcoll_1 = pipeline | 'Start 1' >> beam.Create([('a', 1), ('a', 2),
+                                                     ('b', 3), ('c', 4)])
+      pcoll_2 = pipeline | 'Start 2' >> beam.Create([('a', 5), ('a', 6),
+                                                     ('c', 7), ('c', 8)])
+      result = [pcoll_1, pcoll_2] | beam.CoGroupByKey() | SortLists
+      assert_that(
+          result,
+          equal_to([('a', ([1, 2], [5, 6])), ('b', ([3], [])),
+                    ('c', ([4], [7, 8]))]))
+
+  def test_co_group_by_key_on_dict(self):
+    with TestPipeline() as pipeline:
+      pcoll_1 = pipeline | 'Start 1' >> beam.Create([('a', 1), ('a', 2),
+                                                     ('b', 3), ('c', 4)])
+      pcoll_2 = pipeline | 'Start 2' >> beam.Create([('a', 5), ('a', 6),
+                                                     ('c', 7), ('c', 8)])
+      result = {'X': pcoll_1, 'Y': pcoll_2} | beam.CoGroupByKey() | SortLists
+      assert_that(
+          result,
+          equal_to([('a', {
+              'X': [1, 2], 'Y': [5, 6]
+          }), ('b', {
+              'X': [3], 'Y': []
+          }), ('c', {
+              'X': [4], 'Y': [7, 8]
+          })]))
+
+  def test_co_group_by_key_on_dict_with_tuple_keys(self):
+    with TestPipeline() as pipeline:
+      key = ('a', ('b', 'c'))
+      pcoll_1 = pipeline | 'Start 1' >> beam.Create([(key, 1)])
+      pcoll_2 = pipeline | 'Start 2' >> beam.Create([(key, 2)])
+      result = {'X': pcoll_1, 'Y': pcoll_2} | beam.CoGroupByKey() | SortLists
+      assert_that(result, equal_to([(key, {'X': [1], 'Y': [2]})]))
+
+  def test_co_group_by_key_on_empty(self):
+    with TestPipeline() as pipeline:
+      assert_that(
+          tuple() | 'EmptyTuple' >> beam.CoGroupByKey(pipeline=pipeline),
+          equal_to([]),
+          label='AssertEmptyTuple')
+      assert_that([] | 'EmptyList' >> beam.CoGroupByKey(pipeline=pipeline),
+                  equal_to([]),
+                  label='AssertEmptyList')
+      assert_that(
+          iter([]) | 'EmptyIterable' >> beam.CoGroupByKey(pipeline=pipeline),
+          equal_to([]),
+          label='AssertEmptyIterable')
+      assert_that({} | 'EmptyDict' >> beam.CoGroupByKey(pipeline=pipeline),
+                  equal_to([]),
+                  label='AssertEmptyDict')
+
+  def test_co_group_by_key_on_one(self):
+    with TestPipeline() as pipeline:
+      pcoll = pipeline | beam.Create([('a', 1), ('b', 2)])
+      expected = [('a', ([1], )), ('b', ([2], ))]
+      assert_that((pcoll, ) | 'OneTuple' >> beam.CoGroupByKey(),
+                  equal_to(expected),
+                  label='AssertOneTuple')
+      assert_that([pcoll] | 'OneList' >> beam.CoGroupByKey(),
+                  equal_to(expected),
+                  label='AssertOneList')
+      assert_that(
+          iter([pcoll]) | 'OneIterable' >> beam.CoGroupByKey(),
+          equal_to(expected),
+          label='AssertOneIterable')
+      assert_that({'tag': pcoll}
+                  | 'OneDict' >> beam.CoGroupByKey()
+                  | beam.MapTuple(lambda k, v: (k, (v['tag'], ))),
+                  equal_to(expected),
+                  label='AssertOneDict')
+
+  def test_co_group_by_key_on_empty(self):

Review comment:
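       Duplicate test name: `test_co_group_by_key_on_empty` is already defined earlier in `CoGroupByKeyTest`, so this second definition replaces the first one in the class body and the earlier test never runs. Please rename one of them.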




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to