[ https://issues.apache.org/jira/browse/BEAM-7759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ismaël Mejía updated BEAM-7759:
-------------------------------
Status: Open (was: Triage Needed)
> beam.Create([...]) does not create a properly shaped PCollection until
> pipeline executes
> -----------------------------------------------------------------------------------------
>
> Key: BEAM-7759
> URL: https://issues.apache.org/jira/browse/BEAM-7759
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-core
> Reporter: Pablo Estrada
> Priority: Major
>
> This issue is surfaced by the errors from
> [https://github.com/apache/beam/pull/9077]
>
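> Specifically, in the following pipelines, note how Map(lambda x: x) is
> necessary before the actual transform that is being tested. This is because
> the PCollection output by Create is a little odd: its elements are the
> TimestampedValue/WindowedValue wrappers themselves, rather than plain
> elements with those timestamps and windows assigned.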
> {code}
> # Imports assumed from the Beam Python SDK (the original snippet omitted them).
> import unittest
>
> import apache_beam as beam
> from apache_beam.testing.test_pipeline import TestPipeline
> from apache_beam.testing.util import TestWindowedValue, assert_that, equal_to
> from apache_beam.transforms import util
> from apache_beam.transforms.window import GlobalWindow, GlobalWindows, TimestampedValue
>
>
> class ReifyTest(unittest.TestCase):  # enclosing TestCase name is assumed
>
>   def test_timestamp(self):
>     l = [TimestampedValue('a', 100),
>          TimestampedValue('b', 200),
>          TimestampedValue('c', 300)]
>     expected = [TestWindowedValue('a', 100, [GlobalWindow()]),
>                 TestWindowedValue('b', 200, [GlobalWindow()]),
>                 TestWindowedValue('c', 300, [GlobalWindow()])]
>     with TestPipeline() as p:
>       # Map(lambda x: x) PTransform is added after Create here, because when
>       # a PCollection of TimestampedValues is created with Create PTransform,
>       # the timestamps are not assigned to it. Adding a Map forces the
>       # PCollection to go through a DoFn so that the PCollection consists of
>       # the elements with timestamps assigned to them instead of a PCollection
>       # of TimestampedValue(element, timestamp).
>       pc = p | beam.Create(l) | beam.Map(lambda x: x)
>       reified_pc = pc | util.Reify.Timestamp()
>       assert_that(reified_pc, equal_to(expected), reify_windows=True)
>
>   def test_window(self):
>     l = [GlobalWindows.windowed_value('a', 100),
>          GlobalWindows.windowed_value('b', 200),
>          GlobalWindows.windowed_value('c', 300)]
>     expected = [TestWindowedValue(('a', 100, GlobalWindow()), 100,
>                                   [GlobalWindow()]),
>                 TestWindowedValue(('b', 200, GlobalWindow()), 200,
>                                   [GlobalWindow()]),
>                 TestWindowedValue(('c', 300, GlobalWindow()), 300,
>                                   [GlobalWindow()])]
>     with TestPipeline() as p:
>       pc = p | beam.Create(l)
>       # Map(lambda x: x) PTransform is added after Create here, because when
>       # a PCollection of WindowedValues is created with Create PTransform,
>       # the windows are not assigned to it. Adding a Map forces the
>       # PCollection to go through a DoFn so that the PCollection consists of
>       # the elements with windows assigned to them instead of a PCollection
>       # of WindowedValue(element, timestamp, window).
>       pc = pc | beam.Map(lambda x: x)
>       reified_pc = pc | util.Reify.Window()
>       assert_that(reified_pc, equal_to(expected), reify_windows=True)
> {code}
>
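To make the mechanism described in the test comments concrete, here is a toy, pure-Python model. It is not Beam code: every name in it (`ToyTimestampedValue`, `ToyWindowedValue`, `toy_create`, `toy_map`, `MIN_TIMESTAMP`, `GLOBAL_WINDOW`) is hypothetical. It assumes, as the comments state, that Create stores each input object verbatim as the element, while output returned from a DoFn such as Map is unwrapped: a returned timestamped value supplies the element's timestamp, and a returned windowed value is taken as-is. The default timestamp and the window placeholder are assumptions, and window handling is simplified.

```python
from typing import Any, Callable, Iterable, List, NamedTuple, Tuple

# Hypothetical stand-ins; the real SDK types carry more state.
MIN_TIMESTAMP = float("-inf")  # assumed default timestamp for created elements
GLOBAL_WINDOW = "GlobalWindow"  # placeholder for the single global window


class ToyTimestampedValue(NamedTuple):
    """Toy stand-in for TimestampedValue: a value plus a timestamp."""
    value: Any
    timestamp: float


class ToyWindowedValue(NamedTuple):
    """Toy stand-in for what a PCollection holds: value plus metadata."""
    value: Any
    timestamp: float
    windows: Tuple[Any, ...]


def toy_create(values: Iterable[Any]) -> List[ToyWindowedValue]:
    # Models the reported behavior: each input object becomes the element
    # itself, so any timestamp it carries is NOT moved into the metadata.
    return [ToyWindowedValue(v, MIN_TIMESTAMP, (GLOBAL_WINDOW,)) for v in values]


def toy_map(pcoll: List[ToyWindowedValue],
            fn: Callable[[Any], Any]) -> List[ToyWindowedValue]:
    # Models DoFn output handling: wrapper objects returned by fn are
    # unwrapped into metadata; plain results inherit the input's metadata.
    out = []
    for wv in pcoll:
        result = fn(wv.value)
        if isinstance(result, ToyWindowedValue):
            out.append(result)  # taken verbatim, including its windows
        elif isinstance(result, ToyTimestampedValue):
            # Simplification: windows are kept from the input element.
            out.append(ToyWindowedValue(result.value, result.timestamp, wv.windows))
        else:
            out.append(ToyWindowedValue(result, wv.timestamp, wv.windows))
    return out


if __name__ == "__main__":
    created = toy_create([ToyTimestampedValue("a", 100)])
    print(created[0])                      # the wrapper is still the element
    print(toy_map(created, lambda x: x)[0])  # timestamp now in metadata
```

In this model, the identity map over `toy_create`'s output is what moves the timestamp (or window) out of the element and into the metadata, mirroring why the tests insert `beam.Map(lambda x: x)` after `beam.Create`.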
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)