[
https://issues.apache.org/jira/browse/FLINK-18647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17604879#comment-17604879
]
Divye Kapoor commented on FLINK-18647:
--------------------------------------
Here are 3 usecases to solve as part of addressing this ticket:
Major usecase: Integration Testing of Processing time jobs doing session
windows / tumbling time windows etc.
One example is a stream-stream join that has the following graph:
S1, S2 => Union => Session Window (30s max gap) => Sink.
# In the test environment, the processing time streams end when S1 and S2 ends
(EOF propagates too soon)
The timers associated with the session window don't have the opportunity to
trigger before the Session Window emits EOF.
The Flink MiniCluster shuts down and the job terminates. Bug: The sink does not
receive the session window triggers.
Possible fix Option 1: On all input EOFs, trigger all the timers (in-order &
immediately) before passing EOF to Sink. (correct implementation for testing)
Possible fix Option 2: Send EOF to Sink after inputs are EOF and all timers
have triggered at their natural (scheduled) times. (correct implementation for
production).
Possible fix Option 3: Cancel all the pending timers immediately and send the
EOF downstream (current implementation - buggy for Windowed operators).
Canceling the timers is a last resort because if the timers are not triggered,
it might lead to state leaks or other resource leaks (eg. files / sockets) -
there's no timer cancelled callback function. If option 3 has to be retained,
then a timer canceled callback function needs to be provided and the callback
function should have the option to emit outputs before the output stream is
closed as EOF. This is a more complex implementation.
I would suggest going for an implementation that only implements Option 1 and
Option 2 (as Option 1 is roughly the same as Option 3 without the complexity of
timer cancels). User code can easily convert Option 1 to Option 3 if they so
desire by skipping the timer body.
> How to handle processing time timers with bounded input
> -------------------------------------------------------
>
> Key: FLINK-18647
> URL: https://issues.apache.org/jira/browse/FLINK-18647
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Affects Versions: 1.11.0
> Reporter: Piotr Nowojski
> Priority: Not a Priority
> Labels: auto-deprioritized-critical, auto-deprioritized-major,
> stale-minor
>
> (most of this description comes from an offline discussion between me,
> [~AHeise], [~roman_khachatryan], [~aljoscha] and [~sunhaibotb])
> In case of end of input (for example for bounded sources), all pending
> (untriggered) processing time timers are ignored/dropped. In some cases this
> is desirable, but for example for {{WindowOperator}} it means that last
> trailing window will not be triggered, causing an apparent data loss.
> There are a couple of ideas what should be considered.
> 1. Provide a way for users to decide what to do with such timers: cancel,
> wait, trigger immediately. For example by overloading the existing methods:
> {{ProcessingTimeService#registerTimer}} and
> {{ProcessingTimeService#scheduleAtFixedRate}} in the following way:
> {code:java}
> ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback
> target, TimerAction timerAction);
> enum TimerAction {
> CANCEL_ON_END_OF_INPUT,
> TRIGGER_ON_END_OF_INPUT,
> WAIT_ON_END_OF_INPUT}
> {code}
> or maybe:
> {code}
> public interface TimerAction {
> void onEndOfInput(ScheduledFuture<?> timer);
> }
> {code}
> But this would also mean we store additional state with each timer and we
> need to modify the serialisation format (providing some kind of state
> migration path) and potentially increase the size foot print of the timers.
> Extra overhead could have been avoided via some kind of {{Map<Timer,
> TimerAction>}}, with lack of entry meaning some default value.
> 2.
> Also another way to solve this problem might be let the operator code decide
> what to do with the given timer. Either ask an operator what should happen
> with given timer (a), or let the operator iterate and cancel the timers on
> endOfInput() (b), or just fire the timer with some endOfInput flag (c).
> I think none of the (a), (b), and (c) would require braking API changes, no
> state changes and no additional overheads. Just the logic what to do with the
> timer would have to be “hardcoded” in the operator’s code. (which btw might
> even has an additional benefit of being easier to change in case of some
> bugs, like a timer was registered with wrong/incorrect {{TimerAction}}).
> This is complicated a bit by a question, how (if at all?) options a), b) or
> c) should be exposed to UDFs?
> 3.
> Maybe we need a combination of both? Pre existing operators could implement
> some custom handling of this issue (via 2a, 2b or 2c), while UDFs could be
> handled by 1.?
--
This message was sent by Atlassian Jira
(v8.20.10#820010)