Hi Radu,

Yes, we can remove elements at arbitrary positions (not only from the front of the buffer) using iterator.remove().
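To make this concrete with the numbers from the question, here is a minimal standalone sketch using only java.util. It is not the Flink Evictor API: the class name OutOfOrderEviction and the helper evict are made up for illustration, and whether the Iterable handed to a real evictor supports remove() depends on how the operator builds it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class OutOfOrderEviction {
    // Hypothetical helper: removes the given values wherever they sit in the buffer.
    static void evict(Iterable<Integer> elements, List<Integer> toEvict) {
        Iterator<Integer> it = elements.iterator();
        while (it.hasNext()) {
            if (toEvict.contains(it.next())) {
                // Removes the element most recently returned by next().
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        List<Integer> buffer = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5));
        evict(buffer, Arrays.asList(2, 4));
        System.out.println(buffer); // prints [1, 3, 5]
    }
}
```

The elements that remain keep their original order, so the window function would still see 1, 3, 5 in sequence.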
Regards,
Vishnu

On Fri, Oct 7, 2016 at 2:57 AM, Radu Tudoran <radu.tudo...@huawei.com> wrote:

> Hi,
>
> I must apologize that I missed some of the email exchanges on this thread,
> so my remark/question might already have been settled.
>
> Does the interface you propose also allow removing elements out of order?
> E.g., assuming I have elements 1,2,3,4,5 in the window buffer, would I be
> able to evict 2 and 4?
> We discussed this some email exchanges ago, but as I said I am not sure
> whether this functionality is captured in this interface. Basically, will
> the typical remove() method from Iterators be available?
>
> Best regards,
>
>
> -----Original Message-----
> From: Vishnu Viswanath [mailto:vishnu.viswanat...@gmail.com]
> Sent: Friday, October 07, 2016 8:29 AM
> To: Dev
> Subject: Re: [DISCUSS][FLIP-4] Enhance Window Evictor in Flink
>
> Hi Aljoscha,
>
> To pass the time information to the Evictor and at the same time not
> expose the StreamRecord, I suppose we can change the signature of
> evictBefore and evictAfter to take Iterable<Tuple2<Long, T>> instead of
> Iterable<StreamRecord<T>>:
>
> void evictBefore(Iterable<Tuple2<Long, T>> elements, int size, W window,
> EvictorContext evictorContext);
>
> The fire() method of EvictingWindowOperator can transform the
> Iterable<StreamRecord<IN>> to FluentIterable<Tuple2<Long, IN>> and pass it
> on to the evictor (where f0 will be the timestamp and f1 will be the value).
> That way the TimeEvictor will work for EventTime or IngestionTime as long
> as the timestamp is set in the StreamRecord. In case the timestamp is not
> set, TimeEvictor can detect this by checking Tuple2.f0 (which will be
> Long.MIN_VALUE) and skip the eviction.
>
> If you think this is fine, I will make the changes and also edit the FLIP.
>
> Regards,
> Vishnu
>
>
> On Wed, Oct 5, 2016 at 9:49 PM, Vishnu Viswanath <
> vishnu.viswanat...@gmail.com> wrote:
>
> > Thank you Aljoscha,
> >
> > Yes, I agree we don't need ProcessingTimeEvictor.
> > I will change the current TimeEvictors to use EventTimeEvictor as
> > suggested.
> >
> > Also, I will figure out a way to pass the timestamp to the Evictor
> > interface so that we can avoid exposing StreamRecords.
> >
> > Regards,
> > Vishnu
> >
> >
> >
> > On Tue, Sep 20, 2016 at 4:33 AM, Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> >> Hi,
> >> now you again see what I mentioned a while back: eviction based on
> >> processing time is not really well defined. I think we can completely get
> >> rid of "processing time eviction" because it can be replaced by something
> >> like this:
> >>
> >> DataStream input = ...
> >> // this will assign the current processing time as timestamp
> >> DataStream withTimestamps =
> >>     input.assignTimestampsAndWatermarks(new IngestionTimeExtractor());
> >> withTimestamps
> >>   .keyBy(...)
> >>   .window(...)
> >>   .evictor(new EventTimeEvictor())
> >>   .apply(...)
> >>
> >> With this, we would just have to find a good way of passing the timestamps
> >> in the Evictor interface and a good way of implementing the
> >> EvictingWindowOperator.
> >>
> >> Cheers,
> >> Aljoscha
> >>
> >>
> >> On Sun, 18 Sep 2016 at 18:14 Vishnu Viswanath <
> >> vishnu.viswanat...@gmail.com>
> >> wrote:
> >>
> >> > Hi Aljoscha,
> >> >
> >> > A)
> >> > I tried the approach where we set the ProcessingTime explicitly by
> >> > converting DataStream<T> input to DataStream<Tuple2<Long, T>> using a map
> >> > function, and below are my observations:
> >> > 1. All the current code which uses TimeEvictor (which will by default be
> >> > changed to ProcessingTimeEvictor) will be forced to implement a mapping
> >> > function to agree with the new method signature.
> >> > 2. Even after applying the above mapping function, the timestamp field of
> >> > the StreamRecord will not be changed. This might be confusing since we now
> >> > have two timestamps for the record: one set by the mapping function, the
> >> > other in the StreamRecord.
> >> > 3. Having a stream of Tuple2<Long, T> makes it confusing to do the keyBy,
> >> > and the WindowFunction now has to process Tuple2<Long, T> instead of T.
> >> > 4. Users might get confused about how to set the ProcessingTime, since
> >> > ProcessingTime is the time at which the records are processed and users
> >> > might expect that to be a responsibility of Flink.
> >> >
> >> > Ideally, ProcessingTime should be the time at which a StreamRecord is
> >> > processed. If a record is processed multiple times (e.g., when an element
> >> > was not evicted from the window and is hence processed again during the
> >> > next trigger), the ProcessingTime should be the time at which the record
> >> > was seen/processed the first time. If my understanding of ProcessingTime
> >> > is correct, I am thinking I can iterate through the records and set the
> >> > current timestamp as the ProcessingTime if absent (before doing the
> >> > eviction).
> >> >
> >> > Something like:
> >> > for (StreamRecord<Object> element : elements) {
> >> >   if (!element.hasTimestamp()) {
> >> >     element.setTimestamp(System.currentTimeMillis());
> >> >   }
> >> > }
> >> >
> >> > B) Regarding not exposing StreamRecord<IN> in the Evictor: if the Evictor
> >> > is given Iterable<IN>, then we cannot retrieve the time information of the
> >> > records in the EventTimeEvictor to do the eviction (but I do see that
> >> > StreamRecord is marked with @Internal).
> >> >
> >> > C) Regarding modifying the WindowOperator class to take the type parameter
> >> > <S extends AppendingState<IN, ACC>> so that we can remove the duplicate
> >> > code from EvictingWindowOperator, I would prefer to separate it from this
> >> > FLIP and create a JIRA for it. What do you say?
> >> >
> >> > Please let me know your thoughts.
> >> > > >> > Regards, > >> > Vishnu > >> > > >> > On Sun, Jul 31, 2016 at 12:07 PM, Aljoscha Krettek < > aljos...@apache.org > >> > > >> > wrote: > >> > > >> > > Hi, > >> > > regarding a), b) and c): The WindowOperator can be extended to have > >> this > >> > > signature: > >> > > public class WindowOperator<K, IN, ACC, OUT, W extends Window, S > >> extends > >> > > AppendingState<IN, ACC>> > >> > > > >> > > that way the shape of state is generic and EvictingWindowOperator > can > >> use > >> > > ListState<IN> there. > >> > > > >> > > regarding 2.: Yes, we can either take the current processing > >> time/event > >> > > time or the max timestamp of elements in the window as the benchmark > >> > > against which we compare. > >> > > > >> > > About ProcessingTimeEvictor: the proposal was to make the timestamp > >> > > explicit in the type of elements. Otherwise, how would you access > the > >> > > processing time of each element? (As I said, the timestamp field in > >> > > StreamRecord does not usually contain a processing-time timestamp > and > >> I > >> > > would like to remove the StreamRecord from the type of the Iterable > >> that > >> > is > >> > > passed to the evictor to avoid code duplication in > >> > EvictingWindowOperator) > >> > > I'm open for suggestions there since I didn't come up with a better > >> > > solution yet. :-) > >> > > > >> > > Cheers, > >> > > Aljoscha > >> > > > >> > > > >> > > > >> > > On Sat, 30 Jul 2016 at 05:56 Vishnu Viswanath < > >> > > vishnu.viswanat...@gmail.com> > >> > > wrote: > >> > > > >> > > > Hi Aljoscha, > >> > > > > >> > > > 1. 
Regarding the Evictor interface taking Iterable<IN> instead of > >> > > > StreamRecord - > >> > > > > >> > > > a) I am not quite sure I understood what you meant by *"It could > >> be a > >> > > very > >> > > > thin subclass of WindowOperator"* - Currently, most of the code > >> > > duplication > >> > > > in EvictingWindowOperator is due to the windowStateDescriptor > >> > (ListState > >> > > > instead of AppendingState compared to WindowOperator). Is this > >> > correct?. > >> > > > > >> > > > b) Do you hope to keep using AppendingState instead of ListState > to > >> > > avoid > >> > > > the duplicate code (e.g., processWatermark(), trigger() etc). If > we > >> use > >> > > > AppendingState, the get() method returns an state of the OUT type > >> ACC, > >> > > > which cannot be passed to Evictor. So I am assuming we will have > to > >> > keep > >> > > > using ListState here. > >> > > > > >> > > > c) My not so good idea was to use the FluentIterable to convert > the > >> > > > Iterable<StreamRecord<IN>> to Iterable<IN> and pass it on to > Evictor > >> > and > >> > > > Window function. Evictor can remove the elements from the > Iterable. > >> > (Even > >> > > > Window function can remove elements). Then clear the state and add > >> > > > elements(after removal) back to the state. But in that case, I > need > >> to > >> > > > reconstruct StreamRecord<IN> from IN. Doing so, we will lose the > >> > > timestamp > >> > > > information that might have been previously set on the original > >> > > > StreamRecord<IN> - is there any other way to recreate > StreamRecord? > >> > > > > >> > > > > >> > > > > >> > > > 2. Regarding ProcessingTimeEvictor - > >> > > > > >> > > > A TimeEvictor has to evict elements from the window which are > older > >> > than > >> > > a > >> > > > given Period from the element with maximum timestamp in the > window. 
> >> > When > >> > > > considering ProcessingTimestamp(even if it was explicitly set), > >> > shouldn't > >> > > > the timestamp associated with records be strictly increasing. > i.e., > >> > newer > >> > > > elements should have higher timestamp than earlier elements. So to > >> get > >> > > the > >> > > > max timestamp we could just get the last element. When using > >> > > > EventTimeEvictor, the elements might have arrived out of order > >> hence we > >> > > > can't just take the timestamp of the last element as maximum > >> timestamp, > >> > > but > >> > > > check each and every element in the window. > >> > > > > >> > > > We should have two versions of TimeEvictors - EventTime and > >> > > ProcessingTime, > >> > > > but does ProcessingTimeEvictor need to take a Tupel2<Long,T> since > >> > > anyways > >> > > > we are going to get the max timestamp by looking at the last > >> element in > >> > > the > >> > > > window?. > >> > > > > >> > > > Thanks, > >> > > > Vishnu > >> > > > > >> > > > On Fri, Jul 29, 2016 at 6:22 AM, Aljoscha Krettek < > >> aljos...@apache.org > >> > > > >> > > > wrote: > >> > > > > >> > > > > About processing time and timestamps: > >> > > > > > >> > > > > The timestamp is either set in the source of in an > >> > > > > in-between TimestampAssigner that can be used with > >> > > > > DataStream.assignTimestampsAndWatermarks(). However, the > >> timestamp in > >> > > the > >> > > > > element is normally not a "processing-time timestamp". I think > it > >> > might > >> > > > > make sense to split the functionality for the evictors into two > >> > parts: > >> > > > one > >> > > > > that implicitly sets a timestamp and one that uses these > >> timestamps. > >> > It > >> > > > > could look like this: > >> > > > > > >> > > > > DataStream<T> input = ... 
> >> > > > > // this makes the current processing time explicit in the > tuples: > >> > > > > DataStream<Tuple2<Long, T>> withTimestamps = input.map(new > >> > > > > ReifyProcessingTIme<T>()); > >> > > > > withTimestamps > >> > > > > .keyBy(...) > >> > > > > .window(..) > >> > > > > .evictor(new ProcessingTimeEvictor<T>()) > >> > > > > .apply(...) > >> > > > > > >> > > > > where ProcessingTimeEvictor looks like this: > >> > > > > > >> > > > > class ProcessingTimeEvictor<T> extends Evictor<Tuple2<Long, T>> > { > >> > > > > void evictBefore(Iterable<Tuple2<Long, T>>, ...); > >> > > > > void evictAfter ... > >> > > > > } > >> > > > > > >> > > > > This would make everything that is happening explicit in the > type > >> > > > > signatures and explicit for the user. > >> > > > > > >> > > > > Cheers, > >> > > > > Aljoscha > >> > > > > > >> > > > > On Thu, 28 Jul 2016 at 18:32 Aljoscha Krettek < > >> aljos...@apache.org> > >> > > > wrote: > >> > > > > > >> > > > > > Hi, > >> > > > > > in fact, changing it to Iterable<IN> would simplify things > >> because > >> > > then > >> > > > > we > >> > > > > > would not have to duplicate code for the > EvictingWindowOperator > >> any > >> > > > more. > >> > > > > > It could be a very thin subclass of WindowOperator. > >> > > > > > > >> > > > > > Cheers, > >> > > > > > Aljoscha > >> > > > > > > >> > > > > > On Wed, 27 Jul 2016 at 03:56 Vishnu Viswanath < > >> > > > > > vishnu.viswanat...@gmail.com> wrote: > >> > > > > > > >> > > > > >> Hi Aljoscha, > >> > > > > >> > >> > > > > >> Regarding your concern - to not expose the StreamRecord in > the > >> > > > Evictor, > >> > > > > >> were you able to find any alternative? 
> >> > > > > >> > >> > > > > >> I tried to make the methods take Iterable<IN> input similar > to > >> the > >> > > > > >> WindowFunction, but that didn't work since we have to clear > the > >> > > state > >> > > > > and > >> > > > > >> add the elements back to the state (to fix the bug mentioned > in > >> > the > >> > > > > >> previous mail) > >> > > > > >> > >> > > > > >> If you think the interface that accepts > >> Iterable<StreamRecord<T>> > >> > > > > >> elements is > >> > > > > >> good enough, I have the changes ready. > >> > > > > >> > >> > > > > >> Thanks, > >> > > > > >> Vishnu > >> > > > > >> > >> > > > > >> On Mon, Jul 25, 2016 at 7:48 AM, Aljoscha Krettek < > >> > > > aljos...@apache.org> > >> > > > > >> wrote: > >> > > > > >> > >> > > > > >> > Hi, > >> > > > > >> > the elements are currently not being removed from the > >> buffers. > >> > > > That's > >> > > > > a > >> > > > > >> bug > >> > > > > >> > that we could fix while adding the new Evictor interface. > >> > > > > >> > > >> > > > > >> > Cheers, > >> > > > > >> > Aljoscha > >> > > > > >> > > >> > > > > >> > On Mon, 25 Jul 2016 at 13:00 Radu Tudoran < > >> > > radu.tudo...@huawei.com> > >> > > > > >> wrote: > >> > > > > >> > > >> > > > > >> > > Hi Aljoscha, > >> > > > > >> > > > >> > > > > >> > > Can you point us to the way it is handled now. Is there > >> > anything > >> > > > > else > >> > > > > >> for > >> > > > > >> > > the removing of elements other than the skip in > >> > > > > >> EvictingWindowOperator. > >> > > > > >> > Is > >> > > > > >> > > there something as it was before version 1.x where you > had > >> an > >> > > > > explicit > >> > > > > >> > > remove from window buffers? > >> > > > > >> > > > >> > > > > >> > > Dr. 
Radu Tudoran > >> > > > > >> > > Research Engineer - Big Data Expert > >> > > > > >> > > IT R&D Division > >> > > > > >> > > > >> > > > > >> > > > >> > > > > >> > > HUAWEI TECHNOLOGIES Duesseldorf GmbH > >> > > > > >> > > European Research Center > >> > > > > >> > > Riesstrasse 25, 80992 München > >> > > > > >> > > > >> > > > > >> > > E-mail: radu.tudo...@huawei.com > >> > > > > >> > > Mobile: +49 15209084330 > >> > > > > >> > > Telephone: +49 891588344173 > >> > > > > >> > > > >> > > > > >> > > HUAWEI TECHNOLOGIES Duesseldorf GmbH > >> > > > > >> > > Hansaallee 205, 40549 Düsseldorf, Germany, > www.huawei.com > >> > > > > >> > > Registered Office: Düsseldorf, Register Court Düsseldorf, > >> HRB > >> > > > 56063, > >> > > > > >> > > Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN > >> > > > > >> > > Sitz der Gesellschaft: Düsseldorf, Amtsgericht > Düsseldorf, > >> HRB > >> > > > > 56063, > >> > > > > >> > > Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN > >> > > > > >> > > This e-mail and its attachments contain confidential > >> > information > >> > > > > from > >> > > > > >> > > HUAWEI, which is intended only for the person or entity > >> whose > >> > > > > address > >> > > > > >> is > >> > > > > >> > > listed above. Any use of the information contained herein > >> in > >> > any > >> > > > way > >> > > > > >> > > (including, but not limited to, total or partial > >> disclosure, > >> > > > > >> > reproduction, > >> > > > > >> > > or dissemination) by persons other than the intended > >> > > recipient(s) > >> > > > is > >> > > > > >> > > prohibited. If you receive this e-mail in error, please > >> notify > >> > > the > >> > > > > >> sender > >> > > > > >> > > by phone or email immediately and delete it! 
> >> > > > > >> > > > >> > > > > >> > > > >> > > > > >> > > -----Original Message----- > >> > > > > >> > > From: Aljoscha Krettek [mailto:aljos...@apache.org] > >> > > > > >> > > Sent: Monday, July 25, 2016 11:45 AM > >> > > > > >> > > To: dev@flink.apache.org > >> > > > > >> > > Subject: Re: [DISCUSS][FLIP-4] Enhance Window Evictor in > >> Flink > >> > > > > >> > > > >> > > > > >> > > Hi, > >> > > > > >> > > I think there is not yet a clear specification for how > the > >> > > actual > >> > > > > >> removal > >> > > > > >> > > of elements from the buffer will work. I think naively > one > >> can > >> > > do: > >> > > > > >> > > > >> > > > > >> > > Iterable<E> currentElements = state.get() > >> > > > > >> > > evictor.evict(currentElements); // this will remove some > >> stuff > >> > > > from > >> > > > > >> > there, > >> > > > > >> > > or mark for removal > >> > > > > >> > > > >> > > > > >> > > state.clear() > >> > > > > >> > > // the Iterable does not loop over the removed/marked > >> elements > >> > > > > >> > > for (E element : currentElements) { > >> > > > > >> > > state.add(element) > >> > > > > >> > > } > >> > > > > >> > > > >> > > > > >> > > This is very costly but the only way I see of doing this > >> right > >> > > now > >> > > > > >> with > >> > > > > >> > > every state backend. > >> > > > > >> > > > >> > > > > >> > > Cheers, > >> > > > > >> > > Aljoscha > >> > > > > >> > > > >> > > > > >> > > On Mon, 25 Jul 2016 at 09:46 Radu Tudoran < > >> > > > radu.tudo...@huawei.com> > >> > > > > >> > wrote: > >> > > > > >> > > > >> > > > > >> > > > Hi, > >> > > > > >> > > > > >> > > > > >> > > > Thanks for the clarification. Can someone point to > where > >> the > >> > > > > events > >> > > > > >> are > >> > > > > >> > > > removed from buffers - I am trying to understand the > new > >> > logic > >> > > > of > >> > > > > >> > > handling > >> > > > > >> > > > the eviction in this new API. 
Thanks > >> > > > > >> > > > > >> > > > > >> > > > > >> > > > > >> > > > > >> > > > > >> > > > -----Original Message----- > >> > > > > >> > > > From: Vishnu Viswanath [mailto:vishnu.viswanath25@gma > >> il.com > >> > ] > >> > > > > >> > > > Sent: Saturday, July 23, 2016 3:04 AM > >> > > > > >> > > > To: Dev > >> > > > > >> > > > Subject: Re: [DISCUSS][FLIP-4] Enhance Window Evictor > in > >> > Flink > >> > > > > >> > > > > >> > > > > >> > > > Hi Radu, > >> > > > > >> > > > > >> > > > > >> > > > - Yes we can remove elements from the iterator. > >> > > > > >> > > > - Right now the EvictingWindowOperator just skips the > >> > elements > >> > > > > from > >> > > > > >> the > >> > > > > >> > > > Iterable before passing to the window function(Yes this > >> has > >> > to > >> > > > be > >> > > > > >> > changed > >> > > > > >> > > > in the new API) > >> > > > > >> > > > - Regarding how the last question on how elements are > >> being > >> > > > > removed > >> > > > > >> > from > >> > > > > >> > > > the window buffer. I am not sure how it is working > right > >> > now, > >> > > > but > >> > > > > >> when > >> > > > > >> > > > trying out the new API that I am working on, I did > find a > >> > bug > >> > > > > where > >> > > > > >> the > >> > > > > >> > > > evicted elements are not actually removed from the > >> State. I > >> > > have > >> > > > > >> added > >> > > > > >> > a > >> > > > > >> > > > fix for that. (You can see a mail regarding that in > this > >> > mail > >> > > > > >> chain) > >> > > > > >> > > > > >> > > > > >> > > > Thanks, > >> > > > > >> > > > Vishnu > >> > > > > >> > > > > >> > > > > >> > > > On Fri, Jul 22, 2016 at 1:03 PM, Radu Tudoran < > >> > > > > >> radu.tudo...@huawei.com > >> > > > > >> > > > >> > > > > >> > > > wrote: > >> > > > > >> > > > > >> > > > > >> > > > > Hi, > >> > > > > >> > > > > > >> > > > > >> > > > > Overall I believe that the interfaces and the > proposal > >> is > >> > > > good. 
> >> > > > > I > >> > > > > >> > have > >> > > > > >> > > > the > >> > > > > >> > > > > following question though: can you delete via the > >> iterator > >> > > > > >> > > > > (Iterable<StreamRecord<T>> elements) the elements? > >> > > > > >> > > > > > >> > > > > >> > > > > I tried to look over the code where the eviction > >> happens > >> > (I > >> > > > did > >> > > > > >> not > >> > > > > >> > do > >> > > > > >> > > > > these since version 0.10...looks very different now > :) > >> > > )...the > >> > > > > >> only > >> > > > > >> > > > > reference I found was the EvictingWindowOperator > which > >> at > >> > > the > >> > > > > >> > > > > fireOrContinue has a "skip" based on the number of > >> > elements > >> > > > > >> returned > >> > > > > >> > > from > >> > > > > >> > > > > the evictor...and these are not put in the collection > >> to > >> > be > >> > > > > given > >> > > > > >> to > >> > > > > >> > > the > >> > > > > >> > > > > user function to be applied. I think these will also > >> need > >> > to > >> > > > be > >> > > > > >> > changed > >> > > > > >> > > > to > >> > > > > >> > > > > adjust to the "any operator from anywhere in the > window > >> > > > buffer". > >> > > > > >> > > > > Also - as we are on this topic - can someone explain > >> how > >> > > these > >> > > > > >> > elements > >> > > > > >> > > > > that are not consider anymore for the user function > are > >> > > > actually > >> > > > > >> > > deleted > >> > > > > >> > > > > from the window buffer?..i did not manage to find > >> this.. > >> > > some > >> > > > > >> > reference > >> > > > > >> > > > to > >> > > > > >> > > > > classes/code where this happens would be useful > >> > > > > >> > > > > > >> > > > > >> > > > > > >> > > > > >> > > > > Dr. 
Radu Tudoran
> >> > > > > >> > > > > > >> > > > > >> > > > > > >> > > > > >> > > > > -----Original Message----- > >> > > > > >> > > > > From: Vishnu Viswanath [mailto: > >> > vishnu.viswanat...@gmail.com > >> > > ] > >> > > > > >> > > > > Sent: Friday, July 22, 2016 12:43 PM > >> > > > > >> > > > > To: Dev > >> > > > > >> > > > > Subject: Re: [DISCUSS][FLIP-4] Enhance Window Evictor > >> in > >> > > Flink > >> > > > > >> > > > > > >> > > > > >> > > > > Hi, > >> > > > > >> > > > > > >> > > > > >> > > > > I have created a FLIP page for this enhancement > >> > > > > >> > > > > > >> > > > > >> > > > > > >> > > > > >> > > > > >> > > > > >> > > > >> > > > > >> > > >> > > > > >> > >> > > > > > >> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP- > >> > > 4+%3A+Enhance+Window+Evictor > >> > > > > >> > > > > > >> > > > > >> > > > > Thanks, > >> > > > > >> > > > > Vishnu > >> > > > > >> > > > > > >> > > > > >> > > > > On Thu, Jul 21, 2016 at 6:53 AM, Vishnu Viswanath < > >> > > > > >> > > > > vishnu.viswanat...@gmail.com> wrote: > >> > > > > >> > > > > > >> > > > > >> > > > > > Thanks Aljoscha. > >> > > > > >> > > > > > > >> > > > > >> > > > > > On Thu, Jul 21, 2016 at 4:46 AM, Aljoscha Krettek < > >> > > > > >> > > aljos...@apache.org > >> > > > > >> > > > > > >> > > > > >> > > > > > wrote: > >> > > > > >> > > > > > > >> > > > > >> > > > > >> Hi, > >> > > > > >> > > > > >> this, in fact, seems to be a bug. There should be > >> > > something > >> > > > > >> like > >> > > > > >> > > > > >> windowState.clear(); > >> > > > > >> > > > > >> for (IN element: projectedContents) { > >> > > > > >> > > > > >> windowState.add(element); > >> > > > > >> > > > > >> } > >> > > > > >> > > > > >> > >> > > > > >> > > > > >> after passing the elements to the window function. > >> > > > > >> > > > > >> > >> > > > > >> > > > > >> This is very inefficient but the only way I see of > >> > doing > >> > > it > >> > > > > >> right > >> > > > > >> > > now. 
> >> > > > > >> > > > > >> > >> > > > > >> > > > > >> Cheers, > >> > > > > >> > > > > >> Aljoscha > >> > > > > >> > > > > >> > >> > > > > >> > > > > >> > >> > > > > >> > > > > >> On Thu, 21 Jul 2016 at 01:32 Vishnu Viswanath < > >> > > > > >> > > > > >> vishnu.viswanat...@gmail.com> > >> > > > > >> > > > > >> wrote: > >> > > > > >> > > > > >> > >> > > > > >> > > > > >> > Hi, > >> > > > > >> > > > > >> > > >> > > > > >> > > > > >> > When we use RocksDB as state backend, how does > the > >> > > > backend > >> > > > > >> state > >> > > > > >> > > get > >> > > > > >> > > > > >> > updated after some elements are evicted from the > >> > > window? > >> > > > > >> > > > > >> > I don't see any update call being made to remove > >> the > >> > > > > element > >> > > > > >> > from > >> > > > > >> > > > the > >> > > > > >> > > > > >> state > >> > > > > >> > > > > >> > stored in RocksDB. > >> > > > > >> > > > > >> > > >> > > > > >> > > > > >> > It looks like the RocksDBListState is only > having > >> > get() > >> > > > and > >> > > > > >> > add() > >> > > > > >> > > > > >> methods > >> > > > > >> > > > > >> > since it is an AppendingState, but that causes > the > >> > > > evicted > >> > > > > >> > > elements > >> > > > > >> > > > to > >> > > > > >> > > > > >> come > >> > > > > >> > > > > >> > back when the trigger is fired next time. (It > >> works > >> > > fine > >> > > > > >> when I > >> > > > > >> > > use > >> > > > > >> > > > > >> > MemoryStateBackend) > >> > > > > >> > > > > >> > > >> > > > > >> > > > > >> > Is this expected behavior or am I missing > >> something. 
> >> > > > > >> > > > > >> > > >> > > > > >> > > > > >> > Thanks, > >> > > > > >> > > > > >> > Vishnu > >> > > > > >> > > > > >> > > >> > > > > >> > > > > >> > On Mon, Jul 18, 2016 at 7:15 AM, Vishnu > Viswanath > >> < > >> > > > > >> > > > > >> > vishnu.viswanat...@gmail.com> wrote: > >> > > > > >> > > > > >> > > >> > > > > >> > > > > >> > > Hi Aljoscha, > >> > > > > >> > > > > >> > > > >> > > > > >> > > > > >> > > Thanks! Yes, I have the create page option now > >> in > >> > > wiki. > >> > > > > >> > > > > >> > > > >> > > > > >> > > > > >> > > Regards, > >> > > > > >> > > > > >> > > Vishnu Viswanath, > >> > > > > >> > > > > >> > > > >> > > > > >> > > > > >> > > On Mon, Jul 18, 2016 at 6:34 AM, Aljoscha > >> Krettek < > >> > > > > >> > > > > >> aljos...@apache.org> > >> > > > > >> > > > > >> > > wrote: > >> > > > > >> > > > > >> > > > >> > > > > >> > > > > >> > >> @Radu, addition of more window types and > >> sorting > >> > > > should > >> > > > > be > >> > > > > >> > part > >> > > > > >> > > > of > >> > > > > >> > > > > >> > another > >> > > > > >> > > > > >> > >> design proposal. This is interesting stuff > but > >> I > >> > > think > >> > > > > we > >> > > > > >> > > should > >> > > > > >> > > > > keep > >> > > > > >> > > > > >> > >> issues separated because things can get > >> > complicated > >> > > > very > >> > > > > >> > > quickly. > >> > > > > >> > > > > >> > >> > >> > > > > >> > > > > >> > >> On Mon, 18 Jul 2016 at 12:32 Aljoscha > Krettek < > >> > > > > >> > > > aljos...@apache.org > >> > > > > >> > > > > > > >> > > > > >> > > > > >> > >> wrote: > >> > > > > >> > > > > >> > >> > >> > > > > >> > > > > >> > >> > Hi, > >> > > > > >> > > > > >> > >> > about TimeEvictor, yes, I think there > should > >> be > >> > > > > specific > >> > > > > >> > > > evictors > >> > > > > >> > > > > >> for > >> > > > > >> > > > > >> > >> > processing time and event time. 
Also, the > >> > current > >> > > > time > >> > > > > >> > should > >> > > > > >> > > > be > >> > > > > >> > > > > >> > >> > retrievable from the EvictorContext. > >> > > > > >> > > > > >> > >> > > >> > > > > >> > > > > >> > >> > For the wiki you will need permissions. > This > >> was > >> > > > > >> recently > >> > > > > >> > > > changed > >> > > > > >> > > > > >> > >> because > >> > > > > >> > > > > >> > >> > there was too much spam. I gave you > >> permission > >> > to > >> > > > add > >> > > > > >> > pages. > >> > > > > >> > > > Can > >> > > > > >> > > > > >> you > >> > > > > >> > > > > >> > >> please > >> > > > > >> > > > > >> > >> > try and check if it works? > >> > > > > >> > > > > >> > >> > > >> > > > > >> > > > > >> > >> > Cheers, > >> > > > > >> > > > > >> > >> > Aljoscha > >> > > > > >> > > > > >> > >> > > >> > > > > >> > > > > >> > >> > On Fri, 15 Jul 2016 at 13:28 Vishnu > >> Viswanath < > >> > > > > >> > > > > >> > >> > vishnu.viswanat...@gmail.com> wrote: > >> > > > > >> > > > > >> > >> > > >> > > > > >> > > > > >> > >> >> Hi all, > >> > > > > >> > > > > >> > >> >> > >> > > > > >> > > > > >> > >> >> How do we create a FLIP page, is there any > >> > > > permission > >> > > > > >> > setup > >> > > > > >> > > > > >> > required? 
I > >> > > > > >> > > > > >> > >> >> don't see any "Create" page(after logging > >> in) > >> > > > option > >> > > > > in > >> > > > > >> > the > >> > > > > >> > > > > >> header as > >> > > > > >> > > > > >> > >> >> mentioned in > >> > > > > >> > > > > >> > >> >> > >> > > > > >> > > > > >> > >> >> > >> > > > > >> > > > > >> > >> > >> > > > > >> > > > > >> > > >> > > > > >> > > > > >> > >> > > > > >> > > > > > >> > > > > >> > > > > >> > > > > >> > > > >> > > > > >> > > >> > > > > >> > >> > > > > > >> > > > https://cwiki.apache.org/confluence/display/FLINK/ > >> > > Flink+Improvement+Proposals > >> > > > > >> > > > > >> > >> >> > >> > > > > >> > > > > >> > >> >> > >> > > > > >> > > > > >> > >> >> Thanks, > >> > > > > >> > > > > >> > >> >> Vishnu > >> > > > > >> > > > > >> > >> >> > >> > > > > >> > > > > >> > >> >> On Wed, Jul 13, 2016 at 10:22 PM, Vishnu > >> > > Viswanath > >> > > > < > >> > > > > >> > > > > >> > >> >> vishnu.viswanat...@gmail.com> wrote: > >> > > > > >> > > > > >> > >> >> > >> > > > > >> > > > > >> > >> >> > Hi Aljoscha, > >> > > > > >> > > > > >> > >> >> > > >> > > > > >> > > > > >> > >> >> > I agree, the user will know exactly that > >> they > >> > > are > >> > > > > >> > creating > >> > > > > >> > > > an > >> > > > > >> > > > > >> > >> EventTime > >> > > > > >> > > > > >> > >> >> > based evictor or ProcessingTime based > >> evictor > >> > > > > >> looking at > >> > > > > >> > > the > >> > > > > >> > > > > >> code. > >> > > > > >> > > > > >> > >> >> > So do you think it will be ok to have > >> > multiple > >> > > > > >> versions > >> > > > > >> > of > >> > > > > >> > > > > >> > >> TimeEvictor > >> > > > > >> > > > > >> > >> >> > (one for event time and one for > processing > >> > > time) > >> > > > > and > >> > > > > >> > also > >> > > > > >> > > a > >> > > > > >> > > > > >> > >> DeltaEvcitor > >> > > > > >> > > > > >> > >> >> > (again 2 versions- for event time and > >> > > processing > >> > > > > >> time) ? 
Please note that the existing behavior of TimeEvictor/DeltaEvictor does not consider whether it is EventTime or ProcessingTime. E.g., in TimeEvictor the current time is taken to be the timestamp of the last element in the window,

    long currentTime = Iterables.getLast(elements).getTimestamp();

not the highest timestamp of all elements. What I am trying to achieve is something like:

    long currentTime;
    if (ctx.isEventTime()) {
        currentTime = getMaxTimestamp(elements);
    } else {
        currentTime = Iterables.getLast(elements).getTimestamp();
    }

Similarly, in DeltaEvictor the `lastElement` is `Iterables.getLast(elements);` and I am thinking we should consider the element with the max timestamp as the last element, instead of just taking the last inserted element as `lastElement`.

Do you think that is the right thing to do, or should we leave the behavior of the Evictors as is with respect to choosing the last element?

Thanks,
Vishnu

On Wed, Jul 13, 2016 at 11:07 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

I still think it should be explicit in the class. For example, if you have this code:

    input
      .keyBy()
      .window()
      .trigger(EventTimeTrigger.create())
      .evictor(TimeTrigger.create())

the time behavior of the trigger is explicitly specified while the evictor would dynamically adapt based on internal workings that the user might not be aware of. Having the behavior explicit at the call site is very important, in my opinion.

On Wed, 13 Jul 2016 at 16:28 Vishnu Viswanath <vishnu.viswanat...@gmail.com> wrote:

Hi,

I was hoping to use the isEventTime method in the WindowAssigner to set that information in the EvictorContext. What do you think?

Thanks and Regards,
Vishnu Viswanath

On Wed, Jul 13, 2016 at 10:09 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,
I think the way to go here is to add both an EventTimeEvictor and a ProcessingTimeEvictor.
The problem is that "isEventTime" cannot really be determined. That's also the reason why there is an EventTimeTrigger and a ProcessingTimeTrigger. It was just an oversight that the TimeEvictor does not also have these two versions.

About EvictingWindowOperator, I think you can make the two methods non-final in WindowOperator, yes.

Cheers,
Aljoscha

On Wed, 13 Jul 2016 at 14:32 Vishnu Viswanath <vishnu.viswanat...@gmail.com> wrote:

Hi Aljoscha,

I am thinking of adding a method boolean isEventTime(); in the EvictorContext apart from

    long getCurrentProcessingTime();
    MetricGroup getMetricGroup();
    long getCurrentWatermark();

This method can be used to make the Evictor not iterate through all the elements in TimeEvictor.
There will be a few changes in the existing behavior of TimeEvictor and DeltaEvictor (I have mentioned this in the design doc).

Also, is there any specific reason why the open and close methods in WindowEvictor are made final? Since the EvictorContext will be in the EvictingWindowOperator, I need to override open and close in EvictingWindowOperator to set the EvictorContext reference to null.
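
The use of the proposed isEventTime() flag can be sketched roughly as below. This is only an illustration under stated assumptions: TimestampedValue, CurrentTimeChooser and getMaxTimestamp are hypothetical names made up for the sketch, not Flink classes, and the boolean parameter stands in for the proposed EvictorContext method. The point it shows is that a full scan is only needed when elements can be out of order.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a window element that carries a timestamp. */
final class TimestampedValue {
    final long timestamp;
    final String value;

    TimestampedValue(long timestamp, String value) {
        this.timestamp = timestamp;
        this.value = value;
    }
}

final class CurrentTimeChooser {

    /** Scans every element; needed when elements may arrive out of order. */
    static long getMaxTimestamp(List<TimestampedValue> elements) {
        long max = Long.MIN_VALUE;
        for (TimestampedValue e : elements) {
            max = Math.max(max, e.timestamp);
        }
        return max;
    }

    /**
     * Picks the reference time for eviction. The isEventTime flag is an
     * assumption standing in for the proposed context method. The list
     * must not be empty.
     */
    static long currentTime(List<TimestampedValue> elements, boolean isEventTime) {
        if (isEventTime) {
            return getMaxTimestamp(elements);
        }
        // Arrival order: the last element is the newest, so no scan is needed.
        return elements.get(elements.size() - 1).timestamp;
    }

    public static void main(String[] args) {
        List<TimestampedValue> window = Arrays.asList(
                new TimestampedValue(10L, "a"),
                new TimestampedValue(30L, "b"),
                new TimestampedValue(20L, "c")); // arrived late in event time
        System.out.println(currentTime(window, true));
        System.out.println(currentTime(window, false));
    }
}
```

With the sample window, the two modes disagree (the maximum timestamp versus the timestamp of the last inserted element), which is the behavioral difference under discussion.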

Thanks and Regards,
Vishnu Viswanath

On Fri, Jul 8, 2016 at 7:40 PM, Vishnu Viswanath <vishnu.viswanat...@gmail.com> wrote:

My thought process when asking if we can use a state backend in the window function was: can we add the elements to be evicted into some state and allow the evictAfter to read it from some context and remove it from the window?

On Fri, Jul 8, 2016 at 7:30 PM, Vishnu Viswanath <vishnu.viswanat...@gmail.com> wrote:

Hi Aljoscha,

Thanks for the explanation, and sorry for the late reply; I was busy with work.

I did think about this scenario. In fact, in my previous mail I thought of posting this question, then I understood that this problem will be there whichever method we choose (Trigger looking for the pattern or Window looking for the pattern).

I do have a pretty good watermark, but my concern is that it changes based on the key of these messages (I don't know if that is possible; I haven't started coding that yet. Maybe you could tell me). Even if it is possible, some of these watermarks will be long (in days), and I don't want the trigger to wait that long.

It looks like it is not easy to have an evictAfter based on the window function (without introducing coupling), but can the current window apply function be modified to allow it to change the elements in it, maybe using some state backend? (I don't know exactly how the internals of these work, so this might be a wrong question.)

Thanks and Regards,
Vishnu Viswanath

On Fri, Jul 8, 2016 at 10:20 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi Vishnu,
how long would these patterns be? The Trigger would not have to sort the elements for every new element but just insert the new element into an internal data structure. Only when it sees that the watermark is past a certain point would it check whether the pattern matches and actually trigger.

A general note regarding order and event time: I think relying on this for computation is very tricky unless the watermark is 100% correct or you completely discard elements that arrive after the watermark, i.e. elements that would break the promise of the watermark that no elements with an earlier timestamp will ever arrive. The reason is that new elements could always arrive that end up between already seen elements.
For example, let's say we have this sequence of elements when the trigger fires:

    a-b-a

This is the sequence that you are looking for and you emit some result from the WindowFunction. Now, new elements arrive that fall in between the elements we already have:

    a-d-e-b-f-g-a

This is an updated, sorted view of the actual event-time stream and we didn't realize that the stream actually looks like this before.
Does this still match the original pattern or should we now consider this as non-matching? If no, then the earlier successful match for a-b-a was wrong and we should never have processed it, but we didn't know at the time.
If yes, then pattern matching like this can be done in the Trigger by having something like pattern slots: you don't have to store all elements in the Trigger, you just need to store possible candidates that could match the pattern and ignore the other (in-between) elements.
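
The two readings of the a-b-a example can be made concrete with a small sketch. The names PatternEvent and PatternViews, and the timestamps used, are assumptions invented for illustration; nothing here is Flink API. The strict reading requires the pattern with nothing in between, while the "pattern slots" reading ignores in-between elements.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical event: a one-letter type plus an event-time timestamp. */
final class PatternEvent {
    final long timestamp;
    final char type;

    PatternEvent(long timestamp, char type) {
        this.timestamp = timestamp;
        this.type = type;
    }
}

final class PatternViews {

    /** Returns the event types ordered by timestamp, as one string. */
    static String sortedView(List<PatternEvent> events) {
        List<PatternEvent> copy = new ArrayList<>(events);
        copy.sort(Comparator.comparingLong((PatternEvent e) -> e.timestamp));
        StringBuilder sb = new StringBuilder();
        for (PatternEvent e : copy) {
            sb.append(e.type);
        }
        return sb.toString();
    }

    /** Strict reading: the pattern must appear with nothing in between. */
    static boolean matchesContiguously(String view, String pattern) {
        return view.contains(pattern);
    }

    /** "Pattern slots" reading: in-between elements are ignored. */
    static boolean matchesWithGaps(String view, String pattern) {
        int slot = 0; // index of the next pattern position still to be filled
        for (int i = 0; i < view.length() && slot < pattern.length(); i++) {
            if (view.charAt(i) == pattern.charAt(slot)) {
                slot++;
            }
        }
        return slot == pattern.length();
    }

    public static void main(String[] args) {
        List<PatternEvent> events = new ArrayList<>();
        events.add(new PatternEvent(1L, 'a'));
        events.add(new PatternEvent(4L, 'b'));
        events.add(new PatternEvent(7L, 'a'));
        System.out.println(sortedView(events)); // view when the trigger first fires

        // Late elements whose timestamps fall between the ones already seen.
        events.add(new PatternEvent(2L, 'd'));
        events.add(new PatternEvent(3L, 'e'));
        events.add(new PatternEvent(5L, 'f'));
        events.add(new PatternEvent(6L, 'g'));
        String updated = sortedView(events);
        System.out.println(updated);
        System.out.println(matchesContiguously(updated, "aba"));
        System.out.println(matchesWithGaps(updated, "aba"));
    }
}
```

Under the strict reading the late arrivals invalidate the earlier match; under the slots reading the match survives, which is why only candidate elements would need to be kept.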

Cheers,
Aljoscha

On Fri, 8 Jul 2016 at 14:10 Vishnu Viswanath <vishnu.viswanat...@gmail.com> wrote:

Hi Aljoscha,

That is a good idea. Trying to tie it back to the use case: suppose the trigger is looking for a pattern, a-b-a, and when it sees such a pattern, it will trigger the window, and it knows that the Evictor is now going to evict the element b. The trigger updates its state to a-a (even before the window & evictor complete) and will be looking for the rest of the pattern, i.e., b-a. But I can think of one problem here:

- the events can arrive out of order, i.e., the trigger might be seeing a pattern a-a-b while the actual event-time order is a-b-a, so the trigger will have to sort the elements in the window every time it sees an element.
  (I was planning to do this sorting in the window, which will be less often - only when the trigger fires)

Thanks and Regards,
Vishnu Viswanath

On Fri, Jul 8, 2016 at 6:04 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,
come to think of it, the right place to put such checks is actually the Trigger.
It would have to be a custom trigger that observes time but also keeps some internal state machine to decide when it has observed the right pattern in the window. Then the window function would just have to do the processing and you have good separation of concerns. Does that make sense?

I'm ignoring time and sorting by time for now because we probably need another design document for that.
To me it seems like a bigger thing.

Cheers,
Aljoscha
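
The "internal state machine" idea for a custom trigger might look roughly like the following. This is a minimal sketch, not Flink code: AbaStateMachine is a hypothetical class, it is hard-coded to the contiguous pattern a-b-a from this thread, it assumes elements are fed in the order they should be matched, and the boolean result merely stands in for the trigger's fire-or-continue decision.

```java
/** Hypothetical, hard-coded recognizer for the contiguous pattern a-b-a. */
final class AbaStateMachine {

    // Number of pattern positions matched so far: 0, 1 ("a") or 2 ("a-b").
    private int matched = 0;

    /** Feeds one element type; returns true when a-b-a has just been completed. */
    boolean onElement(char type) {
        if (matched == 2 && type == 'a') {
            matched = 1; // the closing 'a' may also start the next a-b-a
            return true;
        }
        if (matched == 1 && type == 'b') {
            matched = 2;
            return false;
        }
        // Any other input restarts; an 'a' already counts as the first position.
        matched = (type == 'a') ? 1 : 0;
        return false;
    }

    /** Convenience helper: counts how often the pattern completes in a sequence. */
    static int countFires(String sequence) {
        AbaStateMachine machine = new AbaStateMachine();
        int fires = 0;
        for (int i = 0; i < sequence.length(); i++) {
            if (machine.onElement(sequence.charAt(i))) {
                fires++;
            }
        }
        return fires;
    }

    public static void main(String[] args) {
        System.out.println(countFires("aba"));
        System.out.println(countFires("adebfga"));
    }
}
```

Keeping only the small integer state, rather than the elements themselves, is what would leave the actual processing to the window function.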