Pablo Estrada created BEAM-7759:
-----------------------------------
Summary: beam.Create([...]) does not create a properly shaped
PCollection until pipeline executes
Key: BEAM-7759
URL: https://issues.apache.org/jira/browse/BEAM-7759
Project: Beam
Issue Type: Improvement
Components: sdk-py-core
Reporter: Pablo Estrada
This issue was surfaced by the errors in
[https://github.com/apache/beam/pull/9077]
Specifically, in the following pipelines, note that Map(lambda x: x) is
necessary before the transform under test. The reason is that the PCollection
output by Create holds the TimestampedValue or WindowedValue objects themselves
as elements; their timestamps and windows are not assigned until the elements
pass through a DoFn.
{code}
def test_timestamp(self):
  l = [TimestampedValue('a', 100),
       TimestampedValue('b', 200),
       TimestampedValue('c', 300)]
  expected = [TestWindowedValue('a', 100, [GlobalWindow()]),
              TestWindowedValue('b', 200, [GlobalWindow()]),
              TestWindowedValue('c', 300, [GlobalWindow()])]
  with TestPipeline() as p:
    # Map(lambda x: x) PTransform is added after Create here, because when
    # a PCollection of TimestampedValues is created with Create PTransform,
    # the timestamps are not assigned to it. Adding a Map forces the
    # PCollection to go through a DoFn so that the PCollection consists of
    # the elements with timestamps assigned to them instead of a PCollection
    # of TimestampedValue(element, timestamp).
    pc = p | beam.Create(l) | beam.Map(lambda x: x)
    reified_pc = pc | util.Reify.Timestamp()
    assert_that(reified_pc, equal_to(expected), reify_windows=True)

def test_window(self):
  l = [GlobalWindows.windowed_value('a', 100),
       GlobalWindows.windowed_value('b', 200),
       GlobalWindows.windowed_value('c', 300)]
  expected = [TestWindowedValue(('a', 100, GlobalWindow()), 100,
                                [GlobalWindow()]),
              TestWindowedValue(('b', 200, GlobalWindow()), 200,
                                [GlobalWindow()]),
              TestWindowedValue(('c', 300, GlobalWindow()), 300,
                                [GlobalWindow()])]
  with TestPipeline() as p:
    pc = p | beam.Create(l)
    # Map(lambda x: x) PTransform is added after Create here, because when
    # a PCollection of WindowedValues is created with Create PTransform,
    # the windows are not assigned to it. Adding a Map forces the
    # PCollection to go through a DoFn so that the PCollection consists of
    # the elements with timestamps assigned to them instead of a PCollection
    # of WindowedValue(element, timestamp, window).
    pc = pc | beam.Map(lambda x: x)
    reified_pc = pc | util.Reify.Window()
    assert_that(reified_pc, equal_to(expected), reify_windows=True)
{code}
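To make the behavior described in the test comments easier to picture, here is a toy model in plain Python. It does not use Beam and is not Beam's implementation: Timestamped, Element, create, identity_map, and MIN_TIMESTAMP are all hypothetical stand-ins, and the unwrapping rule is only an assumption drawn from the comments above (a wrapper stays an element after Create and is unwrapped once it passes through a DoFn).

```python
from dataclasses import dataclass
from typing import Any, Iterable, List

# Hypothetical placeholder for "no timestamp assigned yet".
MIN_TIMESTAMP = float("-inf")


@dataclass(frozen=True)
class Timestamped:
  """Hypothetical stand-in for a TimestampedValue wrapper."""
  value: Any
  timestamp: float


@dataclass(frozen=True)
class Element:
  """Hypothetical stand-in for an element plus the timestamp tracked for it."""
  value: Any
  timestamp: float


def create(values: Iterable[Any]) -> List[Element]:
  # Models the reported behavior: each input becomes the element as-is,
  # wrapper included, and no timestamp is taken from it.
  return [Element(v, MIN_TIMESTAMP) for v in values]


def identity_map(pcoll: Iterable[Element]) -> List[Element]:
  # Models a pass through a DoFn running `lambda x: x`: an output that is
  # a wrapper is unwrapped and its timestamp becomes the element timestamp.
  out = []
  for elem in pcoll:
    result = elem.value
    if isinstance(result, Timestamped):
      out.append(Element(result.value, result.timestamp))
    else:
      out.append(Element(result, elem.timestamp))
  return out


raw = create([Timestamped('a', 100), Timestamped('b', 200)])
fixed = identity_map(raw)
```

In this model, raw still contains the Timestamped wrappers as element values, while fixed contains the bare values 'a' and 'b' with timestamps 100 and 200, which mirrors why the tests insert Map(lambda x: x) before Reify.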
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)