Piotr Nowojski created FLINK-18647:
--------------------------------------
Summary: How to handle processing time timers with bounded input
Key: FLINK-18647
URL: https://issues.apache.org/jira/browse/FLINK-18647
Project: Flink
Issue Type: Improvement
Components: API / DataStream
Affects Versions: 1.11.0
Reporter: Piotr Nowojski
(most of this description comes from an offline discussion between me,
[~AHeise], [~roman_khachatryan], [~aljoscha] and [~sunhaibotb])
In case of end of input (for example for bounded sources), all pending
(untriggered) processing time timers are ignored/dropped. In some cases this is
desirable, but for {{WindowOperator}}, for example, it means that the last
trailing window will never be triggered, which looks like data loss.
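To make the effect concrete, here is a minimal sketch using only the JDK's {{ScheduledExecutorService}}. It is an analogy and not Flink's timer service: the class name {{DroppedTimerDemo}} and the one-hour delay are made up for illustration. A timer that would emit the "window" is registered, the input ends, and the executor is shut down without running what is still pending.

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustration only: plain JDK scheduling, not Flink's ProcessingTimeService.
class DroppedTimerDemo {

    /** Registers a far-future timer, then "ends the input"; returns true if the timer fired. */
    static boolean timerFiredBeforeShutdown() {
        ScheduledExecutorService timers = Executors.newSingleThreadScheduledExecutor();
        AtomicBoolean windowEmitted = new AtomicBoolean(false);

        // The trailing "window" would be emitted one hour from now.
        timers.schedule(() -> windowEmitted.set(true), 1, TimeUnit.HOURS);

        // End of input: shut down immediately. Pending tasks are handed back, never run.
        List<Runnable> dropped = timers.shutdownNow();
        System.out.println("dropped timers: " + dropped.size());

        return windowEmitted.get();
    }

    public static void main(String[] args) {
        System.out.println("window emitted: " + timerFiredBeforeShutdown());
    }
}
```

The buffered window contents are simply lost here unless something other than the timer flushes them.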
There are a couple of ideas that should be considered.
1. Provide a way for users to decide what to do with such timers: cancel, wait,
trigger immediately. For example by overloading the existing methods:
{{ProcessingTimeService#registerTimer}} and
{{ProcessingTimeService#scheduleAtFixedRate}} in the following way:
{code:java}
ScheduledFuture<?> registerTimer(
        long timestamp, ProcessingTimeCallback target, TimerAction timerAction);

enum TimerAction {
    CANCEL_ON_END_OF_INPUT,
    TRIGGER_ON_END_OF_INPUT,
    WAIT_ON_END_OF_INPUT
}
{code}
or maybe:
{code}
public interface TimerAction {
    void onEndOfInput(ScheduledFuture<?> timer);
}
{code}
But this would also mean that we store additional state with each timer, that
we need to modify the serialisation format (providing some kind of state
migration path), and that we potentially increase the size footprint of the
timers. The extra overhead could be avoided via some kind of {{Map<Timer,
TimerAction>}}, where a missing entry means some default action.
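A rough sketch of how option 1 could behave, combined with the side-map idea. Everything here is hypothetical: {{EndOfInputTimers}}, {{Callback}} and {{Timer}} are stand-ins and not Flink classes, serialisation is ignored entirely, and the default of {{CANCEL_ON_END_OF_INPUT}} is an assumption chosen to match today's behaviour of dropping timers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: these types only imitate the proposal, they are not Flink classes.
class EndOfInputTimers {

    enum TimerAction { CANCEL_ON_END_OF_INPUT, TRIGGER_ON_END_OF_INPUT, WAIT_ON_END_OF_INPUT }

    /** Stand-in for a processing-time callback. */
    interface Callback {
        void onProcessingTime(long timestamp);
    }

    /** A timer carries no action itself; its identity is the key into the side map. */
    private static final class Timer {
        final long timestamp;
        final Callback target;

        Timer(long timestamp, Callback target) {
            this.timestamp = timestamp;
            this.target = target;
        }
    }

    // Assumed default for timers without an entry in the side map.
    private static final TimerAction DEFAULT_ACTION = TimerAction.CANCEL_ON_END_OF_INPUT;

    private final List<Timer> pending = new ArrayList<>();
    private final Map<Timer, TimerAction> actions = new HashMap<>();

    /** Existing-style registration: no side-map entry, so the default applies. */
    void registerTimer(long timestamp, Callback target) {
        pending.add(new Timer(timestamp, target));
    }

    /** Proposed overload: only timers with an explicit action cost a map entry. */
    void registerTimer(long timestamp, Callback target, TimerAction action) {
        Timer timer = new Timer(timestamp, target);
        pending.add(timer);
        actions.put(timer, action);
    }

    /** Applies each timer's action and returns the timestamps that still have to be waited for. */
    List<Long> endOfInput() {
        List<Long> stillWaiting = new ArrayList<>();
        for (Timer timer : pending) {
            switch (actions.getOrDefault(timer, DEFAULT_ACTION)) {
                case TRIGGER_ON_END_OF_INPUT:
                    timer.target.onProcessingTime(timer.timestamp);
                    break;
                case WAIT_ON_END_OF_INPUT:
                    stillWaiting.add(timer.timestamp);
                    break;
                case CANCEL_ON_END_OF_INPUT:
                    break; // dropped silently
            }
        }
        pending.clear();
        actions.clear();
        return stillWaiting;
    }
}
```

In this sketch the caller of {{endOfInput()}} is responsible for actually waiting on the returned timestamps; how that waiting would work in a real task is left open.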
2. Another way to solve this problem might be to let the operator code decide
what to do with a given timer: either ask the operator what should happen with
the timer (a), or let the operator iterate over and cancel the timers in
endOfInput() (b), or just fire the timer with some endOfInput flag (c).
I think none of (a), (b), and (c) would require breaking API changes, state
changes, or additional overhead. The logic for what to do with the timer would
just have to be “hardcoded” in the operator’s code (which, by the way, might
even have the additional benefit of being easier to change in case of bugs,
such as a timer registered with the wrong {{TimerAction}}).
This is complicated a bit by the question of how (if at all) options (a), (b)
or (c) should be exposed to UDFs.
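As a rough illustration of variant (c), the sketch below fires every pending timer with an end-of-input flag and leaves the decision to the operator. All names ({{OperatorDecidesSketch}}, {{FlaggedCallback}}, {{PendingTimers}}, {{CountingWindow}}, {{TimeoutAlert}}) are hypothetical and not existing Flink API; the two toy operators only show that the same flag can lead to opposite decisions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of variant (c): timers fire with an endOfInput flag.
class OperatorDecidesSketch {

    /** Assumed callback shape; the flag tells the operator why the timer fired. */
    interface FlaggedCallback {
        void onTimer(long timestamp, boolean endOfInput);
    }

    /** Keeps only timestamps, so nothing extra is stored per timer. */
    static final class PendingTimers {
        private final List<Long> timestamps = new ArrayList<>();
        private final FlaggedCallback operator;

        PendingTimers(FlaggedCallback operator) {
            this.operator = operator;
        }

        void register(long timestamp) {
            timestamps.add(timestamp);
        }

        /** On end of input, fire everything still pending and let the operator decide. */
        void endOfInput() {
            for (long timestamp : timestamps) {
                operator.onTimer(timestamp, true);
            }
            timestamps.clear();
        }
    }

    /** Window-like operator: flushes its buffer whether or not the input has ended. */
    static final class CountingWindow implements FlaggedCallback {
        private int buffered;
        final List<Integer> emitted = new ArrayList<>();

        void add() {
            buffered++;
        }

        @Override
        public void onTimer(long timestamp, boolean endOfInput) {
            emitted.add(buffered);
            buffered = 0;
        }
    }

    /** Timeout-like operator: a timeout that never elapsed should not raise an alert. */
    static final class TimeoutAlert implements FlaggedCallback {
        final List<Long> alerts = new ArrayList<>();

        @Override
        public void onTimer(long timestamp, boolean endOfInput) {
            if (!endOfInput) {
                alerts.add(timestamp);
            }
        }
    }
}
```

Variants (a) and (b) would move the same decision to a different place (a query method on the operator, or a loop inside endOfInput()), but the per-operator logic would look much the same.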
3. Maybe we need a combination of both? Pre-existing operators could implement
some custom handling of this issue (via 2a, 2b or 2c), while UDFs could be
handled by option 1.