[ https://issues.apache.org/jira/browse/FLINK-18647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17616262#comment-17616262 ]
Yun Gao commented on FLINK-18647:
---------------------------------
Hi [~pnowojski], sorry for the long delay; I have just come back from the holiday.
{quote} - each operator might have different default behaviour
- some operators might override/ignore/reject such changes, for all/some
timers - like maybe hypothetical WindowOperatorWithTTLTimers registering two
different types of timers could honour the setting for firing results, but
would always drop the TTL timers{quote}
I also agree with these two levels, namely the default behavior of each operator
and the per-timer behavior. As I understand it, both levels are decided by the
operator's logic.
{quote}Furthermore, actually WindowOperator users might be interested in either
of the three settings for the processing time windows - depending on the
business logic it might be the most appropriate to either: fire immediately,
drop the timers, or wait for the timers to fire naturally.
{quote}
I understand the thoughts behind the design, but I'm not quite sure whether users
have such requirements in real-life jobs, so I'm neutral about the API
{{org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator#setEndOfInputTimerBehaviour}}.
Do you have more input on this point? Or would it also be acceptable to
postpone adding this API to a second step, until we have more input from
users?
Also, since this issue seems to require changes to the user-facing API, perhaps we
should turn it into a public discussion on the mailing list, based on the current
conclusions?
> How to handle processing time timers with bounded input
> -------------------------------------------------------
>
> Key: FLINK-18647
> URL: https://issues.apache.org/jira/browse/FLINK-18647
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Affects Versions: 1.11.0
> Reporter: Piotr Nowojski
> Priority: Not a Priority
> Labels: auto-deprioritized-critical, auto-deprioritized-major, stale-minor
>
> (most of this description comes from an offline discussion between me,
> [~AHeise], [~roman_khachatryan], [~aljoscha] and [~sunhaibotb])
> In case of end of input (for example, for bounded sources), all pending
> (untriggered) processing time timers are ignored/dropped. In some cases this
> is desirable, but for the {{WindowOperator}}, for example, it means that the last
> trailing window will not be triggered, causing an apparent data loss.
> There are a couple of ideas that should be considered.
> 1. Provide a way for users to decide what to do with such timers: cancel,
> wait, or trigger immediately. For example, by overloading the existing methods
> {{ProcessingTimeService#registerTimer}} and
> {{ProcessingTimeService#scheduleAtFixedRate}} as follows:
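> {code:java}
> // Proposed overload: the extra argument says what to do with the timer at end of input
> ScheduledFuture<?> registerTimer(
>         long timestamp, ProcessingTimeCallback target, TimerAction timerAction);
>
> enum TimerAction {
>     CANCEL_ON_END_OF_INPUT,  // drop the pending timer
>     TRIGGER_ON_END_OF_INPUT, // fire it immediately
>     WAIT_ON_END_OF_INPUT     // keep it until it fires naturally
> }
> {code}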
> or maybe:
> {code:java}
> public interface TimerAction {
>     // Called for each still-pending timer once the input has ended
>     void onEndOfInput(ScheduledFuture<?> timer);
> }
> {code}
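A minimal, self-contained sketch of how the enum variant of proposal 1 might be applied when input ends. It does not use Flink at all: {{ToyTimerService}}, its {{LongConsumer}} callbacks and the simulated clock are hypothetical stand-ins, and WAIT_ON_END_OF_INPUT is modelled by simply advancing the clock to each remaining timestamp, since a toy cannot really wait.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.LongConsumer;

// Hypothetical mirror of the proposed enum; not an existing Flink type.
enum TimerAction { CANCEL_ON_END_OF_INPUT, TRIGGER_ON_END_OF_INPUT, WAIT_ON_END_OF_INPUT }

// Toy, single-threaded stand-in for a processing-time timer service.
class ToyTimerService {
    // Each timer carries its own action: this is the extra per-timer state.
    private record Timer(long timestamp, LongConsumer callback, TimerAction action) {}

    private final PriorityQueue<Timer> pending =
            new PriorityQueue<>(Comparator.comparingLong(Timer::timestamp));
    private long now;

    ToyTimerService(long startTime) { this.now = startTime; }

    void registerTimer(long timestamp, LongConsumer callback, TimerAction action) {
        pending.add(new Timer(timestamp, callback, action));
    }

    // Moves the clock forward and fires every timer whose timestamp has been reached,
    // passing the timer's own timestamp to its callback.
    void advanceTo(long time) {
        now = time;
        while (!pending.isEmpty() && pending.peek().timestamp() <= now) {
            Timer t = pending.poll();
            t.callback().accept(t.timestamp());
        }
    }

    // Applies each pending timer's TimerAction once the input has ended.
    void endOfInput() {
        List<Timer> toWait = new ArrayList<>();
        while (!pending.isEmpty()) {
            Timer t = pending.poll(); // polled in timestamp order
            switch (t.action()) {
                case CANCEL_ON_END_OF_INPUT -> { } // drop silently
                case TRIGGER_ON_END_OF_INPUT -> t.callback().accept(now); // fire right away, at the current time
                case WAIT_ON_END_OF_INPUT -> toWait.add(t);
            }
        }
        // Simulate "waiting": advance the clock to each remaining timestamp in order and fire.
        for (Timer t : toWait) {
            now = Math.max(now, t.timestamp());
            t.callback().accept(t.timestamp());
        }
    }
}
```

In this toy every pending timer stores its own action, which is the per-timer state whose serialization cost is the concern with this proposal.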
> However, this would also mean storing additional state with each timer,
> modifying the serialisation format (and providing some kind of state migration
> path), and potentially increasing the size footprint of the timers. The extra
> overhead could be avoided via some kind of {{Map<Timer, TimerAction>}}, where a
> missing entry means some default value.
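A sketch of the side-map idea, assuming timers can be used as map keys. {{Timer}} (a record of key and timestamp) and {{TimerActionTable}} are hypothetical names for illustration only. The timer itself gains no extra field, and only timers whose action differs from the default occupy a map entry.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical mirror of the proposed enum; not an existing Flink type.
enum TimerAction { CANCEL_ON_END_OF_INPUT, TRIGGER_ON_END_OF_INPUT, WAIT_ON_END_OF_INPUT }

// Stand-in for an existing timer: key + timestamp, with no action field added.
record Timer(String key, long timestamp) {}

// Sparse side table: a missing entry means "use the default action".
class TimerActionTable {
    private final TimerAction defaultAction;
    private final Map<Timer, TimerAction> overrides = new HashMap<>();

    TimerActionTable(TimerAction defaultAction) { this.defaultAction = defaultAction; }

    void set(Timer timer, TimerAction action) {
        if (action == defaultAction) {
            overrides.remove(timer); // the default needs no entry
        } else {
            overrides.put(timer, action);
        }
    }

    TimerAction actionFor(Timer timer) {
        return overrides.getOrDefault(timer, defaultAction);
    }

    int storedEntries() { return overrides.size(); }
}
```

If most timers keep the default, the table stays small; the trade-off is that the map itself would still need to be kept consistent with the timers it refers to.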
> 2. Another way to solve this problem might be to let the operator code decide
> what to do with a given timer: either ask the operator what should happen
> with the given timer (a), let the operator iterate over and cancel the timers
> in endOfInput() (b), or simply fire the timer with some endOfInput flag (c).
> I think none of (a), (b), or (c) would require breaking API changes or state
> changes, nor add any overhead. Only the logic deciding what to do with a timer
> would have to be “hardcoded” in the operator’s code (which, by the way, might
> even have the additional benefit of being easier to change in case of bugs,
> such as a timer registered with an incorrect {{TimerAction}}).
> This is complicated a bit by the question of how (if at all) options (a), (b),
> or (c) should be exposed to UDFs.
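Option (c) could look roughly like the following toy sketch. {{EndOfInputAwareCallback}}, {{ToyWindowOperatorWithTtl}} and {{ToyDriver}} are hypothetical names, not Flink APIs. The two timer kinds loosely follow the WindowOperatorWithTTLTimers example from the comment above: window timers still emit results at end of input, while TTL timers are ignored.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback shape for option (c): pending timers are always fired,
// and the flag tells the operator whether this is a natural firing or end of input.
interface EndOfInputAwareCallback {
    void onTimer(long timestamp, boolean endOfInput);
}

// Toy operator with two kinds of timers whose handling is "hardcoded" in the operator.
class ToyWindowOperatorWithTtl {
    final List<String> output = new ArrayList<>();

    // Window timers: emit the window result whether fired naturally or at end of input.
    final EndOfInputAwareCallback windowTimer =
            (ts, endOfInput) -> output.add("window result @" + ts);

    // TTL timers: only meaningful when they fire naturally; ignored at end of input.
    final EndOfInputAwareCallback ttlTimer = (ts, endOfInput) -> {
        if (!endOfInput) {
            output.add("state expired @" + ts);
        }
    };
}

// Toy driver: at end of input, fires every still-pending timer with endOfInput = true.
class ToyDriver {
    record Pending(long timestamp, EndOfInputAwareCallback callback) {}

    static void endOfInput(List<Pending> pending) {
        for (Pending p : pending) {
            p.callback().onTimer(p.timestamp(), true);
        }
    }
}
```

Because the decision lives in the operator's callback, nothing extra is stored per timer, consistent with the point that (a), (b) and (c) need no state changes.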
> 3. Maybe we need a combination of both? Pre-existing operators could implement
> some custom handling of this issue (via 2a, 2b, or 2c), while UDFs could be
> handled by option 1.