[ https://issues.apache.org/jira/browse/BEAM-7759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17547985#comment-17547985 ]
Danny McCormick commented on BEAM-7759:
---------------------------------------

This issue has been migrated to https://github.com/apache/beam/issues/19641

> beam.Create([...]) does not create a properly shaped PCollection until
> pipeline executes
> -----------------------------------------------------------------------------------------
>
>                 Key: BEAM-7759
>                 URL: https://issues.apache.org/jira/browse/BEAM-7759
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>            Reporter: Pablo Estrada
>            Priority: P3
>
> This issue is surfaced by the errors from
> [https://github.com/apache/beam/pull/9077]
>
> Specifically, in the following pipelines, note how Map(lambda x: x) is
> necessary before the actual transform that is being tested. This is because
> the PCollection output by Create is a little odd: its TimestampedValue and
> WindowedValue elements are not unpacked into element timestamps and windows.
> {code}
> # Imports the snippet relies on (listed here for reference).
> import apache_beam as beam
> from apache_beam.testing.test_pipeline import TestPipeline
> from apache_beam.testing.util import TestWindowedValue, assert_that, equal_to
> from apache_beam.transforms import util
> from apache_beam.transforms.window import (
>     GlobalWindow, GlobalWindows, TimestampedValue)
>
> def test_timestamp(self):
>   l = [TimestampedValue('a', 100),
>        TimestampedValue('b', 200),
>        TimestampedValue('c', 300)]
>   expected = [TestWindowedValue('a', 100, [GlobalWindow()]),
>               TestWindowedValue('b', 200, [GlobalWindow()]),
>               TestWindowedValue('c', 300, [GlobalWindow()])]
>   with TestPipeline() as p:
>     # Map(lambda x: x) PTransform is added after Create here, because when
>     # a PCollection of TimestampedValues is created with Create PTransform,
>     # the timestamps are not assigned to it. Adding a Map forces the
>     # PCollection to go through a DoFn so that the PCollection consists of
>     # the elements with timestamps assigned to them instead of a PCollection
>     # of TimestampedValue(element, timestamp).
>     pc = p | beam.Create(l) | beam.Map(lambda x: x)
>     reified_pc = pc | util.Reify.Timestamp()
>     assert_that(reified_pc, equal_to(expected), reify_windows=True)
>
> def test_window(self):
>   l = [GlobalWindows.windowed_value('a', 100),
>        GlobalWindows.windowed_value('b', 200),
>        GlobalWindows.windowed_value('c', 300)]
>   expected = [TestWindowedValue(('a', 100, GlobalWindow()), 100,
>                                 [GlobalWindow()]),
>               TestWindowedValue(('b', 200, GlobalWindow()), 200,
>                                 [GlobalWindow()]),
>               TestWindowedValue(('c', 300, GlobalWindow()), 300,
>                                 [GlobalWindow()])]
>   with TestPipeline() as p:
>     pc = p | beam.Create(l)
>     # Map(lambda x: x) PTransform is added after Create here, because when
>     # a PCollection of WindowedValues is created with Create PTransform,
>     # the windows are not assigned to it. Adding a Map forces the
>     # PCollection to go through a DoFn so that the PCollection consists of
>     # the elements with windows assigned to them instead of a PCollection
>     # of WindowedValue(element, timestamp, window).
>     pc = pc | beam.Map(lambda x: x)
>     reified_pc = pc | util.Reify.Window()
>     assert_that(reified_pc, equal_to(expected), reify_windows=True)
> {code}

--
This message was sent by Atlassian Jira
(v8.20.7#820007)
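The code comments in the quoted tests describe the mechanism: Create hands TimestampedValue objects through as plain element values, and only after the PCollection goes through a DoFn (the identity Map) do they become elements with assigned timestamps. The sketch that follows is a toy, pure-Python model of that behavior, written under stated assumptions. Every name in it (TimestampedValue, Element, toy_create, toy_map, toy_reify_timestamp, DEFAULT_TIMESTAMP, GLOBAL_WINDOW) is a hypothetical stand-in, not Beam's API or its actual runner code, and the default timestamp value is an assumption chosen only for illustration.

```python
from dataclasses import dataclass
from typing import Any, Callable, List, Tuple

# Hypothetical stand-ins for this toy model only; these are NOT Beam classes.
DEFAULT_TIMESTAMP = float("-inf")  # assumed "earliest" timestamp for Create output
GLOBAL_WINDOW = "GlobalWindow"     # placeholder for Beam's single global window


@dataclass(frozen=True)
class TimestampedValue:
    """Toy wrapper pairing a value with the timestamp it should carry."""
    value: Any
    timestamp: float


@dataclass(frozen=True)
class Element:
    """Toy windowed element: the value plus its timestamp and windows."""
    value: Any
    timestamp: float
    windows: Tuple[str, ...]


def toy_create(values: List[Any]) -> List[Element]:
    # Models the reported behavior: inputs are wrapped as-is, so a
    # TimestampedValue stays the element's *value* and gets the default timestamp.
    return [Element(v, DEFAULT_TIMESTAMP, (GLOBAL_WINDOW,)) for v in values]


def toy_map(fn: Callable[[Any], Any], elements: List[Element]) -> List[Element]:
    # Models DoFn-style output handling: a TimestampedValue returned by fn is
    # unwrapped, and its timestamp becomes the output element's timestamp.
    out = []
    for e in elements:
        result = fn(e.value)
        if isinstance(result, TimestampedValue):
            out.append(Element(result.value, result.timestamp, e.windows))
        else:
            out.append(Element(result, e.timestamp, e.windows))
    return out


def toy_reify_timestamp(elements: List[Element]) -> List[Tuple[Any, float]]:
    # Exposes each element's (value, timestamp), loosely like Reify.Timestamp.
    return [(e.value, e.timestamp) for e in elements]


if __name__ == "__main__":
    raw = [TimestampedValue('a', 100), TimestampedValue('b', 200),
           TimestampedValue('c', 300)]
    # Straight out of toy_create: values are still TimestampedValue wrappers.
    print(toy_reify_timestamp(toy_create(raw)))
    # After the identity map the wrappers are unpacked.
    print(toy_reify_timestamp(toy_map(lambda x: x, toy_create(raw))))
    # prints [('a', 100), ('b', 200), ('c', 300)]
```

In this model the identity function does no work itself. The unwrapping happens in the output-handling step of toy_map, which is why the tests need Map(lambda x: x). The WindowedValue case in test_window would follow the same pattern with windows in place of timestamps.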