Pablo Estrada created BEAM-7759:
-----------------------------------

             Summary: beam.Create([...]) does not create a properly shaped PCollection until pipeline executes
                 Key: BEAM-7759
                 URL: https://issues.apache.org/jira/browse/BEAM-7759
             Project: Beam
          Issue Type: Improvement
          Components: sdk-py-core
            Reporter: Pablo Estrada


This issue is surfaced by the errors from 
[https://github.com/apache/beam/pull/9077]

 

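Specifically, in the following pipelines, note that Map(lambda x: x) is
necessary before the transform actually under test. This is because the
PCollection output by Create is not properly shaped: when the input elements are
TimestampedValue or WindowedValue instances, Create keeps those wrapper objects
as the element values instead of assigning their timestamps and windows to the
elements. Passing the PCollection through a DoFn (here, an identity Map) unwraps
them.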
{code}
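import unittest

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import TestWindowedValue
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms import util
from apache_beam.transforms.window import GlobalWindow
from apache_beam.transforms.window import GlobalWindows
from apache_beam.transforms.window import TimestampedValue


# Test methods wrapped in a TestCase so the snippet runs standalone.
class ReifyTest(unittest.TestCase):
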
  def test_timestamp(self):
    l = [TimestampedValue('a', 100),
         TimestampedValue('b', 200),
         TimestampedValue('c', 300)]
    expected = [TestWindowedValue('a', 100, [GlobalWindow()]),
                TestWindowedValue('b', 200, [GlobalWindow()]),
                TestWindowedValue('c', 300, [GlobalWindow()])]
    with TestPipeline() as p:
      # Map(lambda x: x) PTransform is added after Create here, because when
      # a PCollection of TimestampedValues is created with Create PTransform,
      # the timestamps are not assigned to it. Adding a Map forces the
      # PCollection to go through a DoFn so that the PCollection consists of
      # the elements with timestamps assigned to them instead of a PCollection
      # of TimestampedValue(element, timestamp).
      pc = p | beam.Create(l) | beam.Map(lambda x: x)
      reified_pc = pc | util.Reify.Timestamp()
      assert_that(reified_pc, equal_to(expected), reify_windows=True)

  def test_window(self):
    l = [GlobalWindows.windowed_value('a', 100),
         GlobalWindows.windowed_value('b', 200),
         GlobalWindows.windowed_value('c', 300)]
    expected = [TestWindowedValue(('a', 100, GlobalWindow()), 100,
                                  [GlobalWindow()]),
                TestWindowedValue(('b', 200, GlobalWindow()), 200,
                                  [GlobalWindow()]),
                TestWindowedValue(('c', 300, GlobalWindow()), 300,
                                  [GlobalWindow()])]
    with TestPipeline() as p:
      pc = p | beam.Create(l)
      # Map(lambda x: x) PTransform is added after Create here, because when
      # a PCollection of WindowedValues is created with Create PTransform,
      # the windows are not assigned to it. Adding a Map forces the
      # PCollection to go through a DoFn so that the PCollection consists of
      # the elements with their windows and timestamps assigned to them instead
      # of a PCollection of WindowedValue(element, timestamp, window).
      pc = pc | beam.Map(lambda x: x)
      reified_pc = pc | util.Reify.Window()
      assert_that(reified_pc, equal_to(expected), reify_windows=True)
{code}
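For illustration, a minimal pure-Python model of the behavior described in the test comments above. It does not use apache_beam and does not reproduce how any runner actually implements Create. The TimestampedValue and WindowedValue classes below are hypothetical stand-ins for Beam's, and create_outputs, dofn_outputs, GLOBAL_WINDOW and MIN_TIMESTAMP are hypothetical names. Under those assumptions, it shows why an identity Map changes the shape of the PCollection: the modeled Create keeps the wrapper object as the element's value, while the modeled DoFn output handling unwraps it into the element's timestamp and windows.

{code:python}
from dataclasses import dataclass
from typing import Any, Callable, List, Tuple


# Hypothetical stand-ins for Beam's TimestampedValue and WindowedValue.
# These are NOT the apache_beam classes; they only model the fields involved.
@dataclass(frozen=True)
class TimestampedValue:
  value: Any
  timestamp: int


@dataclass(frozen=True)
class WindowedValue:
  value: Any
  timestamp: int
  windows: Tuple[str, ...]


GLOBAL_WINDOW = 'GlobalWindow'  # hypothetical placeholder for GlobalWindow()
MIN_TIMESTAMP = -(2**63)  # hypothetical placeholder for the default timestamp


def create_outputs(values: List[Any]) -> List[WindowedValue]:
  """Model of the reported Create behavior: each input is used as-is as the
  element value, so a TimestampedValue/WindowedValue stays a wrapper object."""
  return [WindowedValue(v, MIN_TIMESTAMP, (GLOBAL_WINDOW,)) for v in values]


def dofn_outputs(inputs: List[WindowedValue],
                 fn: Callable[[Any], Any]) -> List[WindowedValue]:
  """Model of DoFn output handling: a returned WindowedValue or
  TimestampedValue is unwrapped into the element's metadata."""
  outputs = []
  for element in inputs:
    result = fn(element.value)
    if isinstance(result, WindowedValue):
      # Value, timestamp and windows are all taken from the result.
      outputs.append(result)
    elif isinstance(result, TimestampedValue):
      # New timestamp; the input element's windows are kept.
      outputs.append(
          WindowedValue(result.value, result.timestamp, element.windows))
    else:
      # Plain value: inherits the input element's timestamp and windows.
      outputs.append(
          WindowedValue(result, element.timestamp, element.windows))
  return outputs
{code}

With these definitions, dofn_outputs(create_outputs([TimestampedValue('a', 100)]), lambda x: x) yields an element with value 'a' and timestamp 100, whereas create_outputs alone yields an element whose value is the TimestampedValue itself and whose timestamp is the placeholder default.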
 



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
