Pablo Estrada created BEAM-7122:
-----------------------------------

             Summary: Accumulating triggers seem not to work on DirectRunner
                 Key: BEAM-7122
                 URL: https://issues.apache.org/jira/browse/BEAM-7122
             Project: Beam
          Issue Type: Bug
          Components: sdk-py-core
            Reporter: Pablo Estrada
            Assignee: Pablo Estrada


The following test fails when run on the DirectRunner.

Test case:
{code}
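# Excerpt from TriggerPipelineTest in apache_beam/transforms/trigger_test.py.
# record_dofn() and the all_firings / all_records class attributes are
# defined on that class (not shown). Module-level imports the excerpt uses:
#   import apache_beam as beam
#   from apache_beam.testing.test_pipeline import TestPipeline
#   from apache_beam.testing.test_stream import TestStream
#   from apache_beam.transforms import trigger
#   from apache_beam.transforms.trigger import (
#       AfterAll, AfterCount, AfterProcessingTime, AfterWatermark)
#   from apache_beam.transforms.window import FixedWindows

  def test_multiple_accumulating_firings(self):
    # The PCollection will contain the strings '1' to '10'.
    elements = list(range(1, 11))

    ts = TestStream().advance_watermark_to(0)
    for i in elements:
      ts.add_elements([str(i)])
      if i % 5 == 0:
        ts.advance_watermark_to(i)
        ts.advance_processing_time(5)

    with TestPipeline() as p:
      _ = (p
           | ts
           | beam.WindowInto(
               FixedWindows(10),
               accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
               trigger=AfterWatermark(
                   early=AfterAll(
                       AfterCount(1), AfterProcessingTime(5))
               ))
           | beam.ParDo(self.record_dofn()))

    # The trigger should fire twice: an early firing after the first five
    # elements and 5 seconds of processing time, and an on-time firing when
    # the watermark reaches 10, the end of the window.
    # With ACCUMULATING mode, the second firing should repeat elements 1 to 5.
    first_firing = [str(i) for i in elements if i <= 5]
    second_firing = [str(i) for i in elements]

    # Assert that we have two firings.
    self.assertEqual(2, len(TriggerPipelineTest.all_firings))
    self.assertListEqual(first_firing + second_firing,
                         TriggerPipelineTest.all_records)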
{code}
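For reference, the following pure-Python sketch (it does not use Beam) models what the two firings would emit for this input under each accumulation mode. `simulate_firings` is a hypothetical helper, and the model assumes, as the test's `first_firing` and `second_firing` expectations do, that elements 1 to 5 arrive before the first firing and elements 6 to 10 before the second.

{code:python}
import itertools


def simulate_firings(batches, accumulating):
    """Model the panes emitted for one window, one firing per batch.

    batches: list of lists; each inner list holds the elements that arrive
    before the corresponding firing.
    accumulating: True models ACCUMULATING, False models DISCARDING.
    """
    firings = []
    pane = []
    for batch in batches:
        pane.extend(batch)
        firings.append(list(pane))  # emit a copy of the current pane
        if not accumulating:
            pane.clear()  # DISCARDING: drop buffered elements after firing
    return firings


elements = [str(i) for i in range(1, 11)]
# Hypothetical split: '1'..'5' before firing 1, '6'..'10' before firing 2.
batches = [elements[:5], elements[5:]]

# Flatten the firings, the way all_records collects every emitted element.
acc_records = list(itertools.chain.from_iterable(
    simulate_firings(batches, accumulating=True)))
disc_records = list(itertools.chain.from_iterable(
    simulate_firings(batches, accumulating=False)))

print(acc_records)
# ['1', '2', '3', '4', '5', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10']
print(disc_records)
# ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']
{code}

The ACCUMULATING result equals the test's expected list (`first_firing + second_firing`), while the list recorded on the DirectRunner in the failure output has the same contents as the DISCARDING result.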
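Failure (the recorded output contains each of the ten elements exactly once, so elements 1 to 5 are never re-emitted as ACCUMULATING mode requires):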
{code:java}
======================================================================
FAIL: test_multiple_accumulating_firings (apache_beam.transforms.trigger_test.TriggerPipelineTest)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "apache_beam/transforms/trigger_test.py", line 488, in test_multiple_accumulating_firings
    TriggerPipelineTest.all_records)
AssertionError: Lists differ: ['1', '2', '3', '4', '5', '1',... != ['1', '2', '3', '4', '5', '6',...

First differing element 5:
'1'
'6'

First list contains 5 additional elements.
First extra element 10:
'6'

- ['1', '2', '3', '4', '5', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10']
?                           -------------------------

+ ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']
{code}
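Because `all_records` flattens every firing into one list, the assertion message cannot show how the ten elements were split between the two firings. A check on per-firing contents would point directly at the pane that lost elements. The sketch below is a hypothetical helper (`check_accumulating` is not a Beam API), and it assumes the firings for a single window are available as a list of lists in firing order; the actual structure of `TriggerPipelineTest.all_firings` is not shown in this report.

{code:python}
from collections import Counter


def check_accumulating(firings):
    """Return a list of problems if `firings` violates the ACCUMULATING rule.

    Under ACCUMULATING mode, each pane of a window should contain every
    element of the previous pane of that window, plus any new elements.
    firings: hypothetical list of lists, one inner list per firing of a
    single window, in firing order.
    """
    problems = []
    for i in range(1, len(firings)):
        previous, current = Counter(firings[i - 1]), Counter(firings[i])
        missing = previous - current  # elements dropped since the last pane
        if missing:
            problems.append(
                'firing %d is missing %s from firing %d'
                % (i, sorted(missing.elements()), i - 1))
    return problems


print(check_accumulating([['1', '2'], ['1', '2', '3']]))  # []
print(check_accumulating([['1', '2'], ['3']]))
# ["firing 1 is missing ['1', '2'] from firing 0"]
{code}

Comparing with `collections.Counter` rather than sets keeps duplicates significant, so a pane that repeats an element fewer times than its predecessor is also reported.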



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
