Flo Vouin created BEAM-12816:
--------------------------------

             Summary: Inconsistent behaviour of triggers and/or that requires 
clarification
                 Key: BEAM-12816
                 URL: https://issues.apache.org/jira/browse/BEAM-12816
             Project: Beam
          Issue Type: Bug
          Components: runner-py-direct
    Affects Versions: 2.31.0
         Environment: macOS 11.5.1, Python 3.7.10
            Reporter: Flo Vouin


Hi,

I've recently been using the Direct Runner in Python to test a job that is 
intended to run on Dataflow. It has been hard to tell exactly what triggers 
do, but after reducing things to simple test cases I believe I've found some 
inconsistencies. I haven't checked whether they also appear when running on 
Dataflow, though.

 

*1. AfterAny(AfterProcessingTime(XX)) acts as 
Repeatedly(AfterProcessingTime(XX))*

I would have expected {{AfterAny(SomeTrigger) <=> SomeTrigger}}, i.e. 
{{AfterAny}} acting as the identity when given a single trigger. However, 
this is not the case for {{AfterProcessingTime}}: its {{on_fire}} result is 
not forwarded by {{AfterAny}}, because the [call to should_fire does not 
pass the right time 
domain|https://github.com/apache/beam/blob/cbb363f2f01d44dd3f7c063c6cd9d529b5fa9104/sdks/python/apache_beam/transforms/trigger.py#L790].
 As a result, {{AfterAny.on_fire}} always returns {{False}}, so {{AfterAny}} 
behaves like {{Repeatedly}} in this case.

Was passing the right domain the purpose of [saving the time domain from the 
should_fire 
call|https://github.com/apache/beam/blob/cbb363f2f01d44dd3f7c063c6cd9d529b5fa9104/sdks/python/apache_beam/transforms/trigger.py#L779]?
 The saved value does not seem to be used.
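
To make the mechanism concrete, here is a toy model in plain Python. It is not Beam's code: the {{Toy*}} classes, the {{fixed}} flag and {{fires_and_finishes}} are made up for illustration, and the sketch assumes that the domain passed inside {{on_fire}} is the watermark one while {{AfterProcessingTime}} only reports readiness in the real-time domain.

```python
from enum import Enum


class TimeDomain(Enum):
    WATERMARK = "WATERMARK"
    REAL_TIME = "REAL_TIME"


class ToyAfterProcessingTime:
    """Stand-in for AfterProcessingTime: only ready in the REAL_TIME domain."""

    def should_fire(self, time_domain):
        return time_domain is TimeDomain.REAL_TIME

    def on_fire(self):
        # True means "finished": the trigger should not fire again.
        return True


class ToyAfterAny:
    """Stand-in for AfterAny; `fixed` switches on a hypothetical fix."""

    def __init__(self, *triggers, fixed=False):
        self.triggers = triggers
        self.fixed = fixed
        self._time_domain = None

    def should_fire(self, time_domain):
        # The domain is saved here, mirroring the second link above.
        self._time_domain = time_domain
        return any(t.should_fire(time_domain) for t in self.triggers)

    def on_fire(self):
        # Reported behaviour: children are re-checked against a fixed domain
        # (assumed to be WATERMARK) instead of the one that caused the firing.
        domain = self._time_domain if self.fixed else TimeDomain.WATERMARK
        finished = [t.on_fire() for t in self.triggers if t.should_fire(domain)]
        # With no child re-checked as ready, any([]) is False: "not finished".
        return any(finished)


def fires_and_finishes(trigger):
    """Simulate one processing-time firing and return the on_fire result."""
    assert trigger.should_fire(TimeDomain.REAL_TIME)
    return trigger.on_fire()
```

In this model the bare trigger finishes after one firing, the wrapped one never does (hence the {{Repeatedly}}-like behaviour), and reusing the saved domain restores the identity. Whether that is the right fix for Beam itself is an open question.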

*2. AfterAny(AfterCount, AfterProcessingTime) triggers both children when 
AfterCount triggers first*

This is less of a problem, but when the two are combined in an {{AfterAny}}, 
{{AfterProcessingTime}} still fires after {{AfterCount}} has fired. If no 
element has been added to the window in the meantime, an empty pane is emitted 
(or one identical to the previous pane, depending on the accumulation mode). 
My guess is that this happens because [AfterProcessingTime does not implement 
reset()|https://github.com/apache/beam/blob/cbb363f2f01d44dd3f7c063c6cd9d529b5fa9104/sdks/python/apache_beam/transforms/trigger.py#L390],
 but I assume there is a reason for that?
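
The guess above can be sketched with another toy model in plain Python. Again, this is not Beam's code: {{ToyContext}}, the {{Toy*}} triggers, the {{clear_timer_on_reset}} flag and {{simulate}} are hypothetical, the window is assumed to be in discarding mode, and the model only encodes my hypothesis that a pending processing-time timer survives {{reset()}}.

```python
class ToyContext:
    """Holds the per-window state shared by the toy triggers."""

    def __init__(self):
        self.timers = set()
        self.count = 0


class ToyAfterCount:
    def __init__(self, count):
        self.count = count

    def on_element(self, ctx):
        ctx.count += 1

    def should_fire(self, ctx):
        return ctx.count >= self.count

    def reset(self, ctx):
        ctx.count = 0


class ToyAfterProcessingTime:
    def __init__(self, clear_timer_on_reset=False):
        # False models the no-op reset() from the report (hypothesis only).
        self.clear_timer_on_reset = clear_timer_on_reset

    def on_element(self, ctx):
        ctx.timers.add("processing-time")

    def reset(self, ctx):
        if self.clear_timer_on_reset:
            ctx.timers.discard("processing-time")


def simulate(clear_timer_on_reset):
    """Feed two elements, then let any pending timer fire; return the panes."""
    ctx = ToyContext()
    count_trigger = ToyAfterCount(2)
    time_trigger = ToyAfterProcessingTime(clear_timer_on_reset)
    panes, buffer = [], []
    for element in ["a", "b"]:
        buffer.append(element)
        count_trigger.on_element(ctx)
        time_trigger.on_element(ctx)
        if count_trigger.should_fire(ctx):
            panes.append(list(buffer))
            buffer.clear()  # discarding mode: emitted elements are dropped
            count_trigger.reset(ctx)
            time_trigger.reset(ctx)
    # Later on, the processing-time timer goes off if it is still pending.
    if "processing-time" in ctx.timers:
        ctx.timers.discard("processing-time")
        panes.append(list(buffer))
    return panes
```

With the no-op reset, {{simulate(False)}} yields the expected pane followed by an empty one; with a reset that clears the timer, {{simulate(True)}} yields only the first pane.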

*3. Unsafe trigger warning*

It is unclear to me how the {{DataLossReason}}s are combined. For example:

{{Repeatedly(AfterAny(AfterCount, AfterProcessingTime))}}

is detected as an unsafe trigger, but

{{Repeatedly(AfterAny(AfterCount, Repeatedly(AfterProcessingTime)))}}

isn't, although, if I'm not mistaken, the two should behave essentially the 
same.

 

Happy to provide more details if needed, and sorry if the issue doesn't quite 
fit the template you're expecting.

Thanks,

Flo



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
