[ https://issues.apache.org/jira/browse/FLINK-18647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Piotr Nowojski updated FLINK-18647:
-----------------------------------
Priority: Not a Priority (was: Minor)
> How to handle processing time timers with bounded input
> -------------------------------------------------------
>
> Key: FLINK-18647
> URL: https://issues.apache.org/jira/browse/FLINK-18647
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Affects Versions: 1.11.0
> Reporter: Piotr Nowojski
> Priority: Not a Priority
> Labels: auto-deprioritized-critical, auto-deprioritized-major, stale-minor
>
> (most of this description comes from an offline discussion between me,
> [~AHeise], [~roman_khachatryan], [~aljoscha] and [~sunhaibotb])
> When the end of input is reached (for example, with bounded sources), all pending
> (untriggered) processing time timers are ignored/dropped. In some cases this
> is desirable, but for {{WindowOperator}}, for example, it means that the last
> trailing window is never triggered, causing apparent data loss.
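To make the failure mode concrete, here is a minimal sketch of a processing-time tumbling window driven by a toy timer queue. It is a hypothetical model, not Flink code: `TrailingWindowLoss`, `processElement`, `advanceTo` and `endOfInput` are illustrative names only. Each window registers a timer at its end, and at end of input all pending timers are cleared, which is the behaviour described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical toy model (not Flink's API) of a processing-time tumbling window. */
public class TrailingWindowLoss {
    static final long WINDOW_SIZE = 10;

    // Pending processing-time timers, keyed by firing timestamp.
    final TreeMap<Long, Runnable> timers = new TreeMap<>();
    // Element counts per window, keyed by window start.
    final Map<Long, Integer> counts = new TreeMap<>();
    final List<String> emitted = new ArrayList<>();

    void processElement(long now) {
        advanceTo(now); // fire timers that became due before this record arrived
        long start = now - (now % WINDOW_SIZE);
        long end = start + WINDOW_SIZE;
        if (counts.merge(start, 1, Integer::sum) == 1) {
            // First element of this window: schedule its trigger at the window end.
            timers.put(end, () -> emitted.add("[" + start + "," + end + ")=" + counts.remove(start)));
        }
    }

    // Fire every timer whose timestamp is <= now, in timestamp order.
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.firstKey() <= now) {
            timers.pollFirstEntry().getValue().run();
        }
    }

    // Behaviour described in the issue: pending timers are silently dropped.
    void endOfInput() {
        timers.clear();
    }

    static List<String> run(long... arrivalTimes) {
        TrailingWindowLoss op = new TrailingWindowLoss();
        for (long t : arrivalTimes) {
            op.processElement(t);
        }
        op.endOfInput();
        return op.emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(3, 7, 12, 18, 25));
    }
}
```

With arrivals at 3, 7, 12, 18 and 25, `main` prints `[[0,10)=2, [10,20)=2]`: the single element in window [20,30) is never emitted because its timer (at 30) was still pending when the input ended.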
> There are a couple of ideas that should be considered.
> 1. Provide a way for users to decide what to do with such timers: cancel them,
> wait for them, or trigger them immediately. For example, by overloading the
> existing methods {{ProcessingTimeService#registerTimer}} and
> {{ProcessingTimeService#scheduleAtFixedRate}} as follows:
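> {code:java}
> // Proposed overload: adds a TimerAction to the existing (timestamp, target) signature.
> ScheduledFuture<?> registerTimer(
>     long timestamp, ProcessingTimeCallback target, TimerAction timerAction);
>
> // What to do with a timer that is still pending when the input ends.
> enum TimerAction {
>     CANCEL_ON_END_OF_INPUT,
>     TRIGGER_ON_END_OF_INPUT,
>     WAIT_ON_END_OF_INPUT
> }
> {code}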
> or maybe:
> {code:java}
> public interface TimerAction {
>     void onEndOfInput(ScheduledFuture<?> timer);
> }
> {code}
> But this would also mean that we store additional state with each timer, that we
> need to modify the serialisation format (providing some kind of state
> migration path), and that we potentially increase the size footprint of the timers.
> The extra overhead could be avoided via some kind of {{Map<Timer, TimerAction>}},
> where a missing entry means some default value.
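Option 1 combined with the side-map idea could look roughly like the following sketch. Only the three `TimerAction` constant names come from the proposal; `ToyTimerService`, `Timer`, `advanceTo`, `pendingTimers` and the choice of `CANCEL_ON_END_OF_INPUT` as the default action are hypothetical assumptions, not Flink's API. Timers registered with the default action get no map entry, so they carry no extra state.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

/** Hypothetical sketch of option 1; none of these types are Flink classes. */
public class TimerActionSketch {
    enum TimerAction { CANCEL_ON_END_OF_INPUT, TRIGGER_ON_END_OF_INPUT, WAIT_ON_END_OF_INPUT }

    static final class Timer {
        final long id;
        final long timestamp;
        final Runnable callback;

        Timer(long id, long timestamp, Runnable callback) {
            this.id = id;
            this.timestamp = timestamp;
            this.callback = callback;
        }
    }

    static final class ToyTimerService {
        // Assumed default, matching today's "drop on end of input" behaviour.
        static final TimerAction DEFAULT_ACTION = TimerAction.CANCEL_ON_END_OF_INPUT;

        private final PriorityQueue<Timer> queue = new PriorityQueue<>(
                Comparator.comparingLong((Timer t) -> t.timestamp).thenComparingLong(t -> t.id));
        // Side map: only non-default actions are stored.
        private final Map<Long, TimerAction> actions = new HashMap<>();
        private long nextId = 0;

        void registerTimer(long timestamp, Runnable callback) {
            registerTimer(timestamp, callback, DEFAULT_ACTION);
        }

        void registerTimer(long timestamp, Runnable callback, TimerAction action) {
            Timer timer = new Timer(nextId++, timestamp, callback);
            queue.add(timer);
            if (action != DEFAULT_ACTION) {
                actions.put(timer.id, action);
            }
        }

        // Fire all timers due at or before 'now'.
        void advanceTo(long now) {
            while (!queue.isEmpty() && queue.peek().timestamp <= now) {
                Timer timer = queue.poll();
                actions.remove(timer.id);
                timer.callback.run();
            }
        }

        // Apply each pending timer's action; WAIT timers stay queued for advanceTo.
        void endOfInput() {
            List<Timer> pending = new ArrayList<>(queue);
            pending.sort(queue.comparator());
            for (Timer timer : pending) {
                TimerAction action = actions.getOrDefault(timer.id, DEFAULT_ACTION);
                if (action == TimerAction.WAIT_ON_END_OF_INPUT) {
                    continue;
                }
                queue.remove(timer);
                actions.remove(timer.id);
                if (action == TimerAction.TRIGGER_ON_END_OF_INPUT) {
                    timer.callback.run();
                }
            }
        }

        int pendingTimers() {
            return queue.size();
        }
    }

    static List<String> demo() {
        List<String> fired = new ArrayList<>();
        ToyTimerService service = new ToyTimerService();
        service.registerTimer(30, () -> fired.add("cancel")); // default action, no map entry
        service.registerTimer(40, () -> fired.add("trigger"), TimerAction.TRIGGER_ON_END_OF_INPUT);
        service.registerTimer(50, () -> fired.add("wait"), TimerAction.WAIT_ON_END_OF_INPUT);
        service.advanceTo(25); // nothing is due yet
        service.endOfInput();  // "cancel" dropped, "trigger" fires now, "wait" stays queued
        fired.add("pending=" + service.pendingTimers());
        service.advanceTo(50); // the WAIT timer fires once its time arrives
        return fired;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

`main` prints `[trigger, pending=1, wait]`. In this toy the WAIT timer simply stays in the queue; a real task would additionally have to keep running until such timers have fired.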
> 2. Another way to solve this problem might be to let the operator code decide
> what to do with a given timer: either (a) ask the operator what should happen
> with a given timer, (b) let the operator iterate over its timers and cancel them in
> endOfInput(), or (c) simply fire the timer with some endOfInput flag.
> I think none of (a), (b), or (c) would require breaking API changes or state
> changes, nor would they add any overhead. Only the logic of what to do with a
> timer would have to be “hardcoded” in the operator’s code (which, by the way, might
> even have the additional benefit of being easier to change in case of bugs,
> such as a timer having been registered with the wrong {{TimerAction}}).
> This is complicated somewhat by the question of how (if at all) options (a), (b),
> or (c) should be exposed to UDFs.
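Option (c) needs no per-timer state: at end of input the timer service fires every pending timer once and passes a flag, and the operator's own callback decides what to do with it. The following is a minimal sketch under that assumption; `TimerCallback`, `onTimer(long, boolean)`, `ToyTimerService` and `WindowOperator` are hypothetical names, not Flink's API. Here the window operator chooses to emit its trailing window, while another operator could return early when the flag is set.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of option 2(c); not Flink's API. */
public class EndOfInputFlagSketch {
    /** Timer callback that is told whether it fires because the input ended. */
    interface TimerCallback {
        void onTimer(long timestamp, boolean endOfInput);
    }

    static final class ToyTimerService {
        private final TreeMap<Long, TimerCallback> timers = new TreeMap<>();

        void registerTimer(long timestamp, TimerCallback callback) {
            timers.put(timestamp, callback);
        }

        // Regular firing: flag is false.
        void advanceTo(long now) {
            while (!timers.isEmpty() && timers.firstKey() <= now) {
                Map.Entry<Long, TimerCallback> e = timers.pollFirstEntry();
                e.getValue().onTimer(e.getKey(), false);
            }
        }

        // No per-timer state: every pending timer fires once, flagged as end of input.
        void endOfInput() {
            while (!timers.isEmpty()) {
                Map.Entry<Long, TimerCallback> e = timers.pollFirstEntry();
                e.getValue().onTimer(e.getKey(), true);
            }
        }
    }

    /** Toy tumbling-window operator; its end-of-input policy is hardcoded in onTimer. */
    static final class WindowOperator implements TimerCallback {
        static final long WINDOW_SIZE = 10;
        final ToyTimerService timerService = new ToyTimerService();
        final Map<Long, Integer> counts = new TreeMap<>(); // keyed by window end
        final List<String> emitted = new ArrayList<>();

        void processElement(long now) {
            timerService.advanceTo(now);
            long end = now - (now % WINDOW_SIZE) + WINDOW_SIZE;
            if (counts.merge(end, 1, Integer::sum) == 1) {
                timerService.registerTimer(end, this);
            }
        }

        @Override
        public void onTimer(long timestamp, boolean endOfInput) {
            // This operator emits even on end of input; others could ignore the timer.
            String suffix = endOfInput ? " (end of input)" : "";
            emitted.add("[" + (timestamp - WINDOW_SIZE) + "," + timestamp + ")="
                    + counts.remove(timestamp) + suffix);
        }
    }

    static List<String> run(long... arrivalTimes) {
        WindowOperator op = new WindowOperator();
        for (long t : arrivalTimes) {
            op.processElement(t);
        }
        op.timerService.endOfInput();
        return op.emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(3, 7, 12, 18, 25));
    }
}
```

With arrivals at 3, 7, 12, 18 and 25, `main` prints `[[0,10)=2, [10,20)=2, [20,30)=1 (end of input)]`, so the trailing window is no longer lost and no timer had to store an action.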
> 3. Maybe we need a combination of both? Pre-existing operators could implement
> some custom handling of this issue (via 2a, 2b, or 2c), while UDFs could be
> handled by option 1.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)