This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a5923c9  Use with statements rather than explicit pipeline.run().
     new 99f4c67  Merge pull request #12361 from robertwb/with-run
a5923c9 is described below

commit a5923c9dc9769b49072299378f63483cefe7c6ed
Author: Robert Bradshaw <[email protected]>
AuthorDate: Thu Jul 23 14:28:31 2020 -0700

    Use with statements rather than explicit pipeline.run().
---
 .../apache_beam/transforms/ptransform_test.py      | 134 ++++++++++-----------
 .../apache_beam/transforms/sideinputs_test.py      | 106 ++++++++--------
 sdks/python/apache_beam/transforms/util_test.py    | 120 +++++++++---------
 .../apache_beam/typehints/typed_pipeline_test.py   |  76 ++++++------
 4 files changed, 212 insertions(+), 224 deletions(-)

diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 7ff596a..8560af3 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -331,21 +331,22 @@ class PTransformTest(unittest.TestCase):
       def finish_bundle(self):
         yield WindowedValue('finish', 1, [windowfn])
 
-    pipeline = TestPipeline()
-    result = (
-        pipeline
-        | 'Start' >> beam.Create([1])
-        | beam.ParDo(MyDoFn())
-        | WindowInto(windowfn)
-        | 'create tuple' >> beam.Map(
-            lambda v,
-            t=beam.DoFn.TimestampParam,
-            w=beam.DoFn.WindowParam: (v, t, w.start, w.end)))
-    expected_process = [('process1', Timestamp(5), Timestamp(4), Timestamp(6))]
-    expected_finish = [('finish', Timestamp(1), Timestamp(0), Timestamp(2))]
-
-    assert_that(result, equal_to(expected_process + expected_finish))
-    pipeline.run()
+    with TestPipeline() as pipeline:
+      result = (
+          pipeline
+          | 'Start' >> beam.Create([1])
+          | beam.ParDo(MyDoFn())
+          | WindowInto(windowfn)
+          | 'create tuple' >> beam.Map(
+              lambda v,
+              t=beam.DoFn.TimestampParam,
+              w=beam.DoFn.WindowParam: (v, t, w.start, w.end)))
+      expected_process = [
+          ('process1', Timestamp(5), Timestamp(4), Timestamp(6))
+      ]
+      expected_finish = [('finish', Timestamp(1), Timestamp(0), Timestamp(2))]
+
+      assert_that(result, equal_to(expected_process + expected_finish))
 
   def test_do_fn_with_start(self):
     class MyDoFn(beam.DoFn):
@@ -474,12 +475,11 @@ class PTransformTest(unittest.TestCase):
       assert_that(result, equal_to([('a', m_1), ('b', m_2)]))
 
   def test_group_by_key(self):
-    pipeline = TestPipeline()
-    pcoll = pipeline | 'start' >> beam.Create([(1, 1), (2, 1), (3, 1), (1, 2),
-                                               (2, 2), (1, 3)])
-    result = pcoll | 'Group' >> beam.GroupByKey() | _SortLists
-    assert_that(result, equal_to([(1, [1, 2, 3]), (2, [1, 2]), (3, [1])]))
-    pipeline.run()
+    with TestPipeline() as pipeline:
+      pcoll = pipeline | 'start' >> beam.Create([(1, 1), (2, 1), (3, 1), (1, 2),
+                                                 (2, 2), (1, 3)])
+      result = pcoll | 'Group' >> beam.GroupByKey() | _SortLists
+      assert_that(result, equal_to([(1, [1, 2, 3]), (2, [1, 2]), (3, [1])]))
 
   def test_group_by_key_reiteration(self):
     class MyDoFn(beam.DoFn):
@@ -533,14 +533,13 @@ class PTransformTest(unittest.TestCase):
 
   def test_partition_followed_by_flatten_and_groupbykey(self):
     """Regression test for an issue with how partitions are handled."""
-    pipeline = TestPipeline()
-    contents = [('aa', 1), ('bb', 2), ('aa', 2)]
-    created = pipeline | 'A' >> beam.Create(contents)
-    partitioned = created | 'B' >> beam.Partition(lambda x, n: len(x) % n, 3)
-    flattened = partitioned | 'C' >> beam.Flatten()
-    grouped = flattened | 'D' >> beam.GroupByKey() | _SortLists
-    assert_that(grouped, equal_to([('aa', [1, 2]), ('bb', [2])]))
-    pipeline.run()
+    with TestPipeline() as pipeline:
+      contents = [('aa', 1), ('bb', 2), ('aa', 2)]
+      created = pipeline | 'A' >> beam.Create(contents)
+      partitioned = created | 'B' >> beam.Partition(lambda x, n: len(x) % n, 3)
+      flattened = partitioned | 'C' >> beam.Flatten()
+      grouped = flattened | 'D' >> beam.GroupByKey() | _SortLists
+      assert_that(grouped, equal_to([('aa', [1, 2]), ('bb', [2])]))
 
   @attr('ValidatesRunner')
   def test_flatten_pcollections(self):
@@ -619,50 +618,47 @@ class PTransformTest(unittest.TestCase):
       assert_that(odd_length, equal_to(['BBB']), label='assert:odd')
 
   def test_co_group_by_key_on_list(self):
-    pipeline = TestPipeline()
-    pcoll_1 = pipeline | 'Start 1' >> beam.Create([('a', 1), ('a', 2), ('b', 3),
-                                                   ('c', 4)])
-    pcoll_2 = pipeline | 'Start 2' >> beam.Create([('a', 5), ('a', 6), ('c', 7),
-                                                   ('c', 8)])
-    result = (pcoll_1, pcoll_2) | beam.CoGroupByKey() | _SortLists
-    assert_that(
-        result,
-        equal_to([('a', ([1, 2], [5, 6])), ('b', ([3], [])),
-                  ('c', ([4], [7, 8]))]))
-    pipeline.run()
+    with TestPipeline() as pipeline:
+      pcoll_1 = pipeline | 'Start 1' >> beam.Create([('a', 1), ('a', 2),
+                                                     ('b', 3), ('c', 4)])
+      pcoll_2 = pipeline | 'Start 2' >> beam.Create([('a', 5), ('a', 6),
+                                                     ('c', 7), ('c', 8)])
+      result = (pcoll_1, pcoll_2) | beam.CoGroupByKey() | _SortLists
+      assert_that(
+          result,
+          equal_to([('a', ([1, 2], [5, 6])), ('b', ([3], [])),
+                    ('c', ([4], [7, 8]))]))
 
   def test_co_group_by_key_on_iterable(self):
-    pipeline = TestPipeline()
-    pcoll_1 = pipeline | 'Start 1' >> beam.Create([('a', 1), ('a', 2), ('b', 3),
-                                                   ('c', 4)])
-    pcoll_2 = pipeline | 'Start 2' >> beam.Create([('a', 5), ('a', 6), ('c', 7),
-                                                   ('c', 8)])
-    result = [pc for pc in (pcoll_1, pcoll_2)] | beam.CoGroupByKey()
-    result |= _SortLists
-    assert_that(
-        result,
-        equal_to([('a', ([1, 2], [5, 6])), ('b', ([3], [])),
-                  ('c', ([4], [7, 8]))]))
-    pipeline.run()
+    with TestPipeline() as pipeline:
+      pcoll_1 = pipeline | 'Start 1' >> beam.Create([('a', 1), ('a', 2),
+                                                     ('b', 3), ('c', 4)])
+      pcoll_2 = pipeline | 'Start 2' >> beam.Create([('a', 5), ('a', 6),
+                                                     ('c', 7), ('c', 8)])
+      result = [pc for pc in (pcoll_1, pcoll_2)] | beam.CoGroupByKey()
+      result |= _SortLists
+      assert_that(
+          result,
+          equal_to([('a', ([1, 2], [5, 6])), ('b', ([3], [])),
+                    ('c', ([4], [7, 8]))]))
 
   def test_co_group_by_key_on_dict(self):
-    pipeline = TestPipeline()
-    pcoll_1 = pipeline | 'Start 1' >> beam.Create([('a', 1), ('a', 2), ('b', 3),
-                                                   ('c', 4)])
-    pcoll_2 = pipeline | 'Start 2' >> beam.Create([('a', 5), ('a', 6), ('c', 7),
-                                                   ('c', 8)])
-    result = {'X': pcoll_1, 'Y': pcoll_2} | beam.CoGroupByKey()
-    result |= _SortLists
-    assert_that(
-        result,
-        equal_to([('a', {
-            'X': [1, 2], 'Y': [5, 6]
-        }), ('b', {
-            'X': [3], 'Y': []
-        }), ('c', {
-            'X': [4], 'Y': [7, 8]
-        })]))
-    pipeline.run()
+    with TestPipeline() as pipeline:
+      pcoll_1 = pipeline | 'Start 1' >> beam.Create([('a', 1), ('a', 2),
+                                                     ('b', 3), ('c', 4)])
+      pcoll_2 = pipeline | 'Start 2' >> beam.Create([('a', 5), ('a', 6),
+                                                     ('c', 7), ('c', 8)])
+      result = {'X': pcoll_1, 'Y': pcoll_2} | beam.CoGroupByKey()
+      result |= _SortLists
+      assert_that(
+          result,
+          equal_to([('a', {
+              'X': [1, 2], 'Y': [5, 6]
+          }), ('b', {
+              'X': [3], 'Y': []
+          }), ('c', {
+              'X': [4], 'Y': [7, 8]
+          })]))
 
   def test_group_by_key_input_must_be_kv_pairs(self):
     with self.assertRaises(typehints.TypeCheckError) as e:
diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py
index 1bcf47a..c8d118e 100644
--- a/sdks/python/apache_beam/transforms/sideinputs_test.py
+++ b/sdks/python/apache_beam/transforms/sideinputs_test.py
@@ -348,60 +348,58 @@ class SideInputsTest(unittest.TestCase):
     # This flag is only necessary when using the multi-output TestStream b/c
     # it relies on using the PCollection output tags as the PCollection output
     # ids.
-    p = TestPipeline()
-
-    test_stream = (
-        p
-        | 'Mixed TestStream' >> TestStream().advance_watermark_to(
-            3,
-            tag='main').add_elements(['a1'], tag='main').advance_watermark_to(
-                8, tag='main').add_elements(['a2'], tag='main').add_elements(
-                    [window.TimestampedValue(('k', 100), 2)], tag='side').
-        add_elements([window.TimestampedValue(
-            ('k', 400), 7)], tag='side').advance_watermark_to_infinity(
-                tag='main').advance_watermark_to_infinity(tag='side'))
-
-    main_data = (
-        test_stream['main']
-        | 'Main windowInto' >> beam.WindowInto(
-            window.FixedWindows(5),
-            accumulation_mode=trigger.AccumulationMode.DISCARDING))
-
-    side_data = (
-        test_stream['side']
-        | 'Side windowInto' >> beam.WindowInto(
-            window.FixedWindows(5),
-            trigger=trigger.AfterWatermark(early=trigger.AfterCount(1)),
-            accumulation_mode=trigger.AccumulationMode.DISCARDING)
-        | beam.CombinePerKey(sum)
-        | 'Values' >> Map(lambda k_vs: k_vs[1]))
-
-    class RecordFn(beam.DoFn):
-      def process(
-          self,
-          elm=beam.DoFn.ElementParam,
-          ts=beam.DoFn.TimestampParam,
-          side=beam.DoFn.SideInputParam):
-        yield (elm, ts, side)
-
-    records = (
-        main_data
-        | beam.ParDo(RecordFn(), beam.pvalue.AsList(side_data)))
-
-    expected_window_to_elements = {
-        window.IntervalWindow(0, 5): [
-            ('a1', Timestamp(3), [100, 0]),
-        ],
-        window.IntervalWindow(5, 10): [('a2', Timestamp(8), [400, 0])],
-    }
-
-    assert_that(
-        records,
-        equal_to_per_window(expected_window_to_elements),
-        use_global_window=False,
-        label='assert per window')
-
-    p.run()
+    with TestPipeline() as p:
+
+      test_stream = (
+          p
+          | 'Mixed TestStream' >> TestStream().advance_watermark_to(
+              3,
+              tag='main').add_elements(['a1'], tag='main').advance_watermark_to(
+                  8, tag='main').add_elements(['a2'], tag='main').add_elements(
+                      [window.TimestampedValue(('k', 100), 2)], tag='side').
+          add_elements([window.TimestampedValue(
+              ('k', 400), 7)], tag='side').advance_watermark_to_infinity(
+                  tag='main').advance_watermark_to_infinity(tag='side'))
+
+      main_data = (
+          test_stream['main']
+          | 'Main windowInto' >> beam.WindowInto(
+              window.FixedWindows(5),
+              accumulation_mode=trigger.AccumulationMode.DISCARDING))
+
+      side_data = (
+          test_stream['side']
+          | 'Side windowInto' >> beam.WindowInto(
+              window.FixedWindows(5),
+              trigger=trigger.AfterWatermark(early=trigger.AfterCount(1)),
+              accumulation_mode=trigger.AccumulationMode.DISCARDING)
+          | beam.CombinePerKey(sum)
+          | 'Values' >> Map(lambda k_vs: k_vs[1]))
+
+      class RecordFn(beam.DoFn):
+        def process(
+            self,
+            elm=beam.DoFn.ElementParam,
+            ts=beam.DoFn.TimestampParam,
+            side=beam.DoFn.SideInputParam):
+          yield (elm, ts, side)
+
+      records = (
+          main_data
+          | beam.ParDo(RecordFn(), beam.pvalue.AsList(side_data)))
+
+      expected_window_to_elements = {
+          window.IntervalWindow(0, 5): [
+              ('a1', Timestamp(3), [100, 0]),
+          ],
+          window.IntervalWindow(5, 10): [('a2', Timestamp(8), [400, 0])],
+      }
+
+      assert_that(
+          records,
+          equal_to_per_window(expected_window_to_elements),
+          use_global_window=False,
+          label='assert per window')
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py
index ef88073..f22ea2c 100644
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ b/sdks/python/apache_beam/transforms/util_test.py
@@ -361,20 +361,19 @@ class ReshuffleTest(unittest.TestCase):
       assert_that(result, equal_to(data))
 
   def test_reshuffle_after_gbk_contents_unchanged(self):
-    pipeline = TestPipeline()
-    data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 3)]
-    expected_result = [(1, [1, 2, 3]), (2, [1, 2]), (3, [1])]
-
-    after_gbk = (
-        pipeline
-        | beam.Create(data)
-        | beam.GroupByKey()
-        | beam.MapTuple(lambda k, vs: (k, sorted(vs))))
-    assert_that(after_gbk, equal_to(expected_result), label='after_gbk')
-    after_reshuffle = after_gbk | beam.Reshuffle()
-    assert_that(
-        after_reshuffle, equal_to(expected_result), label='after_reshuffle')
-    pipeline.run()
+    with TestPipeline() as pipeline:
+      data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 3)]
+      expected_result = [(1, [1, 2, 3]), (2, [1, 2]), (3, [1])]
+
+      after_gbk = (
+          pipeline
+          | beam.Create(data)
+          | beam.GroupByKey()
+          | beam.MapTuple(lambda k, vs: (k, sorted(vs))))
+      assert_that(after_gbk, equal_to(expected_result), label='after_gbk')
+      after_reshuffle = after_gbk | beam.Reshuffle()
+      assert_that(
+          after_reshuffle, equal_to(expected_result), label='after_reshuffle')
 
   def test_reshuffle_timestamps_unchanged(self):
     with TestPipeline() as pipeline:
@@ -482,60 +481,57 @@ class ReshuffleTest(unittest.TestCase):
           reify_windows=True)
 
   def test_reshuffle_global_window(self):
-    pipeline = TestPipeline()
-    data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 4)]
-    expected_data = [(1, [1, 2, 4]), (2, [1, 2]), (3, [1])]
-    before_reshuffle = (
-        pipeline
-        | beam.Create(data)
-        | beam.WindowInto(GlobalWindows())
-        | beam.GroupByKey()
-        | beam.MapTuple(lambda k, vs: (k, sorted(vs))))
-    assert_that(
-        before_reshuffle, equal_to(expected_data), label='before_reshuffle')
-    after_reshuffle = before_reshuffle | beam.Reshuffle()
-    assert_that(
-        after_reshuffle, equal_to(expected_data), label='after reshuffle')
-    pipeline.run()
+    with TestPipeline() as pipeline:
+      data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 4)]
+      expected_data = [(1, [1, 2, 4]), (2, [1, 2]), (3, [1])]
+      before_reshuffle = (
+          pipeline
+          | beam.Create(data)
+          | beam.WindowInto(GlobalWindows())
+          | beam.GroupByKey()
+          | beam.MapTuple(lambda k, vs: (k, sorted(vs))))
+      assert_that(
+          before_reshuffle, equal_to(expected_data), label='before_reshuffle')
+      after_reshuffle = before_reshuffle | beam.Reshuffle()
+      assert_that(
+          after_reshuffle, equal_to(expected_data), label='after reshuffle')
 
   def test_reshuffle_sliding_window(self):
-    pipeline = TestPipeline()
-    data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 4)]
-    window_size = 2
-    expected_data = [(1, [1, 2, 4]), (2, [1, 2]), (3, [1])] * window_size
-    before_reshuffle = (
-        pipeline
-        | beam.Create(data)
-        | beam.WindowInto(SlidingWindows(size=window_size, period=1))
-        | beam.GroupByKey()
-        | beam.MapTuple(lambda k, vs: (k, sorted(vs))))
-    assert_that(
-        before_reshuffle, equal_to(expected_data), label='before_reshuffle')
-    after_reshuffle = before_reshuffle | beam.Reshuffle()
-    # If Reshuffle applies the sliding window function a second time there
-    # should be extra values for each key.
-    assert_that(
-        after_reshuffle, equal_to(expected_data), label='after reshuffle')
-    pipeline.run()
+    with TestPipeline() as pipeline:
+      data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 4)]
+      window_size = 2
+      expected_data = [(1, [1, 2, 4]), (2, [1, 2]), (3, [1])] * window_size
+      before_reshuffle = (
+          pipeline
+          | beam.Create(data)
+          | beam.WindowInto(SlidingWindows(size=window_size, period=1))
+          | beam.GroupByKey()
+          | beam.MapTuple(lambda k, vs: (k, sorted(vs))))
+      assert_that(
+          before_reshuffle, equal_to(expected_data), label='before_reshuffle')
+      after_reshuffle = before_reshuffle | beam.Reshuffle()
+      # If Reshuffle applies the sliding window function a second time there
+      # should be extra values for each key.
+      assert_that(
+          after_reshuffle, equal_to(expected_data), label='after reshuffle')
 
   def test_reshuffle_streaming_global_window(self):
     options = PipelineOptions()
     options.view_as(StandardOptions).streaming = True
-    pipeline = TestPipeline(options=options)
-    data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 4)]
-    expected_data = [(1, [1, 2, 4]), (2, [1, 2]), (3, [1])]
-    before_reshuffle = (
-        pipeline
-        | beam.Create(data)
-        | beam.WindowInto(GlobalWindows())
-        | beam.GroupByKey()
-        | beam.MapTuple(lambda k, vs: (k, sorted(vs))))
-    assert_that(
-        before_reshuffle, equal_to(expected_data), label='before_reshuffle')
-    after_reshuffle = before_reshuffle | beam.Reshuffle()
-    assert_that(
-        after_reshuffle, equal_to(expected_data), label='after reshuffle')
-    pipeline.run()
+    with TestPipeline(options=options) as pipeline:
+      data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 4)]
+      expected_data = [(1, [1, 2, 4]), (2, [1, 2]), (3, [1])]
+      before_reshuffle = (
+          pipeline
+          | beam.Create(data)
+          | beam.WindowInto(GlobalWindows())
+          | beam.GroupByKey()
+          | beam.MapTuple(lambda k, vs: (k, sorted(vs))))
+      assert_that(
+          before_reshuffle, equal_to(expected_data), label='before_reshuffle')
+      after_reshuffle = before_reshuffle | beam.Reshuffle()
+      assert_that(
+          after_reshuffle, equal_to(expected_data), label='after reshuffle')
 
   @attr('ValidatesRunner')
   def test_reshuffle_preserves_timestamps(self):
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
index 7cf26f5..6d72fa8 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
@@ -156,27 +156,26 @@ class MainInputTest(unittest.TestCase):
         else:
           yield beam.pvalue.TaggedOutput('even', element)
 
-    p = TestPipeline()
-    res = (
-        p
-        | beam.Create([1, 2, 3])
-        | beam.ParDo(MyDoFn()).with_outputs('odd', 'even'))
-    self.assertIsNotNone(res[None].element_type)
-    self.assertIsNotNone(res['even'].element_type)
-    self.assertIsNotNone(res['odd'].element_type)
-    res_main = (
-        res[None]
-        | 'id_none' >> beam.ParDo(lambda e: [e]).with_input_types(int))
-    res_even = (
-        res['even']
-        | 'id_even' >> beam.ParDo(lambda e: [e]).with_input_types(int))
-    res_odd = (
-        res['odd']
-        | 'id_odd' >> beam.ParDo(lambda e: [e]).with_input_types(int))
-    assert_that(res_main, equal_to([]), label='none_check')
-    assert_that(res_even, equal_to([2]), label='even_check')
-    assert_that(res_odd, equal_to([1, 3]), label='odd_check')
-    p.run()
+    with TestPipeline() as p:
+      res = (
+          p
+          | beam.Create([1, 2, 3])
+          | beam.ParDo(MyDoFn()).with_outputs('odd', 'even'))
+      self.assertIsNotNone(res[None].element_type)
+      self.assertIsNotNone(res['even'].element_type)
+      self.assertIsNotNone(res['odd'].element_type)
+      res_main = (
+          res[None]
+          | 'id_none' >> beam.ParDo(lambda e: [e]).with_input_types(int))
+      res_even = (
+          res['even']
+          | 'id_even' >> beam.ParDo(lambda e: [e]).with_input_types(int))
+      res_odd = (
+          res['odd']
+          | 'id_odd' >> beam.ParDo(lambda e: [e]).with_input_types(int))
+      assert_that(res_main, equal_to([]), label='none_check')
+      assert_that(res_even, equal_to([2]), label='even_check')
+      assert_that(res_odd, equal_to([1, 3]), label='odd_check')
 
     with self.assertRaises(ValueError):
       _ = res['undeclared tag']
@@ -189,24 +188,23 @@ class MainInputTest(unittest.TestCase):
         else:
           yield beam.pvalue.TaggedOutput('even', element)
 
-    p = TestPipeline()
-    res = (p | beam.Create([1, 2, 3]) | beam.ParDo(MyDoFn()).with_outputs())
-    self.assertIsNotNone(res[None].element_type)
-    self.assertIsNotNone(res['even'].element_type)
-    self.assertIsNotNone(res['odd'].element_type)
-    res_main = (
-        res[None]
-        | 'id_none' >> beam.ParDo(lambda e: [e]).with_input_types(int))
-    res_even = (
-        res['even']
-        | 'id_even' >> beam.ParDo(lambda e: [e]).with_input_types(int))
-    res_odd = (
-        res['odd']
-        | 'id_odd' >> beam.ParDo(lambda e: [e]).with_input_types(int))
-    assert_that(res_main, equal_to([]), label='none_check')
-    assert_that(res_even, equal_to([2]), label='even_check')
-    assert_that(res_odd, equal_to([1, 3]), label='odd_check')
-    p.run()
+    with TestPipeline() as p:
+      res = (p | beam.Create([1, 2, 3]) | beam.ParDo(MyDoFn()).with_outputs())
+      self.assertIsNotNone(res[None].element_type)
+      self.assertIsNotNone(res['even'].element_type)
+      self.assertIsNotNone(res['odd'].element_type)
+      res_main = (
+          res[None]
+          | 'id_none' >> beam.ParDo(lambda e: [e]).with_input_types(int))
+      res_even = (
+          res['even']
+          | 'id_even' >> beam.ParDo(lambda e: [e]).with_input_types(int))
+      res_odd = (
+          res['odd']
+          | 'id_odd' >> beam.ParDo(lambda e: [e]).with_input_types(int))
+      assert_that(res_main, equal_to([]), label='none_check')
+      assert_that(res_even, equal_to([2]), label='even_check')
+      assert_that(res_odd, equal_to([1, 3]), label='odd_check')
 
 
 class NativeTypesTest(unittest.TestCase):

Reply via email to