Hi everyone!
I noticed that the behavior of the AfterCount() trigger seems to differ
between the Python SDK and the Java SDK, so I created a few tests to show
the difference. In general, I think the Python SDK buffers on results
rather than on input elements.

What do you guys think?

Here are the tests; I ran them in batch mode.

Sincerely,
Leiyi

Attachment: TriggerAfterCount.java
Description: application/ms-java

Attachment: GroupByKeyTest.java
Description: application/ms-java

import apache_beam as beam
from apache_beam.transforms import window, trigger
from apache_beam.utils.timestamp import Timestamp


# Reproduction 1: Repeatedly(AfterCount(3)) in the global window, applied
# ahead of a GroupByKey, with fired panes discarded. The input holds three
# values for key 1 and two values each for keys 2, 3 and 4; the grouped
# output is written as text so it can be compared with the Java SDK run.
p = beam.Pipeline()

(  # pylint: disable=expression-not-assigned
    p
    | beam.Create([
        window.TimestampedValue((1, 1), Timestamp(seconds=1596216396)),
        window.TimestampedValue((1, 2), Timestamp(seconds=1596216397)),
        window.TimestampedValue((1, 3), Timestamp(seconds=1596216398)),
        window.TimestampedValue((2, 4), Timestamp(seconds=1596216399)),
        window.TimestampedValue((3, 5), Timestamp(seconds=1596216400)),
        window.TimestampedValue((2, 6), Timestamp(seconds=1596216402)),
        window.TimestampedValue((3, 7), Timestamp(seconds=1596216403)),
        window.TimestampedValue((4, 8), Timestamp(seconds=1596216405)),
        window.TimestampedValue((4, 9), Timestamp(seconds=1596216406))
    ])
    # | 'event_monitor' >> beam.ParDo(UserMetricsDoFn('event'))
    | beam.WindowInto(window.GlobalWindows(),
                      trigger=trigger.Repeatedly(trigger.AfterCount(3)),
                      accumulation_mode=trigger.AccumulationMode.DISCARDING)
    # Rebuilds each (key, value) pair as a plain 2-tuple before grouping.
    | 'map_for_type_align' >> beam.Map(lambda i: (i[0], i[1]))
    | beam.GroupByKey()
    # repr() turns each grouped result into a single line of text.
    | beam.Map(repr)
    # | 'result_monitor' >> beam.ParDo(UserMetricsDoFn('result'))
    | beam.io.WriteToText(
        "py-test-result", file_name_suffix='.txt', num_shards=1))

result = p.run()
# Depending on the runner, run() can return while the pipeline is still
# executing; block here so the output file is complete before the script
# exits or the results are inspected.
result.wait_until_finish()

import apache_beam as beam
from apache_beam.transforms import window, trigger
from apache_beam.utils.timestamp import Timestamp


# Reproduction 2: Repeatedly(AfterCount(2)) in the global window, applied
# ahead of a global Count over eight timestamped integers, with fired panes
# discarded. The emitted counts are written as text so they can be compared
# with the Java SDK run.
p = beam.Pipeline()

(  # pylint: disable=expression-not-assigned
    p
    | beam.Create([
        window.TimestampedValue(1, Timestamp(seconds=1596216396)),
        window.TimestampedValue(2, Timestamp(seconds=1596216397)),
        window.TimestampedValue(3, Timestamp(seconds=1596216398)),
        window.TimestampedValue(4, Timestamp(seconds=1596216399)),
        window.TimestampedValue(5, Timestamp(seconds=1596216400)),
        window.TimestampedValue(6, Timestamp(seconds=1596216402)),
        window.TimestampedValue(7, Timestamp(seconds=1596216403)),
        window.TimestampedValue(8, Timestamp(seconds=1596216405))
    ])
    # | 'event_monitor' >> beam.ParDo(UserMetricsDoFn('event'))
    | beam.WindowInto(window.GlobalWindows(),
                      trigger=trigger.Repeatedly(trigger.AfterCount(2)),
                      accumulation_mode=trigger.AccumulationMode.DISCARDING)
    | beam.combiners.Count.Globally()  # .with_fanout(5)
    # repr() turns each emitted count into a single line of text.
    | beam.Map(repr)
    # | 'result_monitor' >> beam.ParDo(UserMetricsDoFn('result'))
    | beam.io.WriteToText(
        "py-test-result", file_name_suffix='.txt', num_shards=1))

result = p.run()
# Depending on the runner, run() can return while the pipeline is still
# executing; block here so the output file is complete before the script
# exits or the results are inspected.
result.wait_until_finish()

Reply via email to