Robert Bradshaw created BEAM-1450:
-------------------------------------
Summary: NewDoFn does not properly explode windows
Key: BEAM-1450
URL: https://issues.apache.org/jira/browse/BEAM-1450
Project: Beam
Issue Type: Bug
Components: sdk-py
Reporter: Robert Bradshaw
Assignee: Robert Bradshaw
A window-taking DoFn's process gets called once per window, but each output
gets placed into all windows.
E.g. the following test fails
def test_window_param(self):
class TestDoFn(DoFn):
def process(self, element, window=DoFn.WindowParam):
yield (element, (float(window.start), float(window.end)))
pipeline = TestPipeline()
pcoll = (pipeline
| Create([1, 7])
| Map(lambda x: TimestampedValue(x, x))
| WindowInto(windowfn=SlidingWindows(10, 5))
| ParDo(TestDoFn()))
assert_that(pcoll, equal_to([(1, (-5, 5)), (1, (0, 10)),
(7, (0, 10)), (7, (5, 15))]))
pcoll2 = pcoll | 'Again' >> ParDo(TestDoFn())
assert_that(
pcoll2,
equal_to([
((1, (-5, 5)), (-5, 5)), ((1, (0, 10)), (0, 10)),
((7, (0, 10)), (0, 10)), ((7, (5, 15)), (5, 15))]),
label='doubled windows')
pipeline.run()
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)