damccorm opened a new issue, #19641:
URL: https://github.com/apache/beam/issues/19641

   This issue is surfaced by the errors from 
[https://github.com/apache/beam/pull/9077](https://github.com/apache/beam/pull/9077)
   
    
   
   Specifically, in the following pipelines, note how Map(lambda x: x) is 
necessary before the actual transform that is being tested. This is due to the 
fact that the pcoll output by create is a little odd.
   ```
   
   // code placeholder
     def test_timestamp(self):
       l = [TimestampedValue('a', 100),
            TimestampedValue('b',
   200),
            TimestampedValue('c', 300)]
       expected = [TestWindowedValue('a', 100, [GlobalWindow()]),
   
                  TestWindowedValue('b', 200, [GlobalWindow()]),
                   TestWindowedValue('c',
   300, [GlobalWindow()])]
       with TestPipeline() as p:
         # Map(lambda x: x) PTransform is added
   after Create here, because when
         # a PCollection of TimestampedValues is created with Create PTransform,
   
        # the timestamps are not assigned to it. Adding a Map forces the
         # PCollection to go through
   a DoFn so that the PCollection consists of
         # the elements with timestamps assigned to them instead
   of a PCollection
         # of TimestampedValue(element, timestamp).
         pc = p | beam.Create(l) |
   beam.Map(lambda x: x)
         reified_pc = pc | util.Reify.Timestamp()
         assert_that(reified_pc,
   equal_to(expected), reify_windows=True)
   
     def test_window(self):
       l = [GlobalWindows.windowed_value('a',
   100),
            GlobalWindows.windowed_value('b', 200),
            GlobalWindows.windowed_value('c',
   300)]
       expected = [TestWindowedValue(('a', 100, GlobalWindow()), 100,
                           
            [GlobalWindow()]),
                   TestWindowedValue(('b', 200, GlobalWindow()), 200,
     
                                  [GlobalWindow()]),
                   TestWindowedValue(('c', 300, GlobalWindow()),
   300,
                                     [GlobalWindow()])]
       with TestPipeline() as p:
         pc
   = p | beam.Create(l)
         # Map(lambda x: x) PTransform is added after Create here, because when
   
        # a PCollection of WindowedValues is created with Create PTransform,
         # the windows are not
   assigned to it. Adding a Map forces the
         # PCollection to go through a DoFn so that the PCollection
   consists of
         # the elements with timestamps assigned to them instead of a 
PCollection
         #
   of WindowedValue(element, timestamp, window).
         pc = pc | beam.Map(lambda x: x)
         reified_pc
   = pc | util.Reify.Window()
         assert_that(reified_pc, equal_to(expected), reify_windows=True)
   
   ```
   
    
   
   Imported from Jira 
[BEAM-7759](https://issues.apache.org/jira/browse/BEAM-7759). Original Jira may 
contain additional context.
   Reported by: pabloem.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to