Sorry to pick up this discussion again after a long time.
@Yun Gao, could you share the PoC if possible?

Best
Yun Tang

On 2022/12/08 10:20:21 Piotr Nowojski wrote:
> Sounds good to me as well!
> 
> Best,
> Piotrek
> 
> czw., 8 gru 2022 o 09:53 Dawid Wysakowicz <dwysakow...@apache.org>
> napisał(a):
> 
> > Sounds like a good plan to me.
> > On 08/12/2022 08:58, Yun Gao wrote:
> >
> > Hi Dawid,
> >
> > Very thanks for the discussion and sorry for the delayed response
> > since I was hesitated on some points.
> >
> > But as a whole, with some more thought, first I agree with that adding
> > the trigger() / cancle() methods to some kind of timer object is not
> > necessary
> > for us to achieve the exactly-once for the operators. We could follow the
> > direction of "modifying the implementation of the operators" to achieve the
> > same target.
> >
> > But continue to think with this direction, it now looks to me it is also
> > not
> > needed to add the callback to the timer services:
> > 1. For InternalTimerService, the operators could just call
> > `InternalTimerService
> > #forEachProcessingTimer()` on finish to handle the pending timers.
> > 2. For the timers registered to the underlying ProcessingTimerService, at
> > least in
> > the currently listed scenarios, the operators itself knows what is the
> > remaining work
> > (e.g., the FileWriter knows if it has in-progress file to flush).
> >
> > Operators could handle the remaining timers in finish() method.
> >
> > Then the only interface we need to consider is that added to the
> > ProcessFunction. The
> > current interface also looks ok to me.
> >
> > If you think the above option works, I could first have a PoC that
> > demonstrate it is sufficient
> > to only modify the operator implementation to handling the remaining
> > workers properly on
> > finish(). If there are new issues I'll post here and we could have some
> > more discussion.
> >
> > Best,
> > Yun Gao
> >
> >
> > ------------------Original Mail ------------------
> > *Sender:*Dawid Wysakowicz <dwysakow...@apache.org>
> > <dwysakow...@apache.org>
> > *Send Date:*Fri Dec 2 21:21:25 2022
> > *Recipients:*Dev <dev@flink.apache.org> <dev@flink.apache.org>
> > *Subject:*Re: [DISCUSS] FLIP-269: Properly Handling the Processing Timers
> > on Job Termination
> >
> >> Ad. 1
> >>
> >> I'd start with ProcessingTimerService as that's the only public
> >> interface. It is exposed in the Sink V2 interface. In this scenario it
> >> would be the Sink interface that need to extend from a EOFTimersHandler. I
> >> believe it would be hard to pass it from there to the ProcessingTimeService
> >> as it is passed from the outside e.g. in the ProcessingTimeServiceAware.
> >> For that reason I'd go with a registration method in that interface.
> >>
> >> In ProcessFunction I'd go with a mixin approach, so a ProcessFunction can
> >> extend from EOFTimersHandler. I'd do that because ProcessFunction does not
> >> have an init/open method where we could register the handler.
> >>
> >> On operator level I'd have a registration method in InternalTimerService.
> >> I believe that's the only way to handle the above ProcessFunction aproach.
> >> E.g. in KeyedProcessOperator you need to check if the UDF extend from the
> >> interface not the operator itself.
> >>
> >> Ad. 2
> >>
> >> I'd go with
> >>
> >> *(Keyed)ProcessFunction:*
> >>
> >> interface EOFTimersHandler {
> >>
> >>  void handleProcessingTimer(long timestamp, Context);
> >>
> >> }
> >>
> >> interface Context {
> >>         public abstract <X> void output(OutputTag<X> outputTag, X value);
> >>
> >>         public abstract K getCurrentKey();
> >>
> >> // we can extend it for waitFor later
> >>
> >> }
> >>
> >> *ProcessingTimeService: *
> >>
> >> interface EOFTimersHandler {
> >>
> >>  void handleProcessingTimer(long timestamp, Context);
> >>
> >> }
> >>
> >> interface Context {
> >>
> >> // we can extend it for waitFor later
> >>
> >> }
> >>
> >> *InternalTimeService:*
> >>
> >> interface EOFTimersHandler {
> >>
> >>  void handleProcessingTimer(InternalTimer<K,N> timer Context);
> >>
> >> }
> >>
> >> interface Context {
> >>
> >> // we can extend it for waitFor later
> >>
> >> }
> >>
> >> Personally I'd not try to unify those places too much. They have also
> >> different visibilities (public/internal), have access to different set of
> >> metadata (key/namespace).
> >>
> >>
> >> Ad 3.
> >>
> >> I don't like the having the trigger/cancel methods, because:
> >>
> >> 1. I don't like the back and forth between system and UDF
> >>
> >> 2. Yes, the biggest issue I have is with the possibility with registering
> >> new timers. I am trying to be on the safe side here. I don't like the idea
> >> of dropping them, because it is again making assumptions what users do with
> >> those timers. What if they e.g. emit counter if it reached certain
> >> threshold? We'd need an additional flag in the method that is the final
> >> timer. My sentiment is that we're making it questionably easier to trigger
> >> a timer for the cost of openning up for unforeseen problems with follow up
> >> registration.
> >>
> >> Best,
> >>
> >> Dawid
> >> On 30/11/2022 12:13, Yun Gao wrote:
> >>
> >> Hi Dawid, PiotrVery thanks for the discussion!As a whole I think we are 
> >> already consistent with the callback option, and I don't think I opposed 
> >> that we could modify the current internal implementation. But from my side 
> >> it is still not clear what the actual interfaces are proposing. Let me 
> >> first try to summarize that a bit:1) Which object does the handlers 
> >> register on?It seems there are two options, one is to timer services 
> >> (InternalTimerService/ ProcessingTimerService or some equivalent things 
> >> after refactoring), the otherone is as a lifecycle of the operator. I'm 
> >> now tending to the latter one, how do you think on this part?2) What is 
> >> the interface of the handler?Option 1 is that interface SomeHandlerName { 
> >> void processingTimer(Timer timer);}class Timer { long getTimestamp(); void 
> >> trigger(); void cancel(); // Other actions if required. }But it seems 
> >> there is controversy on whether to add actions to the timer class. If 
> >> without that, with my understanding the interfaces of the Option 2 
> >> areinterface SomeHandlerName { void processTimer(Timer timer); }interface 
> >> KeyedSomeHandlerName<KEY, NAMESPACE> { void 
> >> processKeyedTimer(KeyedTimer<KEY, NAMESPACE> timer, Context ctx); }class 
> >> Timer { long getTimestamp();}class KeyedTimer<KEY, NAMESPACE> extends 
> >> Timer { KEY getKey(); NAMESPACE getNamespace();}void Context {void 
> >> executeAtScheduledTime(Consumer<timer> handler);}As Piotr has pointed out, 
> >> if we could eliminate the logic of namespace, we could thenremove the 
> >> namespace related type parameter and method from the interfaces.Do I 
> >> understand right?Besides, I'm still fully got the reason that why we 
> >> should not add the actions to the timer class, in consideration that it 
> >> seems in most cases users could implement their logical with simply 
> >> calling timer.trigger() (I think the repeat registration is indeed a 
> >> problem, but I think we could ignore the timers registered during 
> >> termination). Could you further enlighten me a bit on this part?Best,Yun 
> >> Gao------------------------------------------------------------------From:Piotr
> >>  Nowojski <pnowoj...@apache.org> <pnowoj...@apache.org>Send Time:2022 Nov. 
> >> 30 (Wed.) 17:10To:dev <dev@flink.apache.org> 
> >> <dev@flink.apache.org>Subject:Re: [DISCUSS] FLIP-269: Properly Handling 
> >> the Processing Timers on Job TerminationHi,I have a couple of 
> >> remarks.First a general one. For me the important part in the design of 
> >> this API ishow to expose this to Flink users in public interfaces. 
> >> NamelyProcessFunction and StreamOperator. InternalTimerService is an 
> >> internalclass, so we can change it and break it as needed in the 
> >> future.For registering a handler like proposed by Dawid:interface 
> >> SomeHandlerName { void onTimer(/* whatever type it is */ timer, Context 
> >> ctx ) { }}makes sense to me. For the InternalTimerService I think it 
> >> doesn't mattertoo much what we do. We could provide a similar interface as 
> >> for theProcessFunction/StreamOperator, it doesn't have to be the same one. 
> >> On thecontrary, I think it shouldn't be the same, as part of this effort 
> >> weshouldn't be exposing the concept of `Namespaces` to the public facing 
> >> API.Re the "waitFor". Theoretically I see arguments why users might want 
> >> to usethis, but I'm also not convinced whether that's necessary in 
> >> practice. Iwould be +1 either way. First version can be without this 
> >> functionality andwe can add it later (given that we designed a good place 
> >> to add it in thefuture, like the `Context` proposed by Dawid). But I'm 
> >> also fine adding itnow if others are insisting.Best,Piotrekśr., 30 lis 
> >> 2022 o 09:18 Dawid Wysakowicz <dwysakow...@apache.org> 
> >> <dwysakow...@apache.org>napisał(a):
> >>
> >> WindowOperator is not implemented by users. I can see that 
> >> forInternalTimerService we'll needinterface PendingTimerProcessor<KEY, 
> >> NAMESPACE> {void onTimer(InternalTimer<KEY, NAMESPACE> timer) 
> >> {doHandleTimer(timer);}I don't see a problem with that.As you said 
> >> ProcessingTimeService is a user facing interface andcompletely unrelated 
> >> to the InternalTimerService. I don't see a reasonwhy we'd need to unify 
> >> those.As for the waitFor behaviour. Personally, I have not been convinced 
> >> itis necessary. Maybe it's just my lack of vision, but I can't think of 
> >> ascenario where I'd use it. Still if we need it, I'd go for something 
> >> like:void onTimer(/* whatever type it is */ timer, Context ctx ) 
> >> {}interface Context {void executeAtScheduledTime(Consumer<timer> 
> >> handler);}That way you have independent simple interfaces that need to 
> >> work onlyin a single well defined scenario and you don't need to match 
> >> aninterface to multiple different cases.Best,DawidOn 30/11/2022 07:27, Yun 
> >> Gao wrote:
> >>
> >> Hi Dawid,Thanks for the comments!As a whole I'm also open to the API and I 
> >> also prefer to use simplebut flexible interfaces, but it still looks there 
> >> are some problem tojust let users to implement the termination 
> >> actions.Let's take the WindowOperator as an example. As seen in [1],in the 
> >> timer processing logic it needs to acquire the key / namespaceinformation 
> >> bound to the timer (which is only supported by the
> >>
> >> InternalTimerService).
> >>
> >> Thus if we want users to implement the same logic on termination, we
> >>
> >> either let users
> >>
> >> to trigger the timer handler directly or we also allows users to access
> >>
> >> these piece of
> >>
> >> information. If we go with the later direction, we might need to provide
> >>
> >> interfaces like
> >>
> >> interface PendingTimerProcessor<KEY, NAMESPACE> {void onTimer(Timer<KEY, 
> >> NAMESPACE> timer) {doHandleTimer(timer);}}class Timer<KEY, NAMESPACE> 
> >> {long getTimestamp();KEY getKey();NAMESPACE getNamespace();}Then we'll 
> >> have the issue that since we need the interface to handle
> >>
> >> both of cases of
> >>
> >> InternalTimerSerivce and raw ProcessTimeService, the later do not have
> >>
> >> key and
> >>
> >> namespace information attached, and its also be a bit inconsistency for
> >>
> >> users to have to set
> >>
> >> the KEY and NAMESPACE types.Besides, it looks to me that if we want to 
> >> implement behaviors like
> >>
> >> waiting for, it might
> >>
> >> be not simply reuse the time handler time, then it requires every
> >>
> >> operator authors to
> >>
> >> re-implement such waiting logics.
> >>
> >> Moreover it still have the downside that if you call back to the
> >>
> >> `onTimer` method after
> >>
> >> `trigger` you have access to the Context which lets you register new
> >>
> >> timers.
> >>
> >> I think we could simply drop the timers registered during we start
> >>
> >> processing the pending timers
> >>
> >> on termination. Logically there should be no new data after termination.
> >>
> >> I think I am not convinced to these arguments. First of all I'm afraid
> >>
> >> there is no clear distinction
> >>
> >> in that area what is runtime and what is not. I always found
> >>
> >> `AbstracStreamOperator(*)` actually part
> >>
> >> of runtime or Flink's internals and thus I don't find
> >>
> >> `InternalTimerService` a utility, but a vital part
> >>
> >> of the system. Let's be honest it is impossible to implement an
> >>
> >> operator without extending from
> >>
> >> `AbstractStreamOperator*`.What would be the problem with having a
> >>
> >> proper implementation in
> >>
> >> `InternalTimerService`? Can't we do it like this?:
> >>
> >> I think the original paragraph is only explanation to that the interface
> >>
> >> is harder to support if we
> >>
> >> allows the users to implement the arbitrary logic. But since now we are
> >>
> >> at the page with the callback
> >>
> >> option, users could always be allowed to implement arbitrary logic no
> >>
> >> matter we support timer.trigger()
> >>
> >> or not, thus I think now there is no divergence on this point. I also
> >>
> >> believe in we'll finally have some logic
> >>
> >> similar to the proposed one that drain all the times and process 
> >> it.Best,Yun Gao[1]
> >>
> >> https://github.com/apache/flink/blob/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L488
> >>  
> >> <https://github.com/apache/flink/blob/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L488
> >>  > 
> >> <https://github.com/apache/flink/blob/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L488><https://github.com/apache/flink/blob/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L488
> >>  
> >> <https://github.com/apache/flink/blob/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L488
> >>  > 
> >> <https://github.com/apache/flink/blob/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L488>
> >>
> >> ------------------------------------------------------------------From:Dawid
> >>  Wysakowicz <dwysakow...@apache.org> <dwysakow...@apache.org>Send 
> >> Time:2022 Nov. 28 (Mon.) 23:33To:dev <dev@flink.apache.org> 
> >> <dev@flink.apache.org>Subject:Re: [DISCUSS] FLIP-269: Properly Handling 
> >> the Processing Timers
> >>
> >> on Job Termination
> >>
> >> Do we really need to have separate methods for
> >>
> >> triggering/waiting/cancelling. To me it sounds rather counterintuitive. 
> >> Whycan't users just execute whatever they want in the handler itself 
> >> insteadof additional back and forth with the system? Moreover it still 
> >> have thedownside that if you call back to the `onTimer` method after 
> >> `trigger` youhave access to the Context which lets you register new timers.
> >>
> >> I find following approach much simpler:void onTimer(...) 
> >> {doHandleTimer(timestamp);}void processPendingTimer(...) {// 
> >> triggerdoHandleTimer(timestamp);// for cancel, simply do nothing...}Sorry 
> >> I might not make it very clear here. I think the difficulty with
> >>
> >> supported setting the currentKey is a special issue for the 
> >> callbackoptions (no matter what the interface is) since it allows users to 
> >> executelogic other than the one registered with the timers. The complexity 
> >> comesfrom that currently we have two level of TimerServices: 
> >> TheProcessingTimerService (there is no key) and InternalTimerService 
> >> (withkey). Currently only ProcessingTimerService is exposed to the runtime 
> >> andInternalTimerService is much more a utility to implement the operator. 
> >> Thenwith the current code, the runtime could only access 
> >> toProcessingTimerService on termination.
> >>
> >> I think I am not convinced to these arguments. First of all I'm afraid
> >>
> >> there is no clear distinction in that area what is runtime and what is 
> >> not.I always found `AbstracStreamOperator(*)` actually part of runtime 
> >> orFlink's internals and thus I don't find `InternalTimerService` a 
> >> utility,but a vital part of the system. Let's be honest it is impossible 
> >> toimplement an operator without extending from `AbstractStreamOperator*`.
> >>
> >> What would be the problem with having a proper implementation in
> >>
> >> `InternalTimerService`? Can't we do it like this?:
> >>
> >> AbstractStreamOperator#finish() 
> >> {internalTimerService.finish();}InternalTimerService#finish() {while 
> >> ((timer = processingTimeTimersQueue.peek()) != null) 
> >> {keyContext.setCurrentKey(timer.getKey());processingTimeTimersQueue.poll();onEndOfInputHandler.processPendingTimer(timer);}}If
> >>  we only executes some predefined actions, we do not need to worry
> >>
> >> about the implementation of InternalTimerService and just execute 
> >> theregistered timers. But if we allow users to execute arbitrary logic, 
> >> weneed to be also aware of the InternalTimerServices and parse the key 
> >> fromthe timers stored in it. I think we should always have method to 
> >> overcomethis issue, but to support the callback options would be more 
> >> complex.
> >>
> >> I am not sure, having "predefined actions" would be good enough that we
> >>
> >> do not need to set a key. As a user I'd anyhow expect the proper key to 
> >> beset in processPendingTimer.
> >>
> >> Best,DawidOn 24/11/2022 08:51, Yun Gao wrote:Hi Piotr / Divye, Very thanks 
> >> for the discussion! First IMO it seems we
> >>
> >> have reached the consensus on the high-level API: Most operators 
> >> shouldusually have only one reasonable action to the pending timers 
> >> ontermination, thus we could let the operators to implement its own 
> >> actionswith the low-level interface provided. The only exception is 
> >> theProcessFunction, with which users might register customized timers, 
> >> thususers might also defines the actions on termination (If I 
> >> havemisunderstandings here, please correct me). For the low-level API, I 
> >> couldget the benefits with the callback options: since in most cases an 
> >> operatorhas only one action to all the timers, its a waste for us to store 
> >> the sameflag for all the timers, also with a lot of code / state format 
> >> changes.But since it is enough for most users to simply trigger / cacnel 
> >> thetimers, it would be redundant for users to implement the logic twice. 
> >> Thusperhaps we might combine the benefits of the two options: We might 
> >> have aseparate interface public interface TimerHandlersOnTermination { 
> >> voidprocessPendingTimer(Timer timer, long currentTime); } public class 
> >> Timer {long getRegisteredTimestamp(); void trigger(); void waitFor(); 
> >> voidcancel(); } Then if an operator have implemented 
> >> theTimerHandlersOnTermination interface, on termination we could 
> >> callprocessPendingTimer(xx) for every pending timers. Users might 
> >> simplytrigger / waitFor / cancel it, or execute some other logics if 
> >> needed. Thenfor the ProcessFunction we might have a similar interface 
> >> toprocessPendingTimer, except we might need to provide Context / Collector 
> >> tothe ProcessFunction. Do you think this would be a good direction? 
> >> Also@Piotr I don't see a problem here. Interface doesn't have to reflect 
> >> that,only the runtime must set the correct key context before executing 
> >> thehandler dealing with the processing time timers at the end of 
> >> input/time.Sorry I might not make it very clear here. I think the 
> >> difficulty withsupported setting the currentKey is a special issue for the 
> >> callbackoptions (no matter what the interface is) since it allows users to 
> >> executelogic other than the one registered with the timers. The complexity 
> >> comesfrom that currently we have two level of TimerServices: 
> >> TheProcessingTimerService (there is no key) and InternalTimerService 
> >> (withkey). Currently only ProcessingTimerService is exposed to the runtime 
> >> andInternalTimerService is much more a utility to implement the operator. 
> >> Thenwith the current code, the runtime could only access 
> >> toProcessingTimerService on termination. If we only executes some 
> >> predefinedactions, we do not need to worry about the implementation 
> >> ofInternalTimerService and just execute the registered timers. But if 
> >> weallow users to execute arbitrary logic, we need to be also aware of 
> >> theInternalTimerServices and parse the key from the timers stored in it. 
> >> Ithink we should always have method to overcome this issue, but to 
> >> supportthe callback options would be more complex. Best, Yun 
> >> Gao------------------------------------------------------------------From:Divye
> >>  Kapoor <dkap...@pinterest.com.INVALID> <dkap...@pinterest.com.INVALID> 
> >> <mailto:dkap...@pinterest.com.INVALID > <dkap...@pinterest.com.INVALID> 
> >> Send Time:2022 Nov. 24 (Thu.) 08:50To:dev <dev@flink.apache.org> 
> >> <dev@flink.apache.org> <mailto:dev@flink.apache.org > 
> >> <dev@flink.apache.org> Cc:XenonDevelopment Team <xenon-...@pinterest.com> 
> >> <xenon-...@pinterest.com> <mailto:xenon-...@pinterest.com 
> >> <xenon-...@pinterest.com>
> >>
> >> Subject:Re: Re: [DISCUSS] FLIP-269: Properly Handling the Processing
> >>
> >> Timers on Job Termination Sounds good. Looks like we're on the same 
> >> page.Thanks! Divye On Wed, Nov 23, 2022 at 2:41 AM Piotr Nowojski 
> >> <pnowoj...@apache.org> <mailto:pnowoj...@apache.org > 
> >> <pnowoj...@apache.org> wrote: Hi Divye Ithink we are mostly on the same 
> >> page. Just to clarify/rephrase: One thingto think about - on EOF “trigger 
> >> immediately” will mean that theasynchronous wait timeout timers will also 
> >> fire - which is undesirable Ididn't mean to fire all timers immediately in 
> >> all of the built-inoperators. Just that each built-in operator can have a 
> >> hard coded way(without a way for users to change it) to handle those 
> >> timers. Windowedoperators would trigger the lingering timers (flush 
> >> outputs),AsyncWaitOperator could just ignore them. The same way users 
> >> could registerEOF timer handlers in the ProcessFunction as Dawid 
> >> Wysakowicz proposed, we(as flink developers) could use the same mechanism 
> >> to implement anybehaviour we want for the built-in operators. There should 
> >> be no need toadd any separate mechanism. Best, Piotrek śr., 23 lis 2022 o 
> >> 08:21 DivyeKapoor <dkap...@pinterest.com.invalid> 
> >> <dkap...@pinterest.com.invalid> <mailto:dkap...@pinterest.com.invalid > 
> >> <dkap...@pinterest.com.invalid> napisał(a): Thanks Yun/Piotrek, Somebrief 
> >> comments inline below. On Tue, Nov 22, 2022 at 1:37 AM Piotr 
> >> Nowojski<pnowoj...@apache.org> <pnowoj...@apache.org> 
> >> <mailto:pnowoj...@apache.org > <pnowoj...@apache.org> wrote: Hi, All inall 
> >> I would agree with Dawid's proposal. +1 We can add the flexibility ofhow 
> >> to deal with the timers in the low level API via adding a handler - 
> >> ifsomeone needs to customize it, he will always have a workaround. Note 
> >> aftergiving it more thought, I agree that registering some handlers is 
> >> betterthan overloading the register timer method and modifying the timer's 
> >> state.+1. At the same time, we can force the most sensible semantic that 
> >> we thinkfor the couple of built-in operators, which should be 
> >> prettystraightforward (either ignore the timers, or fire them at once). I 
> >> agreethere might be some edge cases, that theoretically user might want to 
> >> waitfor the timer to fire naturally, but: 1. I'm not sure how common 
> >> inpractice this will be. If not at all, then why should we be 
> >> complicatingthe API/system? That’s fair. However, the specifics are very 
> >> importanthere. One thing to think about - on EOF “trigger immediately” 
> >> will meanthat the asynchronous wait timeout timers will also fire - which 
> >> isundesirable (because they are racing with the last async call). 
> >> However,the issue is cleanly resolved by waiting for the timer to be 
> >> canceled whenthe last event is processed. (“Wait for” case). Ignoring the 
> >> timer has theleast justification. Registering the handler as per Dawid’s 
> >> proposal andhaving that handler unregister the timers on EOF makes best 
> >> sense. Thissolution also unifies the trigger immediately case as that 
> >> handler canreregister the timers for early termination. The proposal: 1. 
> >> Operatorreceives EOF 2. EOF timer handler triggers 3. EOF handler adjusts 
> >> theregistered timers for early trigger or ignore. If wait-for behavior 
> >> isdesired, timers are not changed. This is controlled in client code. 
> >> 4.Operator waits for all timers to drain/trigger. (“Always”). There is 
> >> nospecial handling for ignore/early trigger. 5. Operator allows job 
> >> toproceed with shutdown. The only api change needed is an EOF handler. 
> >> Theother agreement we need is that “Wait for” is the desired behavior 
> >> inprocessing time and that processing time is fundamentally different 
> >> fromevent time in this respect. (I have changed my thinking since the 
> >> lastmail). 2. We can always expand the API in the future, and let the 
> >> useroverride the default built-in behaviour of the operators via some 
> >> setter onthe stream transformation (`SingleOutputStreamOperator`), or via 
> >> somecustom API DSL style in each of the operators separately. This is 
> >> notrequired. See above. Re forcing the same semantics for processing 
> >> timetimers as for event time ones - this is tempting, but indeed I see 
> >> apossibility that users need to adhere to some external constraints 
> >> whenusing processing time. +1. As above, we should consider the 2 
> >> casesfundamentally different in this area. Re: Yun - b) Another issue is 
> >> thatwhat if users use timers with different termination actions in the 
> >> sameoperator / UDF? For example, users use some kind of timeout (like 
> >> throwsexception if some thing not happen after some other thing), and also 
> >> somekind of window aggregation logic. In this case, without additional 
> >> tags,users might not be able to distinguish which timer should be canceled 
> >> andwhich time should be triggered ? as above. The EOF handler makes 
> >> thechoice. 4. How could these scenarios adjust their APIs ? From the 
> >> currentlisted scenarios, I'm more tend to that as @Dawid pointed out, 
> >> there mightbe only one expected behavior for each scenario, thus it does 
> >> not seems toneed to allow users to adjust the behavior. Thus @Divye may I 
> >> have a doubleconfirmation currently do we have explicit scenarios that is 
> >> expected tochange the different behaviors for the same scenario? Wait-for 
> >> behavior isprobably the only expected behavior and any alterations should 
> >> be from theEOF handler managing the registered timers. Besides @Divye from 
> >> the listedscenarios, I have another concern for global configuration is 
> >> that for onejob, different operators seems to still might have different 
> >> expectedbehaviors. For example, A job using both Window operator 
> >> andAsyncWaitOperator might have different requirements for timers 
> >> ontermination? Thank you for raising this case. This changed my 
> >> thinking.Based on your point, we should try and align on the “Wait-for” 
> >> with EOFhandler proposal. I’m withdrawing the “single-runtime-config” 
> >> proposal.Best, Divye
> >>
> >>
> 

Reply via email to