This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a5923c9 Use with statements rather than explicit pipeline.run().
new 99f4c67 Merge pull request #12361 from robertwb/with-run
a5923c9 is described below
commit a5923c9dc9769b49072299378f63483cefe7c6ed
Author: Robert Bradshaw <[email protected]>
AuthorDate: Thu Jul 23 14:28:31 2020 -0700
Use with statements rather than explicit pipeline.run().
---
.../apache_beam/transforms/ptransform_test.py | 134 ++++++++++-----------
.../apache_beam/transforms/sideinputs_test.py | 106 ++++++++--------
sdks/python/apache_beam/transforms/util_test.py | 120 +++++++++---------
.../apache_beam/typehints/typed_pipeline_test.py | 76 ++++++------
4 files changed, 212 insertions(+), 224 deletions(-)
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 7ff596a..8560af3 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -331,21 +331,22 @@ class PTransformTest(unittest.TestCase):
def finish_bundle(self):
yield WindowedValue('finish', 1, [windowfn])
- pipeline = TestPipeline()
- result = (
- pipeline
- | 'Start' >> beam.Create([1])
- | beam.ParDo(MyDoFn())
- | WindowInto(windowfn)
- | 'create tuple' >> beam.Map(
- lambda v,
- t=beam.DoFn.TimestampParam,
- w=beam.DoFn.WindowParam: (v, t, w.start, w.end)))
- expected_process = [('process1', Timestamp(5), Timestamp(4), Timestamp(6))]
- expected_finish = [('finish', Timestamp(1), Timestamp(0), Timestamp(2))]
-
- assert_that(result, equal_to(expected_process + expected_finish))
- pipeline.run()
+ with TestPipeline() as pipeline:
+ result = (
+ pipeline
+ | 'Start' >> beam.Create([1])
+ | beam.ParDo(MyDoFn())
+ | WindowInto(windowfn)
+ | 'create tuple' >> beam.Map(
+ lambda v,
+ t=beam.DoFn.TimestampParam,
+ w=beam.DoFn.WindowParam: (v, t, w.start, w.end)))
+ expected_process = [
+ ('process1', Timestamp(5), Timestamp(4), Timestamp(6))
+ ]
+ expected_finish = [('finish', Timestamp(1), Timestamp(0), Timestamp(2))]
+
+ assert_that(result, equal_to(expected_process + expected_finish))
def test_do_fn_with_start(self):
class MyDoFn(beam.DoFn):
@@ -474,12 +475,11 @@ class PTransformTest(unittest.TestCase):
assert_that(result, equal_to([('a', m_1), ('b', m_2)]))
def test_group_by_key(self):
- pipeline = TestPipeline()
- pcoll = pipeline | 'start' >> beam.Create([(1, 1), (2, 1), (3, 1), (1, 2),
- (2, 2), (1, 3)])
- result = pcoll | 'Group' >> beam.GroupByKey() | _SortLists
- assert_that(result, equal_to([(1, [1, 2, 3]), (2, [1, 2]), (3, [1])]))
- pipeline.run()
+ with TestPipeline() as pipeline:
+ pcoll = pipeline | 'start' >> beam.Create([(1, 1), (2, 1), (3, 1), (1, 2),
+ (2, 2), (1, 3)])
+ result = pcoll | 'Group' >> beam.GroupByKey() | _SortLists
+ assert_that(result, equal_to([(1, [1, 2, 3]), (2, [1, 2]), (3, [1])]))
def test_group_by_key_reiteration(self):
class MyDoFn(beam.DoFn):
@@ -533,14 +533,13 @@ class PTransformTest(unittest.TestCase):
def test_partition_followed_by_flatten_and_groupbykey(self):
"""Regression test for an issue with how partitions are handled."""
- pipeline = TestPipeline()
- contents = [('aa', 1), ('bb', 2), ('aa', 2)]
- created = pipeline | 'A' >> beam.Create(contents)
- partitioned = created | 'B' >> beam.Partition(lambda x, n: len(x) % n, 3)
- flattened = partitioned | 'C' >> beam.Flatten()
- grouped = flattened | 'D' >> beam.GroupByKey() | _SortLists
- assert_that(grouped, equal_to([('aa', [1, 2]), ('bb', [2])]))
- pipeline.run()
+ with TestPipeline() as pipeline:
+ contents = [('aa', 1), ('bb', 2), ('aa', 2)]
+ created = pipeline | 'A' >> beam.Create(contents)
+ partitioned = created | 'B' >> beam.Partition(lambda x, n: len(x) % n, 3)
+ flattened = partitioned | 'C' >> beam.Flatten()
+ grouped = flattened | 'D' >> beam.GroupByKey() | _SortLists
+ assert_that(grouped, equal_to([('aa', [1, 2]), ('bb', [2])]))
@attr('ValidatesRunner')
def test_flatten_pcollections(self):
@@ -619,50 +618,47 @@ class PTransformTest(unittest.TestCase):
assert_that(odd_length, equal_to(['BBB']), label='assert:odd')
def test_co_group_by_key_on_list(self):
- pipeline = TestPipeline()
- pcoll_1 = pipeline | 'Start 1' >> beam.Create([('a', 1), ('a', 2), ('b', 3),
- ('c', 4)])
- pcoll_2 = pipeline | 'Start 2' >> beam.Create([('a', 5), ('a', 6), ('c', 7),
- ('c', 8)])
- result = (pcoll_1, pcoll_2) | beam.CoGroupByKey() | _SortLists
- assert_that(
- result,
- equal_to([('a', ([1, 2], [5, 6])), ('b', ([3], [])),
- ('c', ([4], [7, 8]))]))
- pipeline.run()
+ with TestPipeline() as pipeline:
+ pcoll_1 = pipeline | 'Start 1' >> beam.Create([('a', 1), ('a', 2),
+ ('b', 3), ('c', 4)])
+ pcoll_2 = pipeline | 'Start 2' >> beam.Create([('a', 5), ('a', 6),
+ ('c', 7), ('c', 8)])
+ result = (pcoll_1, pcoll_2) | beam.CoGroupByKey() | _SortLists
+ assert_that(
+ result,
+ equal_to([('a', ([1, 2], [5, 6])), ('b', ([3], [])),
+ ('c', ([4], [7, 8]))]))
def test_co_group_by_key_on_iterable(self):
- pipeline = TestPipeline()
- pcoll_1 = pipeline | 'Start 1' >> beam.Create([('a', 1), ('a', 2), ('b', 3),
- ('c', 4)])
- pcoll_2 = pipeline | 'Start 2' >> beam.Create([('a', 5), ('a', 6), ('c', 7),
- ('c', 8)])
- result = [pc for pc in (pcoll_1, pcoll_2)] | beam.CoGroupByKey()
- result |= _SortLists
- assert_that(
- result,
- equal_to([('a', ([1, 2], [5, 6])), ('b', ([3], [])),
- ('c', ([4], [7, 8]))]))
- pipeline.run()
+ with TestPipeline() as pipeline:
+ pcoll_1 = pipeline | 'Start 1' >> beam.Create([('a', 1), ('a', 2),
+ ('b', 3), ('c', 4)])
+ pcoll_2 = pipeline | 'Start 2' >> beam.Create([('a', 5), ('a', 6),
+ ('c', 7), ('c', 8)])
+ result = [pc for pc in (pcoll_1, pcoll_2)] | beam.CoGroupByKey()
+ result |= _SortLists
+ assert_that(
+ result,
+ equal_to([('a', ([1, 2], [5, 6])), ('b', ([3], [])),
+ ('c', ([4], [7, 8]))]))
def test_co_group_by_key_on_dict(self):
- pipeline = TestPipeline()
- pcoll_1 = pipeline | 'Start 1' >> beam.Create([('a', 1), ('a', 2), ('b', 3),
- ('c', 4)])
- pcoll_2 = pipeline | 'Start 2' >> beam.Create([('a', 5), ('a', 6), ('c', 7),
- ('c', 8)])
- result = {'X': pcoll_1, 'Y': pcoll_2} | beam.CoGroupByKey()
- result |= _SortLists
- assert_that(
- result,
- equal_to([('a', {
- 'X': [1, 2], 'Y': [5, 6]
- }), ('b', {
- 'X': [3], 'Y': []
- }), ('c', {
- 'X': [4], 'Y': [7, 8]
- })]))
- pipeline.run()
+ with TestPipeline() as pipeline:
+ pcoll_1 = pipeline | 'Start 1' >> beam.Create([('a', 1), ('a', 2),
+ ('b', 3), ('c', 4)])
+ pcoll_2 = pipeline | 'Start 2' >> beam.Create([('a', 5), ('a', 6),
+ ('c', 7), ('c', 8)])
+ result = {'X': pcoll_1, 'Y': pcoll_2} | beam.CoGroupByKey()
+ result |= _SortLists
+ assert_that(
+ result,
+ equal_to([('a', {
+ 'X': [1, 2], 'Y': [5, 6]
+ }), ('b', {
+ 'X': [3], 'Y': []
+ }), ('c', {
+ 'X': [4], 'Y': [7, 8]
+ })]))
def test_group_by_key_input_must_be_kv_pairs(self):
with self.assertRaises(typehints.TypeCheckError) as e:
diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py
index 1bcf47a..c8d118e 100644
--- a/sdks/python/apache_beam/transforms/sideinputs_test.py
+++ b/sdks/python/apache_beam/transforms/sideinputs_test.py
@@ -348,60 +348,58 @@ class SideInputsTest(unittest.TestCase):
# This flag is only necessary when using the multi-output TestStream b/c
# it relies on using the PCollection output tags as the PCollection output
# ids.
- p = TestPipeline()
-
- test_stream = (
- p
- | 'Mixed TestStream' >> TestStream().advance_watermark_to(
- 3,
- tag='main').add_elements(['a1'], tag='main').advance_watermark_to(
- 8, tag='main').add_elements(['a2'], tag='main').add_elements(
- [window.TimestampedValue(('k', 100), 2)], tag='side').
- add_elements([window.TimestampedValue(
- ('k', 400), 7)], tag='side').advance_watermark_to_infinity(
- tag='main').advance_watermark_to_infinity(tag='side'))
-
- main_data = (
- test_stream['main']
- | 'Main windowInto' >> beam.WindowInto(
- window.FixedWindows(5),
- accumulation_mode=trigger.AccumulationMode.DISCARDING))
-
- side_data = (
- test_stream['side']
- | 'Side windowInto' >> beam.WindowInto(
- window.FixedWindows(5),
- trigger=trigger.AfterWatermark(early=trigger.AfterCount(1)),
- accumulation_mode=trigger.AccumulationMode.DISCARDING)
- | beam.CombinePerKey(sum)
- | 'Values' >> Map(lambda k_vs: k_vs[1]))
-
- class RecordFn(beam.DoFn):
- def process(
- self,
- elm=beam.DoFn.ElementParam,
- ts=beam.DoFn.TimestampParam,
- side=beam.DoFn.SideInputParam):
- yield (elm, ts, side)
-
- records = (
- main_data
- | beam.ParDo(RecordFn(), beam.pvalue.AsList(side_data)))
-
- expected_window_to_elements = {
- window.IntervalWindow(0, 5): [
- ('a1', Timestamp(3), [100, 0]),
- ],
- window.IntervalWindow(5, 10): [('a2', Timestamp(8), [400, 0])],
- }
-
- assert_that(
- records,
- equal_to_per_window(expected_window_to_elements),
- use_global_window=False,
- label='assert per window')
-
- p.run()
+ with TestPipeline() as p:
+
+ test_stream = (
+ p
+ | 'Mixed TestStream' >> TestStream().advance_watermark_to(
+ 3,
+ tag='main').add_elements(['a1'], tag='main').advance_watermark_to(
+ 8, tag='main').add_elements(['a2'], tag='main').add_elements(
+ [window.TimestampedValue(('k', 100), 2)], tag='side').
+ add_elements([window.TimestampedValue(
+ ('k', 400), 7)], tag='side').advance_watermark_to_infinity(
+ tag='main').advance_watermark_to_infinity(tag='side'))
+
+ main_data = (
+ test_stream['main']
+ | 'Main windowInto' >> beam.WindowInto(
+ window.FixedWindows(5),
+ accumulation_mode=trigger.AccumulationMode.DISCARDING))
+
+ side_data = (
+ test_stream['side']
+ | 'Side windowInto' >> beam.WindowInto(
+ window.FixedWindows(5),
+ trigger=trigger.AfterWatermark(early=trigger.AfterCount(1)),
+ accumulation_mode=trigger.AccumulationMode.DISCARDING)
+ | beam.CombinePerKey(sum)
+ | 'Values' >> Map(lambda k_vs: k_vs[1]))
+
+ class RecordFn(beam.DoFn):
+ def process(
+ self,
+ elm=beam.DoFn.ElementParam,
+ ts=beam.DoFn.TimestampParam,
+ side=beam.DoFn.SideInputParam):
+ yield (elm, ts, side)
+
+ records = (
+ main_data
+ | beam.ParDo(RecordFn(), beam.pvalue.AsList(side_data)))
+
+ expected_window_to_elements = {
+ window.IntervalWindow(0, 5): [
+ ('a1', Timestamp(3), [100, 0]),
+ ],
+ window.IntervalWindow(5, 10): [('a2', Timestamp(8), [400, 0])],
+ }
+
+ assert_that(
+ records,
+ equal_to_per_window(expected_window_to_elements),
+ use_global_window=False,
+ label='assert per window')
if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py
index ef88073..f22ea2c 100644
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ b/sdks/python/apache_beam/transforms/util_test.py
@@ -361,20 +361,19 @@ class ReshuffleTest(unittest.TestCase):
assert_that(result, equal_to(data))
def test_reshuffle_after_gbk_contents_unchanged(self):
- pipeline = TestPipeline()
- data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 3)]
- expected_result = [(1, [1, 2, 3]), (2, [1, 2]), (3, [1])]
-
- after_gbk = (
- pipeline
- | beam.Create(data)
- | beam.GroupByKey()
- | beam.MapTuple(lambda k, vs: (k, sorted(vs))))
- assert_that(after_gbk, equal_to(expected_result), label='after_gbk')
- after_reshuffle = after_gbk | beam.Reshuffle()
- assert_that(
- after_reshuffle, equal_to(expected_result), label='after_reshuffle')
- pipeline.run()
+ with TestPipeline() as pipeline:
+ data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 3)]
+ expected_result = [(1, [1, 2, 3]), (2, [1, 2]), (3, [1])]
+
+ after_gbk = (
+ pipeline
+ | beam.Create(data)
+ | beam.GroupByKey()
+ | beam.MapTuple(lambda k, vs: (k, sorted(vs))))
+ assert_that(after_gbk, equal_to(expected_result), label='after_gbk')
+ after_reshuffle = after_gbk | beam.Reshuffle()
+ assert_that(
+ after_reshuffle, equal_to(expected_result), label='after_reshuffle')
def test_reshuffle_timestamps_unchanged(self):
with TestPipeline() as pipeline:
@@ -482,60 +481,57 @@ class ReshuffleTest(unittest.TestCase):
reify_windows=True)
def test_reshuffle_global_window(self):
- pipeline = TestPipeline()
- data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 4)]
- expected_data = [(1, [1, 2, 4]), (2, [1, 2]), (3, [1])]
- before_reshuffle = (
- pipeline
- | beam.Create(data)
- | beam.WindowInto(GlobalWindows())
- | beam.GroupByKey()
- | beam.MapTuple(lambda k, vs: (k, sorted(vs))))
- assert_that(
- before_reshuffle, equal_to(expected_data), label='before_reshuffle')
- after_reshuffle = before_reshuffle | beam.Reshuffle()
- assert_that(
- after_reshuffle, equal_to(expected_data), label='after reshuffle')
- pipeline.run()
+ with TestPipeline() as pipeline:
+ data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 4)]
+ expected_data = [(1, [1, 2, 4]), (2, [1, 2]), (3, [1])]
+ before_reshuffle = (
+ pipeline
+ | beam.Create(data)
+ | beam.WindowInto(GlobalWindows())
+ | beam.GroupByKey()
+ | beam.MapTuple(lambda k, vs: (k, sorted(vs))))
+ assert_that(
+ before_reshuffle, equal_to(expected_data), label='before_reshuffle')
+ after_reshuffle = before_reshuffle | beam.Reshuffle()
+ assert_that(
+ after_reshuffle, equal_to(expected_data), label='after reshuffle')
def test_reshuffle_sliding_window(self):
- pipeline = TestPipeline()
- data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 4)]
- window_size = 2
- expected_data = [(1, [1, 2, 4]), (2, [1, 2]), (3, [1])] * window_size
- before_reshuffle = (
- pipeline
- | beam.Create(data)
- | beam.WindowInto(SlidingWindows(size=window_size, period=1))
- | beam.GroupByKey()
- | beam.MapTuple(lambda k, vs: (k, sorted(vs))))
- assert_that(
- before_reshuffle, equal_to(expected_data), label='before_reshuffle')
- after_reshuffle = before_reshuffle | beam.Reshuffle()
- # If Reshuffle applies the sliding window function a second time there
- # should be extra values for each key.
- assert_that(
- after_reshuffle, equal_to(expected_data), label='after reshuffle')
- pipeline.run()
+ with TestPipeline() as pipeline:
+ data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 4)]
+ window_size = 2
+ expected_data = [(1, [1, 2, 4]), (2, [1, 2]), (3, [1])] * window_size
+ before_reshuffle = (
+ pipeline
+ | beam.Create(data)
+ | beam.WindowInto(SlidingWindows(size=window_size, period=1))
+ | beam.GroupByKey()
+ | beam.MapTuple(lambda k, vs: (k, sorted(vs))))
+ assert_that(
+ before_reshuffle, equal_to(expected_data), label='before_reshuffle')
+ after_reshuffle = before_reshuffle | beam.Reshuffle()
+ # If Reshuffle applies the sliding window function a second time there
+ # should be extra values for each key.
+ assert_that(
+ after_reshuffle, equal_to(expected_data), label='after reshuffle')
def test_reshuffle_streaming_global_window(self):
options = PipelineOptions()
options.view_as(StandardOptions).streaming = True
- pipeline = TestPipeline(options=options)
- data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 4)]
- expected_data = [(1, [1, 2, 4]), (2, [1, 2]), (3, [1])]
- before_reshuffle = (
- pipeline
- | beam.Create(data)
- | beam.WindowInto(GlobalWindows())
- | beam.GroupByKey()
- | beam.MapTuple(lambda k, vs: (k, sorted(vs))))
- assert_that(
- before_reshuffle, equal_to(expected_data), label='before_reshuffle')
- after_reshuffle = before_reshuffle | beam.Reshuffle()
- assert_that(
- after_reshuffle, equal_to(expected_data), label='after reshuffle')
- pipeline.run()
+ with TestPipeline(options=options) as pipeline:
+ data = [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 4)]
+ expected_data = [(1, [1, 2, 4]), (2, [1, 2]), (3, [1])]
+ before_reshuffle = (
+ pipeline
+ | beam.Create(data)
+ | beam.WindowInto(GlobalWindows())
+ | beam.GroupByKey()
+ | beam.MapTuple(lambda k, vs: (k, sorted(vs))))
+ assert_that(
+ before_reshuffle, equal_to(expected_data), label='before_reshuffle')
+ after_reshuffle = before_reshuffle | beam.Reshuffle()
+ assert_that(
+ after_reshuffle, equal_to(expected_data), label='after reshuffle')
@attr('ValidatesRunner')
def test_reshuffle_preserves_timestamps(self):
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
index 7cf26f5..6d72fa8 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
@@ -156,27 +156,26 @@ class MainInputTest(unittest.TestCase):
else:
yield beam.pvalue.TaggedOutput('even', element)
- p = TestPipeline()
- res = (
- p
- | beam.Create([1, 2, 3])
- | beam.ParDo(MyDoFn()).with_outputs('odd', 'even'))
- self.assertIsNotNone(res[None].element_type)
- self.assertIsNotNone(res['even'].element_type)
- self.assertIsNotNone(res['odd'].element_type)
- res_main = (
- res[None]
- | 'id_none' >> beam.ParDo(lambda e: [e]).with_input_types(int))
- res_even = (
- res['even']
- | 'id_even' >> beam.ParDo(lambda e: [e]).with_input_types(int))
- res_odd = (
- res['odd']
- | 'id_odd' >> beam.ParDo(lambda e: [e]).with_input_types(int))
- assert_that(res_main, equal_to([]), label='none_check')
- assert_that(res_even, equal_to([2]), label='even_check')
- assert_that(res_odd, equal_to([1, 3]), label='odd_check')
- p.run()
+ with TestPipeline() as p:
+ res = (
+ p
+ | beam.Create([1, 2, 3])
+ | beam.ParDo(MyDoFn()).with_outputs('odd', 'even'))
+ self.assertIsNotNone(res[None].element_type)
+ self.assertIsNotNone(res['even'].element_type)
+ self.assertIsNotNone(res['odd'].element_type)
+ res_main = (
+ res[None]
+ | 'id_none' >> beam.ParDo(lambda e: [e]).with_input_types(int))
+ res_even = (
+ res['even']
+ | 'id_even' >> beam.ParDo(lambda e: [e]).with_input_types(int))
+ res_odd = (
+ res['odd']
+ | 'id_odd' >> beam.ParDo(lambda e: [e]).with_input_types(int))
+ assert_that(res_main, equal_to([]), label='none_check')
+ assert_that(res_even, equal_to([2]), label='even_check')
+ assert_that(res_odd, equal_to([1, 3]), label='odd_check')
with self.assertRaises(ValueError):
_ = res['undeclared tag']
@@ -189,24 +188,23 @@ class MainInputTest(unittest.TestCase):
else:
yield beam.pvalue.TaggedOutput('even', element)
- p = TestPipeline()
- res = (p | beam.Create([1, 2, 3]) | beam.ParDo(MyDoFn()).with_outputs())
- self.assertIsNotNone(res[None].element_type)
- self.assertIsNotNone(res['even'].element_type)
- self.assertIsNotNone(res['odd'].element_type)
- res_main = (
- res[None]
- | 'id_none' >> beam.ParDo(lambda e: [e]).with_input_types(int))
- res_even = (
- res['even']
- | 'id_even' >> beam.ParDo(lambda e: [e]).with_input_types(int))
- res_odd = (
- res['odd']
- | 'id_odd' >> beam.ParDo(lambda e: [e]).with_input_types(int))
- assert_that(res_main, equal_to([]), label='none_check')
- assert_that(res_even, equal_to([2]), label='even_check')
- assert_that(res_odd, equal_to([1, 3]), label='odd_check')
- p.run()
+ with TestPipeline() as p:
+ res = (p | beam.Create([1, 2, 3]) | beam.ParDo(MyDoFn()).with_outputs())
+ self.assertIsNotNone(res[None].element_type)
+ self.assertIsNotNone(res['even'].element_type)
+ self.assertIsNotNone(res['odd'].element_type)
+ res_main = (
+ res[None]
+ | 'id_none' >> beam.ParDo(lambda e: [e]).with_input_types(int))
+ res_even = (
+ res['even']
+ | 'id_even' >> beam.ParDo(lambda e: [e]).with_input_types(int))
+ res_odd = (
+ res['odd']
+ | 'id_odd' >> beam.ParDo(lambda e: [e]).with_input_types(int))
+ assert_that(res_main, equal_to([]), label='none_check')
+ assert_that(res_even, equal_to([2]), label='even_check')
+ assert_that(res_odd, equal_to([1, 3]), label='odd_check')
class NativeTypesTest(unittest.TestCase):