[ https://issues.apache.org/jira/browse/BEAM-2402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16034093#comment-16034093 ]

Kenneth Knowles commented on BEAM-2402:
---------------------------------------

From reading your code, it seems the goal of the trigger is:

1. Fire if some element count is exceeded.
2. Fire if the processing time between two element arrivals reaches a certain gap.
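
To make the two criteria concrete, here is a minimal plain-Java model of them. It assumes both are checked only when an element arrives (as in the shared code), that "exceeded" means "count reaches N", and that the pane's state resets after each firing. {{CountOrGapModel}} and its parameters are hypothetical names for illustration, not Beam's {{Trigger}} API.

```java
// Demo entry point listed first so single-file execution finds main().
class CountOrGapDemo {
    public static void main(String[] args) {
        // Hypothetical thresholds: fire at 3 elements or a 1000 ms gap.
        CountOrGapModel model = new CountOrGapModel(3, 1_000);
        long[] arrivals = {0, 100, 200, 300, 2_000, 2_100};
        for (long t : arrivals) {
            System.out.println(t + " -> fire=" + model.onElement(t));
        }
    }
}

// Hypothetical model (not Beam's Trigger API) of the two firing criteria,
// evaluated only when an element arrives.
final class CountOrGapModel {
    private final int countThreshold;  // criterion 1: element count
    private final long maxGapMillis;   // criterion 2: processing-time gap
    private int count = 0;
    // -1 means "no element in the current pane yet"; arrival times are assumed non-negative.
    private long lastArrivalMillis = -1;

    CountOrGapModel(int countThreshold, long maxGapMillis) {
        this.countThreshold = countThreshold;
        this.maxGapMillis = maxGapMillis;
    }

    /** Records an element arriving at the given processing time; returns true if the pane fires. */
    boolean onElement(long arrivalMillis) {
        // Gap is measured from the previous element in the same pane.
        boolean gapReached = lastArrivalMillis >= 0
                && arrivalMillis - lastArrivalMillis >= maxGapMillis;
        count++;
        lastArrivalMillis = arrivalMillis;
        boolean countReached = count >= countThreshold;
        if (gapReached || countReached) {
            // Fired: the arriving element belongs to this pane; start a fresh pane.
            count = 0;
            lastArrivalMillis = -1;
            return true;
        }
        return false;
    }
}
```

Because the decision is only made inside {{onElement}}, a pane whose gap has elapsed is not emitted until the next element shows up, which is the latency cost mentioned below compared with a timer.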

So I have a couple of high-level comments:

* The code you shared will fire only when an element comes in, so it will have 
higher latency than a timer. I understand you did this because timers were not 
performant enough.
* Can you still combine these two criteria with {{AfterFirst.of}}?
* The other issue is that {{shouldFire}} should be strictly a function of 
current state, not of instance fields. It seems you did this to avoid saving 
data to state? But state can (and should) be cached in memory until commit, 
which means the "WARN" bit that you commented on should be redundant. You could 
achieve something similar but more robust (though still something of a hack) by 
storing the {{shouldFire}} decision in a {{ValueState<Boolean>}} and just reading 
it. We want this sort of thing to be unnecessary, since it is basically the same 
as the simpler approach of reading the current situation to decide whether to fire.
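
A sketch of what "a function of current state" could look like, assuming a hypothetical {{InMemoryState}} store that stands in for per-key runner state (in Beam this role is played by state cells such as {{ValueState}}, which the runner can cache until commit). The trigger object holds only configuration, so one instance can serve interleaved keys, and {{shouldFire}} reads nothing but state. None of these classes are Beam APIs.

```java
import java.util.HashMap;
import java.util.Map;

// Demo entry point listed first so single-file execution finds main().
class StatefulTriggerDemo {
    public static void main(String[] args) {
        StateBackedCountOrGap trigger = new StateBackedCountOrGap(3, 1_000);
        InMemoryState state = new InMemoryState();
        // Elements for keys "a" and "b" interleave; one trigger instance serves both.
        System.out.println(trigger.onElement(state, "a", 0));
        System.out.println(trigger.onElement(state, "b", 50));
        System.out.println(trigger.onElement(state, "a", 100));
        System.out.println(trigger.onElement(state, "b", 2_000)); // gap criterion for "b"
        System.out.println(trigger.onElement(state, "a", 200));   // count criterion for "a"
    }
}

// Hypothetical stand-in for per-key runner state; a runner would cache these until commit.
final class InMemoryState {
    private final Map<String, Long> cells = new HashMap<>();

    Long read(String key, String tag) { return cells.get(key + "/" + tag); }
    void write(String key, String tag, long value) { cells.put(key + "/" + tag, value); }
    void clear(String key, String tag) { cells.remove(key + "/" + tag); }
}

// Holds only immutable configuration: no mutable instance fields.
final class StateBackedCountOrGap {
    private final long countThreshold;
    private final long maxGapMillis;

    StateBackedCountOrGap(long countThreshold, long maxGapMillis) {
        this.countThreshold = countThreshold;
        this.maxGapMillis = maxGapMillis;
    }

    /** Updates the key's state for an arriving element, then decides from state alone. */
    boolean onElement(InMemoryState state, String key, long arrivalMillis) {
        Long previous = state.read(key, "lastArrival");
        Long count = state.read(key, "count");
        state.write(key, "count", (count == null ? 0L : count) + 1);
        state.write(key, "lastArrival", arrivalMillis);
        if (previous != null) {
            state.write(key, "gap", arrivalMillis - previous);
        } else {
            state.clear(key, "gap");
        }
        if (shouldFire(state, key)) {
            // Reset the key's pane state after firing.
            state.clear(key, "count");
            state.clear(key, "lastArrival");
            state.clear(key, "gap");
            return true;
        }
        return false;
    }

    /** Pure function of the key's current state (count OR gap, i.e. first-of semantics). */
    boolean shouldFire(InMemoryState state, String key) {
        Long count = state.read(key, "count");
        Long gap = state.read(key, "gap");
        boolean countReached = count != null && count >= countThreshold;
        boolean gapReached = gap != null && gap >= maxGapMillis;
        return countReached || gapReached;
    }
}
```

Because the decision is recomputed from state each time, no separate cached "should fire" flag is needed, whether as an instance field or as a {{ValueState<Boolean>}}.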

And if/when you propose a PR, please also include the SDK-side concrete syntax 
for the trigger.

Perhaps the timer scan you refer to can be addressed at the Flink level. What 
do you think, [~aljoscha]?

> Support AfterPane.elementGapAtMost() trigger and its combination with 
> elementCountAtLeast()
> -------------------------------------------------------------------------------------------
>
>                 Key: BEAM-2402
>                 URL: https://issues.apache.org/jira/browse/BEAM-2402
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-java-core
>            Reporter: Pei He
>            Assignee: Pei He
>
> We need a timestamp-driven trigger to use as a cheaper (or more efficient) 
> version of the ProcessingTime trigger.
> The problem with using the ProcessingTime trigger is that current runners' 
> support for it is not very efficient and cannot work for pipelines that have 
> lots of keys (for example, the Flink runner scans timers for all keys when 
> the watermark advances).
> We have used the AfterPane.elementGapAtMost() trigger in production and want 
> to merge it back. We believe it could be the solution for people who have 
> a similar issue.
> Implementation for reference:
> https://github.com/apache/beam/compare/master...peihe:custom-after-pane?expand=1



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)