[ https://issues.apache.org/jira/browse/FLINK-18647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607907#comment-17607907 ]
Divye Kapoor commented on FLINK-18647:
--------------------------------------
[~pnowojski], if the only implementation were "option 2" above (the "correct"
behavior in both testing and production: wait for timers before triggering
EOF), all of the use cases would be resolved as follows:
1. Mark the end of some windowed aggregation: terminates in the normal manner.
2. CEP-style timeouts: terminate in the normal manner.
3. Handle async timeouts: terminates in the normal manner.
4. Clean state: cleaned up in the normal manner.
Re: 1-3: yes, waiting before EOF is inefficient in only one case: testing. In
production, the wait is essential, because other processing might be happening
outside the Flink job in the meantime. Firing immediately is acceptable only in
the testing case; otherwise it violates the processing-time guarantees.
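To make the "wait vs. fire immediately" distinction concrete, here is a minimal, self-contained sketch (not Flink code) of a processing-time service driven by a manual clock. {{ManualTimerService}}, {{waitForAll}} and {{fireAllNow}} are hypothetical names. Waiting advances the clock to each timer's timestamp before firing it, so no callback observes a time earlier than its own timestamp; firing immediately runs every callback at the current time, which is the guarantee violation that is only tolerable in tests.

{code:java}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.LongConsumer;

/** Hypothetical, deterministic stand-in for a processing-time timer service (not Flink code). */
class ManualTimerService {
    /** A pending timer: absolute timestamp plus a callback that receives the time it fired at. */
    private static final class Timer {
        final long timestamp;
        final LongConsumer callback;

        Timer(long timestamp, LongConsumer callback) {
            this.timestamp = timestamp;
            this.callback = callback;
        }
    }

    private final PriorityQueue<Timer> pending =
            new PriorityQueue<>(Comparator.comparingLong((Timer t) -> t.timestamp));
    private long now;

    ManualTimerService(long startTime) {
        this.now = startTime;
    }

    void register(long timestamp, LongConsumer callback) {
        pending.add(new Timer(timestamp, callback));
    }

    /** "Wait" on end of input: advance the clock to each timer's timestamp, then fire it. */
    void waitForAll() {
        while (!pending.isEmpty()) {
            Timer timer = pending.poll();
            now = Math.max(now, timer.timestamp);
            timer.callback.accept(now);
        }
    }

    /** "Trigger immediately" on end of input: fire every timer at the current time. */
    void fireAllNow() {
        while (!pending.isEmpty()) {
            pending.poll().callback.accept(now);
        }
    }

    /** Starts at time 100, registers timers due at 300 and 200, returns observed firing times. */
    static List<Long> observedFiringTimes(boolean wait) {
        ManualTimerService service = new ManualTimerService(100);
        List<Long> observed = new ArrayList<>();
        service.register(300, ts -> observed.add(ts));
        service.register(200, ts -> observed.add(ts));
        if (wait) {
            service.waitForAll();
        } else {
            service.fireAllNow();
        }
        return observed;
    }
}
{code}

Waiting yields [200, 300] (each callback sees its own due time), while firing immediately yields [100, 100]: both callbacks run before they were due.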
Re: 4 –
> ideally should be dropped on EOF. Can be also fired or waited, but either of
> those two is inefficient. When TTL is huge (hours, days or months) waiting
> can be impractical.
Maybe we don't agree here: in production, the wait is essential to retain
processing-time guarantees. Stopping with a savepoint allows the job to restart
later and continue with the pending timer, so long timers are not troublesome.
The only difference is in testing, and even there the following case is not
typical: "When TTL is huge (hours, days or months) waiting can be impractical."
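The argument that long timers are harmless across a stop-with-savepoint can be illustrated with a minimal sketch, assuming (as a simplification) that each timer is persisted as an absolute processing-time timestamp and re-scheduled on restore with whatever delay remains. {{RestoredTimer}} and {{remainingDelayMillis}} are hypothetical names, not Flink API.

{code:java}
import java.time.Duration;

/** Hypothetical illustration: timers stored as absolute timestamps survive a restart. */
class RestoredTimer {
    /** Delay to use when re-registering a restored timer; overdue timers fire right away. */
    static long remainingDelayMillis(long timerTimestampMillis, long nowMillis) {
        return Math.max(0L, timerTimestampMillis - nowMillis);
    }

    public static void main(String[] args) {
        long registeredAt = 0L;
        long ttlTimer = registeredAt + Duration.ofDays(30).toMillis(); // a 30-day TTL timer
        long restoredAt = Duration.ofDays(10).toMillis();               // job restored 10 days later
        // Prints 1728000000 (20 days in milliseconds): the timer resumes where it left off.
        System.out.println(remainingDelayMillis(ttlTimer, restoredAt));
    }
}
{code}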
In essence, if we think about it a bit more:
1. Option 2: Doing the right thing works for both testing and production.
The only difference from the status quo is that in the MiniCluster, we wait for
the timers to fire before shutting down the job.
This is the minimal change to fix the identified bug (processing-time CEP is
broken in the MiniCluster).
All the other changes impact production and should be handled as a feature
request.
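The JDK has a close analogue of "wait for the timers to fire before shutting down": by default, {{ScheduledThreadPoolExecutor#shutdown()}} still runs delayed tasks that were already scheduled (see {{setExecuteExistingDelayedTasksAfterShutdownPolicy}}), whereas {{shutdownNow()}} drops them and returns them to the caller. The sketch below only uses that analogy to show the behavioral difference; it is not how the Flink MiniCluster is implemented, and {{ShutdownPolicyDemo}} is a hypothetical name.

{code:java}
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** JDK analogy (not Flink code): wait for vs. drop pending timers at shutdown. */
class ShutdownPolicyDemo {
    /** Schedules one 200 ms timer, shuts down at once, and reports whether it fired. */
    static boolean timerFired(boolean waitForTimers) {
        ScheduledThreadPoolExecutor timers = new ScheduledThreadPoolExecutor(1);
        AtomicBoolean fired = new AtomicBoolean(false);
        timers.schedule(() -> fired.set(true), 200, TimeUnit.MILLISECONDS);

        if (waitForTimers) {
            // Default policy: already-scheduled delayed tasks still run after shutdown().
            timers.shutdown();
        } else {
            // shutdownNow() removes and returns the tasks that never started: dropped timers.
            List<Runnable> dropped = timers.shutdownNow();
            System.out.println("dropped timers: " + dropped.size());
        }
        try {
            timers.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return fired.get();
    }

    public static void main(String[] args) {
        System.out.println("wait: " + timerFired(true));
        System.out.println("drop: " + timerFired(false));
    }
}
{code}

Here {{timerFired(true)}} should return {{true}} and {{timerFired(false)}} should return {{false}}, mirroring "wait" and "drop" for pending timers at shutdown.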
> How to handle processing time timers with bounded input
> -------------------------------------------------------
>
> Key: FLINK-18647
> URL: https://issues.apache.org/jira/browse/FLINK-18647
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Affects Versions: 1.11.0
> Reporter: Piotr Nowojski
> Priority: Not a Priority
> Labels: auto-deprioritized-critical, auto-deprioritized-major, stale-minor
>
> (most of this description comes from an offline discussion between me,
> [~AHeise], [~roman_khachatryan], [~aljoscha] and [~sunhaibotb])
> On end of input (for example, for bounded sources), all pending
> (untriggered) processing-time timers are ignored/dropped. In some cases this
> is desirable, but for {{WindowOperator}}, for example, it means that the last
> trailing window will not be triggered, causing apparent data loss.
> There are a couple of ideas to consider.
> 1. Provide a way for users to decide what to do with such timers: cancel,
> wait, or trigger immediately. For example, by overloading the existing methods
> {{ProcessingTimeService#registerTimer}} and
> {{ProcessingTimeService#scheduleAtFixedRate}} in the following way:
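> {code:java}
> // Proposed overload of ProcessingTimeService#registerTimer.
> ScheduledFuture<?> registerTimer(
>         long timestamp, ProcessingTimeCallback target, TimerAction timerAction);
>
> // What to do with a still-pending timer when the input ends.
> enum TimerAction {
>     CANCEL_ON_END_OF_INPUT,
>     TRIGGER_ON_END_OF_INPUT,
>     WAIT_ON_END_OF_INPUT
> }
> {code}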
> or maybe:
> {code:java}
> public interface TimerAction {
>     // Called for each pending timer when the input ends; decides its fate.
>     void onEndOfInput(ScheduledFuture<?> timer);
> }
> {code}
> But this would also mean that we store additional state with each timer, need
> to modify the serialisation format (providing some kind of state migration
> path), and potentially increase the size footprint of the timers. The extra
> overhead could be avoided via some kind of {{Map<Timer, TimerAction>}}, where a
> missing entry means some default value.
> 2. Another way to solve this problem might be to let the operator code decide
> what to do with a given timer: either (a) ask the operator what should happen
> with the timer, (b) let the operator iterate over and cancel the timers in
> endOfInput(), or (c) just fire the timer with some endOfInput flag.
> I think none of (a), (b), and (c) would require breaking API changes, state
> changes, or additional overhead. The logic for what to do with the timer would
> just have to be “hardcoded” in the operator’s code (which, by the way, might
> even have the additional benefit of being easier to change in case of bugs,
> such as a timer registered with the wrong {{TimerAction}}).
> This is complicated a bit by the question of how (if at all) options (a), (b),
> or (c) should be exposed to UDFs.
> 3. Maybe we need a combination of both? Pre-existing operators could implement
> custom handling of this issue (via 2a, 2b, or 2c), while UDFs could be
> handled by option 1.
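To illustrate the data-loss symptom from the issue description, here is a minimal simulation (not {{WindowOperator}} itself), assuming hypothetical 10 ms tumbling windows that are emitted only when a processing-time timer at the window end fires. {{TrailingWindowDemo}} and its parameters are made up for illustration.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical tumbling-window counter whose windows are emitted by window-end timers. */
class TrailingWindowDemo {
    static final long WINDOW_SIZE = 10;

    /** Elements arrive at the given processing times; end of input follows the last one. */
    static List<Long> run(long[] arrivalTimes, boolean dropTimersOnEndOfInput) {
        // Window end -> element count; each open window's end acts as its pending timer.
        TreeMap<Long, Long> openWindows = new TreeMap<>();
        List<Long> emitted = new ArrayList<>();
        for (long t : arrivalTimes) {
            // Fire every window-end timer that is due before this element is processed.
            while (!openWindows.isEmpty() && openWindows.firstKey() <= t) {
                emitted.add(openWindows.pollFirstEntry().getValue());
            }
            long windowEnd = (t / WINDOW_SIZE + 1) * WINDOW_SIZE;
            openWindows.merge(windowEnd, 1L, Long::sum);
        }
        if (!dropTimersOnEndOfInput) {
            // Waiting for (or firing) the pending timers also emits the trailing window(s).
            while (!openWindows.isEmpty()) {
                emitted.add(openWindows.pollFirstEntry().getValue());
            }
        }
        return emitted;
    }
}
{code}

With arrivals at 1, 5, 12 and 25 ms, dropping pending timers emits counts [2, 1], while waiting for them emits [2, 1, 1]: the trailing window [20, 30) is the one that goes missing.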
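A minimal sketch of how a timer service could act on the {{TimerAction}} enum proposed in option 1 of the description. Only the three enum constants come from the proposal; {{TimerActionService}}, {{PendingTimer}}, {{onEndOfInput}} and the simulated clock are hypothetical, and {{ProcessingTimeCallback}} is replaced by a plain {{Runnable}} to keep the example self-contained.

{code:java}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical timer service that honours a per-timer TimerAction on end of input. */
class TimerActionService {
    /** Constants as proposed in the issue description. */
    enum TimerAction {
        CANCEL_ON_END_OF_INPUT,
        TRIGGER_ON_END_OF_INPUT,
        WAIT_ON_END_OF_INPUT
    }

    /** A registered timer; Runnable stands in for ProcessingTimeCallback. */
    static final class PendingTimer {
        final long timestamp;
        final Runnable target;
        final TimerAction action;

        PendingTimer(long timestamp, Runnable target, TimerAction action) {
            this.timestamp = timestamp;
            this.target = target;
            this.action = action;
        }
    }

    private final List<PendingTimer> pending = new ArrayList<>();
    long now; // simulated processing time

    void registerTimer(long timestamp, Runnable target, TimerAction action) {
        pending.add(new PendingTimer(timestamp, target, action));
    }

    void onEndOfInput() {
        pending.sort(Comparator.comparingLong((PendingTimer t) -> t.timestamp));
        // TRIGGER: fire right away, at the current time.
        for (PendingTimer timer : pending) {
            if (timer.action == TimerAction.TRIGGER_ON_END_OF_INPUT) {
                timer.target.run();
            }
        }
        // WAIT: advance the simulated clock to each timer's timestamp, then fire it.
        for (PendingTimer timer : pending) {
            if (timer.action == TimerAction.WAIT_ON_END_OF_INPUT) {
                now = Math.max(now, timer.timestamp);
                timer.target.run();
            }
        }
        // CANCEL: dropped without firing.
        pending.clear();
    }

    static List<String> demo() {
        TimerActionService service = new TimerActionService();
        service.now = 100;
        List<String> log = new ArrayList<>();
        service.registerTimer(300, () -> log.add("wait@" + service.now), TimerAction.WAIT_ON_END_OF_INPUT);
        service.registerTimer(200, () -> log.add("trigger@" + service.now), TimerAction.TRIGGER_ON_END_OF_INPUT);
        service.registerTimer(250, () -> log.add("cancel@" + service.now), TimerAction.CANCEL_ON_END_OF_INPUT);
        service.onEndOfInput();
        return log; // [trigger@100, wait@300]
    }
}
{code}

Triggered timers run before any waiting happens, so "trigger immediately" really fires at the end-of-input time, while waited timers fire at their own timestamps.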
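With the interface variant of option 1, behaviours become plain implementations. A sketch assuming a {{TimerAction}} functional interface with the {{onEndOfInput(ScheduledFuture<?>)}} signature from the description; the {{CANCEL}} and {{WAIT}} constants and {{TimerActionDemo}} are hypothetical. Note that with only a {{ScheduledFuture}} in hand, "trigger immediately" cannot be expressed without access to the callback, so only cancel and wait are shown.

{code:java}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** The callback-style TimerAction from the proposal, with two hypothetical implementations. */
@FunctionalInterface
interface TimerAction {
    void onEndOfInput(ScheduledFuture<?> timer);

    /** Drop the timer if it has not fired yet. */
    TimerAction CANCEL = timer -> timer.cancel(false);

    /** Block until the timer has fired. */
    TimerAction WAIT = timer -> {
        try {
            timer.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw new IllegalStateException("timer callback failed", e.getCause());
        }
    };
}

class TimerActionDemo {
    /** Schedules one 200 ms timer, applies the action at "end of input", counts firings. */
    static int firedCount(TimerAction action) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger fired = new AtomicInteger();
        ScheduledFuture<?> timer =
                executor.schedule(() -> { fired.incrementAndGet(); }, 200, TimeUnit.MILLISECONDS);
        action.onEndOfInput(timer);
        executor.shutdownNow();
        return fired.get();
    }
}
{code}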
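The {{Map<Timer, TimerAction>}} idea from the description keeps the timer serialization format unchanged, since only timers with a non-default action get an entry. A minimal sketch with hypothetical {{TimerKey}} and {{TimerActionRegistry}} types; the default is assumed to be {{CANCEL_ON_END_OF_INPUT}} because that matches today's behaviour of dropping pending timers (the proposal leaves the default open).

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical side map: only timers with a non-default action get an entry. */
class TimerActionRegistry {
    enum TimerAction { CANCEL_ON_END_OF_INPUT, TRIGGER_ON_END_OF_INPUT, WAIT_ON_END_OF_INPUT }

    /** Assumed default; matches today's behaviour of dropping pending timers. */
    static final TimerAction DEFAULT = TimerAction.CANCEL_ON_END_OF_INPUT;

    /** Identity of a timer, here its key and timestamp (hypothetical). */
    static final class TimerKey {
        final String key;
        final long timestamp;

        TimerKey(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TimerKey)) {
                return false;
            }
            TimerKey other = (TimerKey) o;
            return timestamp == other.timestamp && key.equals(other.key);
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, timestamp);
        }
    }

    private final Map<TimerKey, TimerAction> overrides = new HashMap<>();

    void setAction(TimerKey timer, TimerAction action) {
        if (action == DEFAULT) {
            overrides.remove(timer); // no entry needed for the default
        } else {
            overrides.put(timer, action);
        }
    }

    /** A missing entry means the default action. */
    TimerAction actionFor(TimerKey timer) {
        return overrides.getOrDefault(timer, DEFAULT);
    }

    int storedEntries() {
        return overrides.size();
    }
}
{code}

One trade-off of this design: entries must be removed when their timers fire or are deleted, otherwise the map grows without bound.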
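A sketch of how the three operator-side variants of option 2 could look from the runtime's point of view. All types ({{OperatorTimers}}, {{DecidingOperator}}, {{CancellingOperator}}, {{FlaggedOperator}}) are hypothetical and are not Flink's operator interfaces: (a) the runtime asks the operator per timer, (b) the operator iterates over pending timers in its {{endOfInput()}} and cancels some, (c) every pending timer fires with an end-of-input flag.

{code:java}
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical runtime-side handling of pending timers for variants 2(a), 2(b), 2(c). */
class OperatorTimers {
    enum Decision { DROP, FIRE }

    /** (a) The runtime asks the operator what to do with each pending timer. */
    interface DecidingOperator {
        Decision onPendingTimer(long timestamp);
    }

    /** (b) The operator's endOfInput() iterates over pending timers; remove() cancels one. */
    interface CancellingOperator {
        void endOfInput(Iterator<Long> pendingTimers);
    }

    /** (c) Every pending timer fires, flagged as an end-of-input firing. */
    interface FlaggedOperator {
        void onProcessingTime(long timestamp, boolean endOfInput);
    }

    final TreeSet<Long> pending = new TreeSet<>();

    List<Long> endOfInputAsking(DecidingOperator operator) {
        List<Long> fired = new ArrayList<>();
        for (long timestamp : pending) {
            if (operator.onPendingTimer(timestamp) == Decision.FIRE) {
                fired.add(timestamp);
            }
        }
        pending.clear();
        return fired;
    }

    List<Long> endOfInputCancelling(CancellingOperator operator) {
        operator.endOfInput(pending.iterator());
        List<Long> kept = new ArrayList<>(pending); // not cancelled: to be fired or waited for
        pending.clear();
        return kept;
    }

    void endOfInputFlagged(FlaggedOperator operator) {
        for (long timestamp : pending) {
            operator.onProcessingTime(timestamp, true);
        }
        pending.clear();
    }
}
{code}

In all three variants the policy lives in operator code rather than in per-timer state, which is what keeps the serialization format untouched.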
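Option 3 of the description boils down to a precedence rule. A minimal sketch, with all names hypothetical: a built-in operator's own end-of-input handling (option 2) wins, otherwise the per-timer action registered by a UDF (option 1) applies, otherwise a default is used.

{code:java}
import java.util.Optional;

/** Hypothetical precedence rule for combining options 1 and 2. */
class EndOfInputPolicy {
    enum TimerAction { CANCEL_ON_END_OF_INPUT, TRIGGER_ON_END_OF_INPUT, WAIT_ON_END_OF_INPUT }

    static TimerAction resolve(
            Optional<TimerAction> operatorOverride, // option 2: a built-in operator decides
            Optional<TimerAction> perTimerAction,   // option 1: action registered with the timer
            TimerAction defaultAction) {
        return operatorOverride.or(() -> perTimerAction).orElse(defaultAction);
    }
}
{code}

Keeping the precedence in a single place would make it easy to change the default later without touching either the operators or the UDF-facing API.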
--
This message was sent by Atlassian Jira
(v8.20.10#820010)