Hi everyone! I noticed that the behavior of the AfterCount() trigger seems to differ between the Python SDK and the Java SDK, so I created a few tests to show the difference. In general, I think the Python SDK buffers on results instead of on input elements.
What do you guys think? Here are the tests; I ran them in batch mode. Sincerely, Leiyi
TriggerAfterCount.java
Description: application/ms-java
GroupByKeyTest.java
Description: application/ms-java
import apache_beam as beam
from apache_beam.transforms import window, trigger
from apache_beam.utils.timestamp import Timestamp
# Repro script: Repeatedly(AfterCount(3)) in the global window, followed by a
# GroupByKey. Nine timestamped (key, value) pairs spread over four keys are
# grouped, each emitted group is rendered with repr(), and the lines are
# written to a single text shard with the prefix "py-test-result".
p = beam.Pipeline()
(  # pylint: disable=expression-not-assigned
    p
    | beam.Create([
        window.TimestampedValue((1, 1), Timestamp(seconds=1596216396)),
        window.TimestampedValue((1, 2), Timestamp(seconds=1596216397)),
        window.TimestampedValue((1, 3), Timestamp(seconds=1596216398)),
        window.TimestampedValue((2, 4), Timestamp(seconds=1596216399)),
        window.TimestampedValue((3, 5), Timestamp(seconds=1596216400)),
        window.TimestampedValue((2, 6), Timestamp(seconds=1596216402)),
        window.TimestampedValue((3, 7), Timestamp(seconds=1596216403)),
        window.TimestampedValue((4, 8), Timestamp(seconds=1596216405)),
        window.TimestampedValue((4, 9), Timestamp(seconds=1596216406)),
    ])
    # | 'event_monitor' >> beam.ParDo(UserMetricsDoFn('event'))
    | beam.WindowInto(
        window.GlobalWindows(),
        trigger=trigger.Repeatedly(trigger.AfterCount(3)),
        accumulation_mode=trigger.AccumulationMode.DISCARDING)
    # Rebuilds each element as a plain 2-tuple before grouping.
    | 'map_for_type_align' >> beam.Map(lambda i: (i[0], i[1]))
    | beam.GroupByKey()
    | beam.Map(repr)
    # | 'result_monitor' >> beam.ParDo(UserMetricsDoFn('result'))
    | beam.io.WriteToText(
        "py-test-result", file_name_suffix='.txt', num_shards=1))
result = p.run()
# Depending on the runner, run() may return before the pipeline has finished;
# block here so the output shard is complete before the script exits.
result.wait_until_finish()
import apache_beam as beam
from apache_beam.transforms import window, trigger
from apache_beam.utils.timestamp import Timestamp
# Repro script: Repeatedly(AfterCount(2)) in the global window, followed by a
# global count. Eight timestamped integers are counted, each emitted count is
# rendered with repr(), and the lines are written to a single text shard with
# the prefix "py-test-result".
p = beam.Pipeline()
(  # pylint: disable=expression-not-assigned
    p
    | beam.Create([
        window.TimestampedValue(1, Timestamp(seconds=1596216396)),
        window.TimestampedValue(2, Timestamp(seconds=1596216397)),
        window.TimestampedValue(3, Timestamp(seconds=1596216398)),
        window.TimestampedValue(4, Timestamp(seconds=1596216399)),
        window.TimestampedValue(5, Timestamp(seconds=1596216400)),
        window.TimestampedValue(6, Timestamp(seconds=1596216402)),
        window.TimestampedValue(7, Timestamp(seconds=1596216403)),
        window.TimestampedValue(8, Timestamp(seconds=1596216405)),
    ])
    # | 'event_monitor' >> beam.ParDo(UserMetricsDoFn('event'))
    | beam.WindowInto(
        window.GlobalWindows(),
        trigger=trigger.Repeatedly(trigger.AfterCount(2)),
        accumulation_mode=trigger.AccumulationMode.DISCARDING)
    | beam.combiners.Count.Globally()  # .with_fanout(5)
    | beam.Map(repr)
    # | 'result_monitor' >> beam.ParDo(UserMetricsDoFn('result'))
    | beam.io.WriteToText(
        "py-test-result", file_name_suffix='.txt', num_shards=1))
result = p.run()
# Depending on the runner, run() may return before the pipeline has finished;
# block here so the output shard is complete before the script exits.
result.wait_until_finish()
