These are some very interesting thoughts! I have some more, based on these:
What happens if you have, for example, this trigger:

All(Watermark.pastEndOfWindow(), Count.atLeast(10))

When would this even fire, i.e. what are the steps that lead to this combined trigger firing with the trigger system that we currently have in place? I have some thoughts, but they are not compatible with the way we currently handle triggers. I have to think some more, but please shoot if you have any ideas.

Cheers,
Aljoscha

On Fri, 22 Jul 2016 at 13:10 Kostas Kloudas <k.klou...@data-artisans.com> wrote:

Forgot to say that the signature for the onFire() that I think fits should be:

void onFire(Window window, TriggerContext ctx) throws Exception;

On Jul 22, 2016, at 12:47 PM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:

Hi,

I started working on the new triggers proposed here, and so far I can see two shortcomings in the current state of the triggers that do not play well with the new proposals, more specifically with the composite triggers All and Any.

So here it goes:

1) In the document posted above, there are some useful new trigger combinations (like Any and All) which allow for combining primitive triggers. This decouples the TriggerResult of each individual trigger from the action that is actually going to be executed. For example, in an All trigger one child may propose FIRE while the other proposes CONTINUE, and the final result will be CONTINUE.

In this case, any action that each individual trigger should take upon firing, e.g. cleaning its state as in the case of CountTrigger, should be postponed until the parent trigger (All) decides to fire.

For this, there should be an onFire() method in each trigger that does exactly that. This method should be called in the fireOrCleanup() of the window operator when the firing is successful.
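One possible reading of the All(Watermark.pastEndOfWindow(), Count.atLeast(10)) question, combined with the onFire() proposal, is to let each child report whether its condition currently holds (instead of returning a one-off FIRE at the moment it is evaluated) and to let children clean up only after the composite really fires. The sketch below models that idea in plain Java under exactly that assumption; SketchTrigger, PastEndOfWindow, CountAtLeast and AllOf are hypothetical names, not Flink's Trigger API, and timers and TriggerContext are left out entirely.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified trigger model (NOT Flink's Trigger API).
interface SketchTrigger {
    void onElement(long timestamp);   // an element was added to the window
    void onWatermark(long watermark); // the event-time watermark advanced
    boolean isSatisfied();            // does this trigger's condition hold right now?
    void onFire();                    // called only after the composite actually fired
}

// Holds once the watermark has reached the window's maximum timestamp.
final class PastEndOfWindow implements SketchTrigger {
    private final long windowMaxTimestamp;
    private long watermark = Long.MIN_VALUE;

    PastEndOfWindow(long windowMaxTimestamp) { this.windowMaxTimestamp = windowMaxTimestamp; }

    public void onElement(long timestamp) { }
    public void onWatermark(long wm) { watermark = Math.max(watermark, wm); }
    public boolean isSatisfied() { return watermark >= windowMaxTimestamp; }
    public void onFire() { } // stateless: nothing to clean up
}

// Holds once at least `threshold` elements arrived since the last firing.
final class CountAtLeast implements SketchTrigger {
    private final long threshold;
    private long count; // per-trigger state, deliberately not shared with siblings

    CountAtLeast(long threshold) { this.threshold = threshold; }

    public void onElement(long timestamp) { count++; }
    public void onWatermark(long wm) { }
    public boolean isSatisfied() { return count >= threshold; }
    public void onFire() { count = 0; } // cleanup deferred until the parent fires
    long currentCount() { return count; }
}

// Fires only when every child is satisfied; one unsatisfied child acts like CONTINUE.
final class AllOf {
    private final List<SketchTrigger> children;

    AllOf(List<SketchTrigger> children) { this.children = new ArrayList<>(children); }

    boolean onElement(long timestamp) {
        for (SketchTrigger t : children) t.onElement(timestamp);
        return fireIfAllSatisfied();
    }

    boolean onWatermark(long watermark) {
        for (SketchTrigger t : children) t.onWatermark(watermark);
        return fireIfAllSatisfied();
    }

    private boolean fireIfAllSatisfied() {
        for (SketchTrigger t : children) {
            if (!t.isSatisfied()) return false;
        }
        for (SketchTrigger t : children) t.onFire(); // children clean up only now
        return true;
    }
}
```

With a window whose maximum timestamp is 99, ten elements alone do not fire the composite and the count of 10 is kept; the first watermark at or past 99 then fires it and resets the count. Because the watermark condition stays satisfied in this sketch, the composite would fire again every 10 further elements, which is exactly the kind of semantic choice the question above leaves open.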
2) In the current implementation, when stateful triggers like the CountTrigger are combined in a composite trigger (with Any or All), their state is shared because the stateHandle is the same for both. To solve this, the handle should become unique, BUT consistent for the same trigger. The latter implies that the handle for the same trigger after a node failure should be the same as that of its predecessor (before the failure).

Let me know your thoughts on these.

Kostas

On Jul 21, 2016, at 10:24 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

I'm proposing to get this small change into 1.1:
https://issues.apache.org/jira/browse/FLINK-4239
This will make our lives easier with the future proposed changes.

What do you think?

On Tue, 19 Jul 2016 at 11:41 Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,
these new features should make it into the 1.2 release. We are already working on releasing 1.1, so unfortunately they won't make it into that one.

Cheers,
Aljoscha

On Mon, 18 Jul 2016 at 23:19 Chen Qin <qinnc...@gmail.com> wrote:

BTW, do you have a rough timeline in terms of rolling it out to production?

Thanks,
Chen

On Mon, Jul 18, 2016 at 2:46 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,
Chen commented this on the doc (I'm mirroring it here so everyone can follow):
"It would be cool to be able to access the last snapshot of window states before it gets purged. Pipeline authors might consider putting it into external storage and dealing with late-arriving events by restoring the corresponding window."

My answer:
This is partially covered by the section called "What Happens at Window-Cleanup Time, Who Decides When to Purge".
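Kostas's second point asks for state handles that are unique per child trigger yet identical after recovery. Assuming the trigger tree is rebuilt in the same shape when the job restarts, one simple way to get both properties is to derive each child's state name from its position in the tree, rather than from something like a random UUID (unique, but different after a failure). TriggerNode and StateNames below are hypothetical helpers for illustration, not Flink classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical description of a (possibly composite) trigger tree; not a Flink class.
final class TriggerNode {
    final String kind;
    final List<TriggerNode> children;

    TriggerNode(String kind, TriggerNode... children) {
        this.kind = kind;
        this.children = Arrays.asList(children);
    }
}

final class StateNames {
    // Depth-first walk that names every node by its path from the root, e.g. "0/1:Count".
    // Two Count children of one All get different names, and rebuilding the same tree
    // after a failure yields exactly the same names again.
    static List<String> assign(TriggerNode root) {
        List<String> names = new ArrayList<>();
        walk(root, "0", names);
        return names;
    }

    private static void walk(TriggerNode node, String path, List<String> names) {
        names.add(path + ":" + node.kind);
        for (int i = 0; i < node.children.size(); i++) {
            walk(node.children.get(i), path + "/" + i, names);
        }
    }
}
```

The trade-off in this sketch is that names are only stable as long as the tree's shape is stable; changing the trigger composition between runs would change them.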
What I want to introduce is that the window can have one final emission if there is new data in the buffers at cleanup time.

The work on this will also depend on this proposal:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata
With this, the WindowFunction can get metadata about the window firing, so it could be informed that this is the last firing before a cleanup and that there already was an earlier, on-time firing.

Does this cover your concerns, Chen?

Cheers,
Aljoscha

On Sun, 10 Jul 2016 at 21:24 Chen Qin <qinnc...@gmail.com> wrote:

Sure. Currently, it looks like any element assigned to a window that is already too late will be dropped silently 😓?

Having a late-window stream implies that Flink somehow needs to add one more state to the window and split window state cleanup from window retirement. I would suggest something as simple as adding a function to the trigger called OnLateElement that always fires and purges; it would make developers aware of this case and let them handle it.

Chen

On Fri, Jul 8, 2016 at 1:00 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

@Chen I added a section at the end of the document regarding access to the elements that are dropped as late. Right now, the section just mentions that we have to do this, but there is no real proposal yet for how to do it. It is only a rough sketch so that we don't forget about it.

On Fri, 8 Jul 2016 at 07:47 Chen Qin <qinnc...@gmail.com> wrote:

+1 for the allowedLateness scenario.
The rationale behind it is that backfills or data issues can hold back in-window data until the watermark passes the window's end time. This causes the sink to write partial output.

Allowing a high allowedLateness threshold makes it easier to merge those results and overwrite the partial output with the correct output at the sink. But yeah, the pipeline author risks blowing up the state backend with huge states.

Alternatively, in some cases, if the sink allows a read-check-merge operation, the window can explicitly call out events ingested after allowedLateness. This asks the pipeline author to mitigate these events outside of the Flink ecosystem. The catch is that since any part of a Flink job can be replayed and checkpointed, the notification should somehow include this information as well.

Thanks
Chen

On Thu, Jul 7, 2016 at 12:14 AM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:

Hi,

In the effort to move the discussion to the mailing list rather than the doc, there was a comment in the doc:

“It seems this proposal marries the allowed lateness of events and the discarding of window state. In most use cases this should be sufficient, but there are instances where having independent control of these may be useful.

For instance, you may have a job that computes some aggregate, like a sum. You may want to keep the window state around for a while, but not too long.
Yet you may want to continue processing late events after you have discarded the window state. It is possible that your stream sinks can make use of this data. For instance, they may be writing to a data store that returns an error if a row already exists, which allows the sink to read the existing row and update it with the new data."

To which I would like to reply:

If I understand your use case correctly, I believe that the proposed binding of the allowed lateness to the state purging does not impose any problem. The lateness specifies the upper time bound after which the state will be discarded. Between the start of a window and its (end + allowedLateness), you can write custom triggers that fire, purge the state, or do nothing. Given this, I suppose that, in the most extreme case, you can specify an allowed lateness of Long.MaxValue and do the purging of the state "manually". By doing this, you remove the safeguard of letting the system purge the state at some point in time, and you can do your own custom state management that fits your needs.

Thanks,
Kostas

On Jul 6, 2016, at 5:43 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

@Vishnu Funny you should ask that, because I have a design doc lying around. I'll open a new mail thread so as not to hijack this one.
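Kostas's description of the allowed lateness as an upper bound on how long window state lives can be made concrete with a little arithmetic: state for a window whose maximum timestamp is maxTs is kept until the watermark reaches maxTs + allowedLateness, and an allowed lateness as large as Long.MAX_VALUE must not overflow into a negative cleanup time. The sketch below assumes a non-negative allowedLateness; LatenessSketch and its methods are hypothetical, not Flink API. It also hands elements that arrive after cleanup to a callback, in the spirit of Chen's OnLateElement suggestion, instead of dropping them silently.

```java
import java.util.function.LongConsumer;

// Hypothetical helper; not part of Flink. Assumes allowedLateness >= 0.
final class LatenessSketch {
    enum Arrival { ON_TIME, LATE_BUT_ALLOWED, TOO_LATE }

    // Time at which the window state may be purged: maxTs + allowedLateness,
    // saturating at Long.MAX_VALUE so a huge allowedLateness cannot overflow.
    static long cleanupTime(long windowMaxTimestamp, long allowedLateness) {
        long t = windowMaxTimestamp + allowedLateness;
        return t >= windowMaxTimestamp ? t : Long.MAX_VALUE; // wrapped around => overflow
    }

    // Classifies an element of this window given the current watermark.
    static Arrival classify(long windowMaxTimestamp, long allowedLateness, long watermark) {
        if (watermark < windowMaxTimestamp) return Arrival.ON_TIME;
        if (watermark < cleanupTime(windowMaxTimestamp, allowedLateness)) return Arrival.LATE_BUT_ALLOWED;
        return Arrival.TOO_LATE;
    }

    // Returns true if the element may still be added to the window state;
    // otherwise passes its timestamp to a user callback instead of dropping it silently.
    static boolean accept(long timestamp, long windowMaxTimestamp, long allowedLateness,
                          long watermark, LongConsumer onTooLate) {
        if (classify(windowMaxTimestamp, allowedLateness, watermark) == Arrival.TOO_LATE) {
            onTooLate.accept(timestamp);
            return false;
        }
        return true;
    }
}
```

Between ON_TIME and TOO_LATE, a custom trigger would still be free to fire, purge, or do nothing, which matches the "manual purging" case above when the lateness is effectively unbounded.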
On Wed, 6 Jul 2016 at 17:17 Vishnu Viswanath <vishnu.viswanat...@gmail.com> wrote:

Hi,

I was going through the suggested improvements to windows, and I have a few questions/suggestions for improvements regarding the Evictor.

1) I have a use case where I have to create a custom Evictor that will evict elements from the window based on their value (e.g., if my elements are of case class Item(id: Int, type: String), then evict the elements that have type="a"). I believe this is not currently possible.
2) This is somewhat related to 1): there should be an option to evict elements from anywhere in the window, not only from the beginning of the window (e.g., apply the delta function to all elements and remove all those that don't pass). I checked the code, and the evict method just returns the number of elements to be removed, and processTriggerResult just skips that many elements from the beginning.
3) Add an option that enables the user to decide whether the eviction should happen before or after the apply function. Currently it happens before the apply function, but I have a use case where I need to apply the function first and evict afterward.
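Vishnu's three points (evict by value, evict from anywhere in the window, and choose whether eviction runs before or after the window function) can be illustrated together with a plain java.util.Iterator, whose remove() drops an element at any position rather than only a prefix. ValueEvictor, its fire() method and the Item class are hypothetical names for this sketch, not Flink's Evictor interface, and the window contents are modelled as a simple List.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical element type mirroring the case class Item(id: Int, type: String).
final class Item {
    final int id;
    final String type;
    Item(int id, String type) { this.id = id; this.type = type; }
}

// Hypothetical evictor that removes matching elements from anywhere in the window,
// either before or after the window function runs.
final class ValueEvictor<T> {
    private final Predicate<T> shouldEvict;
    private final boolean evictAfterApply;

    ValueEvictor(Predicate<T> shouldEvict, boolean evictAfterApply) {
        this.shouldEvict = shouldEvict;
        this.evictAfterApply = evictAfterApply;
    }

    private void evict(List<T> window) {
        for (Iterator<T> it = window.iterator(); it.hasNext(); ) {
            if (shouldEvict.test(it.next())) it.remove(); // any position, not just a prefix
        }
    }

    // Applies windowFunction to a copy of the window contents,
    // evicting from the window itself before or after, as configured.
    <R> R fire(List<T> window, Function<List<T>, R> windowFunction) {
        if (!evictAfterApply) evict(window);
        R result = windowFunction.apply(new ArrayList<>(window));
        if (evictAfterApply) evict(window);
        return result;
    }
}
```

Evicting type="a" before the function means the function never sees those items; evicting afterward lets the function see the full window while the items are still dropped from the kept state. How this maps onto Flink's window state, where evict() currently only returns a count of elements to skip, is a separate question.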
I am doing these for a POC, so I think I can modify the Flink code base to make these changes and build, but I would appreciate any suggestions on whether these are viable changes or whether there would be any performance issues if these are done. Also, any pointers on where to start would help (e.g., do I create a new class similar to EvictingWindowOperator that extends WindowOperator?).

Thanks and Regards,
Vishnu Viswanath

On Wed, Jul 6, 2016 at 9:39 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

I did:
https://mail-archives.apache.org/mod_mbox/flink-dev/201606.mbox/%3ccanmxww0abttjjg9ewdxrugxkjm7jscbenmvrzohpt2qo3pq...@mail.gmail.com%3e
;-)

On Wed, 6 Jul 2016 at 15:31 Ufuk Celebi <u...@apache.org> wrote:

On Wed, Jul 6, 2016 at 3:19 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

In the future, it might be good to do discussions directly on the ML and then change the document accordingly. This way everyone can follow the discussion on the ML. I also feel that Google Doc comments often don't give enough space for expressing more complex opinions.

I agree! Would you mind raising this point as a separate discussion on dev@?