[ https://issues.apache.org/jira/browse/BEAM-7122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Pablo Estrada resolved BEAM-7122.
---------------------------------
Resolution: Fixed
Fix Version/s: Not applicable
> Accumulating triggers seem not to work on DirectRunner
> ------------------------------------------------------
>
> Key: BEAM-7122
> URL: https://issues.apache.org/jira/browse/BEAM-7122
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Pablo Estrada
> Assignee: Pablo Estrada
> Priority: Major
> Fix For: Not applicable
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> The test case below fails when run on the DirectRunner.
> Test case:
> {code}
> def test_multiple_accumulating_firings(self):
>   # PCollection will contain elements from 1 to 10.
>   elements = [i for i in range(1, 11)]
>   ts = TestStream().advance_watermark_to(0)
>   for i in elements:
>     ts.add_elements([str(i)])
>     if i % 5 == 0:
>       ts.advance_watermark_to(i)
>       ts.advance_processing_time(5)
>   with TestPipeline() as p:
>     _ = (p
>          | ts
>          | beam.WindowInto(
>              FixedWindows(10),
>              accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
>              trigger=AfterWatermark(
>                  early=AfterAll(
>                      AfterCount(1), AfterProcessingTime(5))
>              ))
>          | beam.ParDo(self.record_dofn()))
>   # The trigger should fire twice. Once after 5 seconds, and once after 10.
>   # The firings should accumulate the output.
>   first_firing = [str(i) for i in elements if i <= 5]
>   second_firing = [str(i) for i in elements]
>   # Assert that we have two firings.
>   self.assertEqual(2, len(TriggerPipelineTest.all_firings))
>   self.assertListEqual(first_firing + second_firing,
>                        TriggerPipelineTest.all_records)
> {code}
> Failure:
> {code:java}
> ======================================================================
> FAIL: test_multiple_accumulating_firings (apache_beam.transforms.trigger_test.TriggerPipelineTest)
> ----------------------------------------------------------------------
> Traceback (most recent call last):
>   File "apache_beam/transforms/trigger_test.py", line 488, in test_multiple_accumulating_firings
>     TriggerPipelineTest.all_records)
> AssertionError: Lists differ: ['1', '2', '3', '4', '5', '1',... != ['1', '2', '3', '4', '5', '6',...
> First differing element 5:
> '1'
> '6'
> First list contains 5 additional elements.
> First extra element 10:
> '6'
> - ['1', '2', '3', '4', '5', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10']
> ? -------------------------
> + ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']
> {code}
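
For readers comparing the two lists in the failure, here is a minimal plain-Python sketch of what the test expects from ACCUMULATING mode versus what the recorded output looks like. It does not use Beam at all: `simulate_firings` is a hypothetical helper written for illustration, and the split into two batches of five elements is an assumption taken from the test's comments. The observed records are consistent with each firing emitting only the new elements, as a discarding run would; this is an observation about the lists, not a diagnosis of the cause.

```python
# Hypothetical helper, not part of Apache Beam: models what one window
# emits at each trigger firing under the two accumulation modes.
def simulate_firings(batches, accumulating):
    """Return one list of emitted elements per firing.

    batches: elements that arrived between consecutive firings.
    accumulating: if True, each firing re-emits everything seen so far;
                  if False (discarding), each firing emits only new elements.
    """
    seen = []
    firings = []
    for batch in batches:
        seen.extend(batch)
        firings.append(list(seen) if accumulating else list(batch))
    return firings


# Same elements as the test case: '1' through '10', fired in two batches.
elements = [str(i) for i in range(1, 11)]
batches = [elements[:5], elements[5:]]

accumulating = simulate_firings(batches, accumulating=True)
discarding = simulate_firings(batches, accumulating=False)

# Flatten the firings the same way the test compares all_records.
expected_records = [e for firing in accumulating for e in firing]
observed_like_records = [e for firing in discarding for e in firing]

print(len(expected_records))       # 15, matches first_firing + second_firing
print(len(observed_like_records))  # 10, matches the list the test recorded
```

The 15-element list corresponds to `first_firing + second_firing` in the test, and the 10-element list corresponds to the contents of `TriggerPipelineTest.all_records` shown in the failure.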