damccorm opened a new issue, #19641: URL: https://github.com/apache/beam/issues/19641
This issue is surfaced by the errors from [https://github.com/apache/beam/pull/9077](https://github.com/apache/beam/pull/9077) Specifically, in the following pipelines, note how Map(lambda x: x) is necessary before the actual transform that is being tested. This is due to the fact that the pcoll output by create is a little odd. ``` // code placeholder def test_timestamp(self): l = [TimestampedValue('a', 100), TimestampedValue('b', 200), TimestampedValue('c', 300)] expected = [TestWindowedValue('a', 100, [GlobalWindow()]), TestWindowedValue('b', 200, [GlobalWindow()]), TestWindowedValue('c', 300, [GlobalWindow()])] with TestPipeline() as p: # Map(lambda x: x) PTransform is added after Create here, because when # a PCollection of TimestampedValues is created with Create PTransform, # the timestamps are not assigned to it. Adding a Map forces the # PCollection to go through a DoFn so that the PCollection consists of # the elements with timestamps assigned to them instead of a PCollection # of TimestampedValue(element, timestamp). pc = p | beam.Create(l) | beam.Map(lambda x: x) reified_pc = pc | util.Reify.Timestamp() assert_that(reified_pc, equal_to(expected), reify_windows=True) def test_window(self): l = [GlobalWindows.windowed_value('a', 100), GlobalWindows.windowed_value('b', 200), GlobalWindows.windowed_value('c', 300)] expected = [TestWindowedValue(('a', 100, GlobalWindow()), 100, [GlobalWindow()]), TestWindowedValue(('b', 200, GlobalWindow()), 200, [GlobalWindow()]), TestWindowedValue(('c', 300, GlobalWindow()), 300, [GlobalWindow()])] with TestPipeline() as p: pc = p | beam.Create(l) # Map(lambda x: x) PTransform is added after Create here, because when # a PCollection of WindowedValues is created with Create PTransform, # the windows are not assigned to it. Adding a Map forces the # PCollection to go through a DoFn so that the PCollection consists of # the elements with timestamps assigned to them instead of a PCollection # of WindowedValue(element, timestamp, window). pc = pc | beam.Map(lambda x: x) reified_pc = pc | util.Reify.Window() assert_that(reified_pc, equal_to(expected), reify_windows=True) ``` Imported from Jira [BEAM-7759](https://issues.apache.org/jira/browse/BEAM-7759). Original Jira may contain additional context. Reported by: pabloem. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
