Pablo Estrada created BEAM-7122:
-----------------------------------
Summary: Accumulating triggers seem not to work on DirectRunner
Key: BEAM-7122
URL: https://issues.apache.org/jira/browse/BEAM-7122
Project: Beam
Issue Type: Bug
Components: sdk-py-core
Reporter: Pablo Estrada
Assignee: Pablo Estrada
Running this in DirectRunner.
Test case:
{code}
def test_multiple_accumulating_firings(self):
# PCollection will contain elements from 1 to 10.
elements = [i for i in range(1, 11)]
ts = TestStream().advance_watermark_to(0)
for i in elements:
ts.add_elements([str(i)])
if i % 5 == 0:
ts.advance_watermark_to(i)
ts.advance_processing_time(5)
with TestPipeline() as p:
_ = (p
| ts
| beam.WindowInto(
FixedWindows(10),
accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
trigger=AfterWatermark(
early=AfterAll(
AfterCount(1), AfterProcessingTime(5))
))
| beam.ParDo(self.record_dofn()))
# The trigger should fire twice. Once after 5 seconds, and once after 10.
# The firings should accumulate the output.
first_firing = [str(i) for i in elements if i <= 5]
second_firing = [str(i) for i in elements]
# Assert that we have two firings.
self.assertEqual(2, len(TriggerPipelineTest.all_firings))
self.assertListEqual(first_firing + second_firing,
TriggerPipelineTest.all_records)
{code}
Failure:
{code:java}
======================================================================
FAIL: test_multiple_accumulating_firings
(apache_beam.transforms.trigger_test.TriggerPipelineTest)
----------------------------------------------------------------------
Traceback (most recent call last):
File "apache_beam/transforms/trigger_test.py", line 488, in
test_multiple_accumulating_firings
TriggerPipelineTest.all_records)
AssertionError: Lists differ: ['1', '2', '3', '4', '5', '1',... != ['1', '2',
'3', '4', '5', '6',...
First differing element 5:
'1'
'6'
First list contains 5 additional elements.
First extra element 10:
'6'
- ['1', '2', '3', '4', '5', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10']
? -------------------------
+ ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']
{code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)