[ 
https://issues.apache.org/jira/browse/BEAM-7122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pablo Estrada resolved BEAM-7122.
---------------------------------
       Resolution: Fixed
    Fix Version/s: Not applicable

> Accumulating triggers seem not to work on DirectRunner
> ------------------------------------------------------
>
>                 Key: BEAM-7122
>                 URL: https://issues.apache.org/jira/browse/BEAM-7122
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Pablo Estrada
>            Assignee: Pablo Estrada
>            Priority: Major
>             Fix For: Not applicable
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> The following test case fails when run on the DirectRunner.
> Test case:
> {code}
>   def test_multiple_accumulating_firings(self):
>     # PCollection will contain elements from 1 to 10.
>     elements = list(range(1, 11))
>     ts = TestStream().advance_watermark_to(0)
>     for i in elements:
>       ts.add_elements([str(i)])
>       if i % 5 == 0:
>         ts.advance_watermark_to(i)
>         ts.advance_processing_time(5)
>     with TestPipeline() as p:
>       _ = (p
>            | ts
>            | beam.WindowInto(
>                FixedWindows(10),
>                accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
>                trigger=AfterWatermark(
>                    early=AfterAll(
>                        AfterCount(1), AfterProcessingTime(5))
>                ))
>            | beam.ParDo(self.record_dofn()))
>     # The trigger should fire twice. Once after 5 seconds, and once after 10.
>     # The firings should accumulate the output.
>     first_firing = [str(i) for i in elements if i <= 5]
>     second_firing = [str(i) for i in elements]
>     # Assert that we have two firings.
>     self.assertEqual(2, len(TriggerPipelineTest.all_firings))
>     self.assertListEqual(first_firing + second_firing,
>                          TriggerPipelineTest.all_records)
> {code}
> Failure:
> {code:java}
> ======================================================================
> FAIL: test_multiple_accumulating_firings (apache_beam.transforms.trigger_test.TriggerPipelineTest)
> ----------------------------------------------------------------------
> Traceback (most recent call last):
>   File "apache_beam/transforms/trigger_test.py", line 488, in test_multiple_accumulating_firings
>     TriggerPipelineTest.all_records)
> AssertionError: Lists differ: ['1', '2', '3', '4', '5', '1',... != ['1', '2', '3', '4', '5', '6',...
> First differing element 5:
> '1'
> '6'
> First list contains 5 additional elements.
> First extra element 10:
> '6'
> - ['1', '2', '3', '4', '5', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10']
> ?                           -------------------------
> + ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']
> {code}
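
For readers comparing the two lists in the failure above, here is a minimal sketch in plain Python (no Beam dependency) of how pane contents differ between accumulating and discarding semantics for a single window. The helper `simulate_firings` and the `fire_after` set are hypothetical names introduced only for illustration; they stand in for the trigger firings after elements 5 and 10 in the test and are not part of the Beam API. The sketch only shows that the expected list has the accumulating shape and the observed list has the shape discarding output would have; it does not claim to identify the root cause.

```python
def simulate_firings(elements, fire_after, accumulating):
    """Return the flattened contents of every pane fired for one window.

    elements: items in arrival order.
    fire_after: hypothetical set of elements after which the trigger fires
        (a stand-in for the watermark/processing-time advances in the test).
    accumulating: True for accumulating semantics, False for discarding.
    """
    records = []
    buffered = []
    for element in elements:
        buffered.append(element)
        if element in fire_after:
            # Each firing emits everything currently buffered.
            records.extend(buffered)
            if not accumulating:
                # Discarding mode drops already-emitted elements.
                buffered = []
    return records


elements = [str(i) for i in range(1, 11)]
fire_after = {"5", "10"}  # assumption: one firing after '5', one after '10'

accumulating = simulate_firings(elements, fire_after, accumulating=True)
discarding = simulate_firings(elements, fire_after, accumulating=False)

print(len(accumulating), len(discarding))
```

Under these assumptions, `accumulating` has the same shape as `first_firing + second_firing` in the test (15 records), while `discarding` has the 10-record shape of the list the DirectRunner actually produced.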



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
