Flo Vouin created BEAM-12816:
--------------------------------
Summary: Inconsistent behaviour of triggers and/or behaviour that
requires clarification
Key: BEAM-12816
URL: https://issues.apache.org/jira/browse/BEAM-12816
Project: Beam
Issue Type: Bug
Components: runner-py-direct
Affects Versions: 2.31.0
Environment: macOS 11.5.1, Python 3.7.10
Reporter: Flo Vouin
Hi,
I've been using the Direct Runner in Python recently, as part of tests for a
job intended to run on Dataflow. It has been hard to tell exactly what
triggers do, but after reducing things to simple test cases I seem to have
found some inconsistencies. I haven't checked whether they also appear when
running on Dataflow, though.
*1. AfterAny(AfterProcessingTime(XX)) acts as
Repeatedly(AfterProcessingTime(XX))*
I would have expected {{AfterAny(SomeTrigger) <=> SomeTrigger}}, i.e.
{{AfterAny}} acting as the identity when provided a single trigger. However
this is not the case for {{AfterProcessingTime}}, because its {{on_fire}}
result is not forwarded by {{AfterAny}}, due to the [call to should_fire not
passing the right time
domain|https://github.com/apache/beam/blob/cbb363f2f01d44dd3f7c063c6cd9d529b5fa9104/sdks/python/apache_beam/transforms/trigger.py#L790].
This means that {{on_fire}} always returns {{False}}, so {{AfterAny}} never
reports itself as finished and behaves like {{Repeatedly}} in this case.
Was this the purpose of [saving the time domain from the should_fire
call|https://github.com/apache/beam/blob/cbb363f2f01d44dd3f7c063c6cd9d529b5fa9104/sdks/python/apache_beam/transforms/trigger.py#L779],
which does not seem to be used?
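To make the reasoning in item 1 concrete, here is a minimal pure-Python sketch. It is a toy model, not Beam's code: the classes {{ToyAfterProcessingTime}} and {{ToyAfterAny}} and the {{reuse_saved_domain}} flag are hypothetical names, and the sketch assumes that the linked {{should_fire}} call inside {{on_fire}} uses a fixed watermark domain. Under that assumption, a processing-time child is never asked for its {{on_fire}} result, and the composite's result is computed over an empty list.

```python
from enum import Enum


class TimeDomain(Enum):
    # Stand-in for Beam's time domains; only these two matter here.
    WATERMARK = "WATERMARK"
    REAL_TIME = "REAL_TIME"


class ToyAfterProcessingTime:
    """Toy child trigger that only fires in the processing-time domain."""

    def should_fire(self, time_domain):
        return time_domain == TimeDomain.REAL_TIME

    def on_fire(self):
        # A one-shot trigger reports "finished" once it has fired.
        return True


class ToyAfterAny:
    """Toy composite modelled on the description above (assumption, not Beam's code)."""

    def __init__(self, *triggers, reuse_saved_domain=False):
        self.triggers = triggers
        self.reuse_saved_domain = reuse_saved_domain  # hypothetical switch
        self._time_domain = None

    def should_fire(self, time_domain):
        # The domain is saved here, mirroring the second linked line.
        self._time_domain = time_domain
        return any(t.should_fire(time_domain) for t in self.triggers)

    def on_fire(self):
        # Assumed behaviour: children are re-checked in a fixed domain
        # unless the saved one is reused.
        if self.reuse_saved_domain:
            domain = self._time_domain
        else:
            domain = TimeDomain.WATERMARK
        finished = [t.on_fire() for t in self.triggers if t.should_fire(domain)]
        return any(finished)  # any([]) is False


buggy = ToyAfterAny(ToyAfterProcessingTime())
assert buggy.should_fire(TimeDomain.REAL_TIME)
print(buggy.on_fire())  # False: the child's on_fire result is never collected

fixed = ToyAfterAny(ToyAfterProcessingTime(), reuse_saved_domain=True)
assert fixed.should_fire(TimeDomain.REAL_TIME)
print(fixed.on_fire())  # True: the composite finishes with its child
```

In this toy model, reusing the saved time domain is enough to make the single-child composite behave as the identity. Whether that is the right fix for the real implementation is exactly the question above.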
*2. AfterAny(AfterCount, AfterProcessingTime) triggers both children when
AfterCount triggers first*
This is less of a problem, but when the two are combined in an {{AfterAny}},
{{AfterProcessingTime}} still triggers after {{AfterCount}} has already
triggered. If no element has been added to the window in the meantime, an
empty pane is emitted (or one identical to the previous pane, depending on the
accumulation mode). My guess is that this happens because
[AfterProcessingTime does not implement
reset()|https://github.com/apache/beam/blob/cbb363f2f01d44dd3f7c063c6cd9d529b5fa9104/sdks/python/apache_beam/transforms/trigger.py#L390],
but I assume there's a reason for this?
*3. Unsafe trigger warning*
It is unclear to me how the {{DataLossReason}}s are combined. For example:
{{Repeatedly(AfterAny(AfterCount, AfterProcessingTime))}}
is detected as an unsafe trigger, but
{{Repeatedly(AfterAny(AfterCount, Repeatedly(AfterProcessingTime)))}}
isn't, although if I'm not mistaken they should basically provide the same
behaviour.
Happy to provide more details if needed, and sorry if the issue doesn't quite
fit the template you're expecting.
Thanks,
Flo