[ 
https://issues.apache.org/jira/browse/BEAM-7759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17547985#comment-17547985
 ] 

Danny McCormick commented on BEAM-7759:
---------------------------------------

This issue has been migrated to https://github.com/apache/beam/issues/19641

> beam.Create([...]) does not create a properly shaped PCollection until 
> pipeline executes
> -----------------------------------------------------------------------------------------
>
>                 Key: BEAM-7759
>                 URL: https://issues.apache.org/jira/browse/BEAM-7759
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>            Reporter: Pablo Estrada
>            Priority: P3
>
> This issue is surfaced by the errors from 
> [https://github.com/apache/beam/pull/9077]
>  
> Specifically, in the following pipelines, note how Map(lambda x: x) is 
> necessary before the actual transform that is being tested. This is because 
> the PCollection output by Create still contains the raw TimestampedValue or 
> WindowedValue objects, without their timestamps or windows assigned.
> {code}
>   def test_timestamp(self):
>     l = [TimestampedValue('a', 100),
>          TimestampedValue('b', 200),
>          TimestampedValue('c', 300)]
>     expected = [TestWindowedValue('a', 100, [GlobalWindow()]),
>                 TestWindowedValue('b', 200, [GlobalWindow()]),
>                 TestWindowedValue('c', 300, [GlobalWindow()])]
>     with TestPipeline() as p:
>       # Map(lambda x: x) PTransform is added after Create here, because when
>       # a PCollection of TimestampedValues is created with Create PTransform,
>       # the timestamps are not assigned to it. Adding a Map forces the
>       # PCollection to go through a DoFn so that the PCollection consists of
>       # the elements with timestamps assigned to them instead of a PCollection
>       # of TimestampedValue(element, timestamp).
>       pc = p | beam.Create(l) | beam.Map(lambda x: x)
>       reified_pc = pc | util.Reify.Timestamp()
>       assert_that(reified_pc, equal_to(expected), reify_windows=True)
>
>   def test_window(self):
>     l = [GlobalWindows.windowed_value('a', 100),
>          GlobalWindows.windowed_value('b', 200),
>          GlobalWindows.windowed_value('c', 300)]
>     expected = [TestWindowedValue(('a', 100, GlobalWindow()), 100,
>                                   [GlobalWindow()]),
>                 TestWindowedValue(('b', 200, GlobalWindow()), 200,
>                                   [GlobalWindow()]),
>                 TestWindowedValue(('c', 300, GlobalWindow()), 300,
>                                   [GlobalWindow()])]
>     with TestPipeline() as p:
>       pc = p | beam.Create(l)
>       # Map(lambda x: x) PTransform is added after Create here, because when
>       # a PCollection of WindowedValues is created with Create PTransform,
>       # the windows are not assigned to it. Adding a Map forces the
>       # PCollection to go through a DoFn so that the PCollection consists of
>       # the elements with windows assigned to them instead of a PCollection
>       # of WindowedValue(element, timestamp, window).
>       pc = pc | beam.Map(lambda x: x)
>       reified_pc = pc | util.Reify.Window()
>       assert_that(reified_pc, equal_to(expected), reify_windows=True)
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
