Hi @Aljoscha, on the wiki, the comment for the second parameter still refers to currentTimestamp, which does not match the parameter name recordTimestamp in "long extractTimestamp(T element, long recordTimestamp)".
Best,
Zhangguanghui

Dawid Wysakowicz <dwysakow...@apache.org> wrote on Wed, May 13, 2020 at 12:28 AM:

> Thank you for the update and sorry again for chiming in so late...
>
> Best,
>
> Dawid
>
>
> On 12/05/2020 18:21, Aljoscha Krettek wrote:
> > Yes, I am also ok with a SerializableTimestampAssigner. This only looks a bit clumsy in the API but as a user (that uses lambdas) you should not see this. I pushed changes for this to my branch:
> > https://github.com/aljoscha/flink/tree/flink-xxx-wm-generators-rebased
> >
> > And yes, recordTimestamp sounds good for the TimestampAssigner. I admit I didn't read this well enough and only saw nativeTimestamp.
> >
> > Best,
> > Aljoscha
> >
> > On 12.05.20 17:16, Dawid Wysakowicz wrote:
> >> I have similar thoughts to @Stephan
> >>
> >> Ad. 1 I tried something like this on your branch:
> >>
> >> /**
> >>  * Adds the given {@link TimestampAssigner} to this {@link WatermarkStrategies}.
> >>  * For top-level classes that implement both Serializable and TimestampAssigner
> >>  */
> >> public <TA extends TimestampAssigner<T> & Serializable> WatermarkStrategies<T> withTimestampAssigner(TA timestampAssigner) {
> >>     checkNotNull(timestampAssigner, "timestampAssigner");
> >>     this.timestampAssigner = timestampAssigner;
> >>     return this;
> >> }
> >>
> >> @FunctionalInterface
> >> public interface SerializableTimestampAssigner<T> extends TimestampAssigner<T>, Serializable {
> >> }
> >>
> >> /**
> >>  * Adds the given {@link TimestampAssigner} to this {@link WatermarkStrategies}.
> >>  * Helper method for serializable lambdas.
> >>  */
> >> public WatermarkStrategies<T> withTimestampAssigner(SerializableTimestampAssigner<T> timestampAssigner) {
> >>     checkNotNull(timestampAssigner, "timestampAssigner");
> >>     this.timestampAssigner = timestampAssigner;
> >>     return this;
> >> }
> >>
> >> But I understand if that's too hacky.
> >> It's just a pity that we must enforce limitations on an interface that are not strictly necessary.
> >>
> >> Ad 2/3
> >>
> >> I am aware the watermark assigner/timestamp extractor can be applied further down the graph. Originally I also wanted to suggest sourceTimestamp and SourceTimestampAssigner, but then I realized it can also be used after the sources, as you correctly pointed out. Even if the TimestampAssigner is used after the source there might be some native/record timestamp in the StreamRecord that could've been extracted by a previous assigner.
> >>
> >> Best,
> >>
> >> Dawid
> >>
> >> On 12/05/2020 16:47, Stephan Ewen wrote:
> >>> @Aljoscha
> >>>
> >>> About (1) could we have an interface SerializableTimestampAssigner that simply mixes in the java.io.Serializable interface? Or will this be too clumsy?
> >>>
> >>> About (3) RecordTimeStamp seems to fit both cases (in-source-record timestamp, in stream-record timestamp).
> >>>
> >>> On Tue, May 12, 2020 at 4:12 PM Aljoscha Krettek <aljos...@apache.org> wrote:
> >>>
> >>>> Definitely +1 to point 2) raised by Dawid. I'm not sure on points 1) and 3).
> >>>>
> >>>> 1) I can see the benefit of that but in reality most timestamp assigners will probably need to be Serializable. If you look at my (updated) POC branch [1] you can see how a TimestampAssigner would be specified on the WatermarkStrategies helper class: [2].
> >>>> The signature of this would have to be changed to something like:
> >>>>
> >>>> public <TA extends TimestampAssigner<T> & Serializable>
> >>>> WatermarkStrategies<T> withTimestampAssigner(TA timestampAssigner)
> >>>>
> >>>> Then, however, it would not be possible for users to specify a lambda or anonymous inner function for the TimestampAssigner like this:
> >>>>
> >>>> WatermarkStrategy<Long> testWmStrategy = WatermarkStrategies
> >>>>         .forGenerator(new PeriodicTestWatermarkGenerator())
> >>>>         .withTimestampAssigner((event, timestamp) -> event)
> >>>>         .build();
> >>>>
> >>>> 3) This makes sense if we only allow WatermarkStrategies on sources, where the previous timestamp really is the "native" timestamp. Currently, we also allow setting watermark strategies at arbitrary points in the graph. I'm thinking we probably should only allow that in sources but it's not the reality currently. I'm not against renaming it, just voicing those thoughts.
> >>>>
> >>>> Best,
> >>>> Aljoscha
> >>>>
> >>>>
> >>>> [1] https://github.com/aljoscha/flink/tree/flink-xxx-wm-generators-rebased
> >>>> [2] https://github.com/aljoscha/flink/blob/flink-xxx-wm-generators-rebased/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkStrategies.java#L81
> >>>>
> >>>> On 12.05.20 15:48, Stephan Ewen wrote:
> >>>>> +1 to all of Dawid's suggestions, makes a lot of sense to me
> >>>>>
> >>>>> On Tue, May 12, 2020 at 2:32 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> >>>>>
> >>>>>> Hi Aljoscha,
> >>>>>>
> >>>>>> Sorry for adding comments during the vote, but I have some really minor suggestions that should not influence the voting thread imo.
> >>>>>>
> >>>>>> 1) Does it make sense to have the TimestampAssigner extend from Flink's Function?
> >>>>>> This implies it has to be serializable which with the factory pattern is not strictly necessary, right? BTW I really like that you suggested the FunctionalInterface annotation there.
> >>>>>>
> >>>>>> 2) Could we rename the IdentityTimestampAssigner to e.g. RecordTimestampAssigner/SystemTimestampAssigner/NativeTimestampAssigner... Personally I found the IdentityTimestampAssigner a bit misleading as it usually means a no-op. Which did not click for me, as I assumed it somehow returns the incoming record itself.
> >>>>>>
> >>>>>> 3) Could we rename the second parameter of TimestampAssigner#extract to e.g. recordTimestamp/nativeTimestamp? This is similar to the point above. This parameter was also a bit confusing for me as I thought at times it's somehow related to TimerService#currentProcessingTimestamp()/currentWatermark() as the whole system currentTimestamp.
> >>>>>>
> >>>>>> Other than those three points I like the proposal and I was about to vote +1 if it was not for those three points.
> >>>>>>
> >>>>>> Best,
> >>>>>>
> >>>>>> Dawid
> >>>>>>
> >>>>>> On 11/05/2020 16:57, Jark Wu wrote:
> >>>>>>> Thanks for the explanation. I like the factory pattern to make the member variables immutable and final.
> >>>>>>>
> >>>>>>> So +1 to the proposal.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Jark
> >>>>>>>
> >>>>>>> On Mon, 11 May 2020 at 22:01, Stephan Ewen <se...@apache.org> wrote:
> >>>>>>>
> >>>>>>>> I am fine with that.
> >>>>>>>>
> >>>>>>>> Most of the principles seem agreed upon. I understand the need to support code-generated extractors and we should support most of it already (as Aljoscha mentioned via the factories) and can extend this if needed.
> >>>>>>>>
> >>>>>>>> I think that the factory approach supports code-generated extractors in a cleaner way even than an extractor with an open/init method.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Mon, May 11, 2020 at 3:38 PM Aljoscha Krettek <aljos...@apache.org> wrote:
> >>>>>>>>
> >>>>>>>>> We're slightly running out of time. I would propose we vote on the basic principle and remain open to later additions. This feature is quite important to make the new Kafka Source that is developed as part of FLIP-27 useful. Otherwise we would have to use the legacy interfaces in the newly added connector.
> >>>>>>>>>
> >>>>>>>>> I know that's a bit unorthodox but would everyone be OK with what's currently there and then we iterate?
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Aljoscha
> >>>>>>>>>
> >>>>>>>>> On 11.05.20 13:57, Aljoscha Krettek wrote:
> >>>>>>>>>> Ah, I meant to write this in my previous email, sorry about that.
> >>>>>>>>>>
> >>>>>>>>>> The WatermarkStrategy, which is basically a factory for a WatermarkGenerator is the replacement for the open() method. This is the same strategy that was followed for StreamOperatorFactory, which was introduced to allow code generation in the Table API [1]. If we need metrics or other things we would add that as a parameter to the factory method. What do you think?
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> Aljoscha
> >>>>>>>>>>
> >>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-11974
> >>>>>>>>>>
> >>>>>>>>>> On 10.05.20 05:07, Jark Wu wrote:
> >>>>>>>>>>> Hi,
> >>>>>>>>>>>
> >>>>>>>>>>> Regarding the `open()/close()`, I think it's necessary for Table&SQL to compile the generated code. In Table&SQL, the watermark strategy and event-timestamp are defined using SQL expressions, and we will translate and generate Java code for the expressions. If we have `open()/close()`, we don't need lazy initialization. Besides that, I can see a need to report some metrics, e.g. the current watermark, the dirty timestamps (null value), etc. So I think a simple `open()/close()` with a context which can get MetricGroup is nice and not complex for the first version.
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Jark
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Sun, 10 May 2020 at 00:50, Stephan Ewen <se...@apache.org> wrote:
> >>>>>>>>>>>> Thanks, Aljoscha, for picking this up.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I agree with the approach of doing the here proposed set of changes for now. It already makes things simpler and adds idleness support everywhere.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Rich functions and state always add complexity, let's do this in a next step, if we have a really compelling case.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Wed, Apr 29, 2020 at 7:24 PM Aljoscha Krettek <aljos...@apache.org> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Regarding the WatermarkGenerator (WG) interface itself.
> >>>>>>>>>>>>> The proposal is basically to turn emitting into a "flatMap": we give the WatermarkGenerator a "collector" (the WatermarkOutput) and the WG can decide whether to output a watermark or not and can also mark the output as idle. Changing the interface to return a Watermark (as the previous watermark assigner interface did) would not allow that flexibility.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Regarding checkpointing the watermark and keeping track of the minimum watermark, this would be the responsibility of the framework (or the KafkaConsumer in the current implementation). The user-supplied WG does not need to make sure the watermark doesn't regress.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Regarding making the WG a "rich function", I can see the potential benefit but I also see a lot of pitfalls. For example, how should the watermark state be handled in the case of scale-in? It could be made to work in the Kafka case by attaching the state to the partition state that we keep, but then we have potential backwards compatibility problems also for the WM state. Does the WG usually need to keep the state or might it be enough if the state is transient, i.e. if you have a restart the WG would lose its histogram but it would rebuild it quickly and you would get back to the same steady state as before?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Best,
> >>>>>>>>>>>>> Aljoscha
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On 27.04.20 12:12, David Anderson wrote:
> >>>>>>>>>>>>>> Overall I like this proposal; thanks for bringing it forward, Aljoscha.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I also like the idea of making the Watermark generator a rich function -- this should make it more straightforward to implement smarter watermark generators. Eg, one that uses state to keep statistics about the actual out-of-orderness, and uses those statistics to implement a variable delay.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> David
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Mon, Apr 27, 2020 at 11:44 AM Kostas Kloudas <kklou...@gmail.com> wrote:
> >>>>>>>>>>>>>>> Hi Aljoscha,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks for opening the discussion!
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I have two comments on the FLIP:
> >>>>>>>>>>>>>>> 1) we could add lifecycle methods to the Generator, i.e. open()/close(), probably with a Context as argument: I have not fully thought this through but I think that this is more aligned with the rest of our rich functions. In addition, it will allow, for example, to initialize the Watermark value, if we decide to checkpoint the watermark (see [1]) (I also do not know if Table/SQL needs to do anything in the open()).
> >>>>>>>>>>>>>>> 2) aligned with the above, and with the case where we want to checkpoint the watermark in mind, I am wondering about how we could implement this in the future. In the FLIP, it is proposed to expose the WatermarkOutput in the methods of the WatermarkGenerator. Given that there is the implicit contract that watermarks are non-decreasing, the WatermarkOutput#emitWatermark() will have (I assume) a check that will compare the last emitted WM against the provided one, and emit it only if it is >=. If not, then we risk having users shoot themselves in the foot if they accidentally forget the check. Given that the WatermarkGenerator and its caller do not know if the watermark was finally emitted or not (the WatermarkOutput#emitWatermark returns void), who will be responsible for checkpointing the WM?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Given this, why not have the methods as:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> public interface WatermarkGenerator<T> {
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>     Watermark onEvent(T event, long eventTimestamp, WatermarkOutput output);
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>     Watermark onPeriodicEmit(WatermarkOutput output);
> >>>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> and the caller will be the one enforcing any invariants, such as non-decreasing watermarks.
> >>>>>>>>>>>>>>> In this way, the caller can checkpoint anything that is needed as it will have complete knowledge as to whether the WM was emitted or not.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> What do you think?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>> Kostas
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-5601
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Tue, Apr 21, 2020 at 2:25 PM Timo Walther <twal...@apache.org> wrote:
> >>>>>>>>>>>>>>>> Thanks for the proposal Aljoscha. This is a very useful unification. We have considered this FLIP already in the interfaces for FLIP-95 [1] and look forward to updating to the new unified watermark generators once FLIP-126 has been accepted.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> [1] https://github.com/apache/flink/pull/11692
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On 20.04.20 18:10, Aljoscha Krettek wrote:
> >>>>>>>>>>>>>>>>> Hi Everyone!
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> We would like to start a discussion on "FLIP-126: Unify (and separate) Watermark Assigners" [1]. This work was started by Stephan in an experimental branch. I expanded on that work to provide a PoC for the changes proposed in this FLIP: [2].
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Currently, we have two different flavours of Watermark Assigners: AssignerWithPunctuatedWatermarks and AssignerWithPeriodicWatermarks. Both of them extend from TimestampAssigner.
> >>>>>>>>>>>>>>>>> This means that sources that want to support watermark assignment/extraction in the source need to support two separate interfaces, and we have two operator implementations for the different flavours. Also, this makes features such as generic support for idleness detection more complicated to implement because we again have to support two types of watermark assigners.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> In this FLIP we propose two things:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 1) Unify the Watermark Assigners into one Interface WatermarkGenerator
> >>>>>>>>>>>>>>>>> 2) Separate this new interface from the TimestampAssigner
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> The motivation for the first is to simplify future implementations and reduce code duplication. The motivation for the second point is again code deduplication: most assigners currently have to extend from some base timestamp extractor or duplicate the extraction logic, or users have to override an abstract method of the watermark assigner to provide the timestamp extraction logic.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Additionally, we propose to add a generic wrapping WatermarkGenerator that provides idleness detection, i.e. it can mark a stream/partition as idle if no data arrives after a configured timeout.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> The "unify and separate" part refers to the fact that we want to unify punctuated and periodic assigners but at the same time split the timestamp assigner from the watermark generator.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Please find more details in the FLIP [1]. Looking forward to your feedback.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>> Aljoscha
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-126%3A+Unify+%28and+separate%29+Watermark+Assigners
> >>>>>>>>>>>>>>>>> [2] https://github.com/aljoscha/flink/tree/stephan-event-time
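
For anyone reading this thread from the archive, here is a minimal, self-contained Java sketch of the design discussed in it: a TimestampAssigner kept separate from a collector-style WatermarkGenerator, with the framework-side output (not the user code) enforcing that watermarks do not regress. The interface names mirror those used in the thread, but they are simplified local stand-ins and not Flink's actual classes. MonotonicOutput, PerEventGenerator and demo() are hypothetical names invented for illustration, the watermark is modelled as a plain long, idleness marking is left out, and forwarding only strictly larger watermarks is an assumption made for this sketch.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

final class WatermarkSketch {

    /** Simplified stand-in for the TimestampAssigner discussed in the thread. */
    @FunctionalInterface
    interface TimestampAssigner<T> {
        long extractTimestamp(T element, long recordTimestamp);
    }

    /** Mixes in Serializable so that a lambda passed as this type is serializable. */
    @FunctionalInterface
    interface SerializableTimestampAssigner<T> extends TimestampAssigner<T>, Serializable {}

    /** Collector-style output; only watermark emission is modelled here. */
    interface WatermarkOutput {
        void emitWatermark(long watermark);
    }

    /** Generator decides if and when to emit, like a flatMap writing to a collector. */
    interface WatermarkGenerator<T> {
        void onEvent(T event, long eventTimestamp, WatermarkOutput output);

        void onPeriodicEmit(WatermarkOutput output);
    }

    /** Hypothetical framework-side output that drops watermarks that do not advance. */
    static final class MonotonicOutput implements WatermarkOutput {
        final List<Long> emitted = new ArrayList<>();
        private long last = Long.MIN_VALUE;

        @Override
        public void emitWatermark(long watermark) {
            if (watermark > last) { // assumption: only strictly larger watermarks are forwarded
                last = watermark;
                emitted.add(watermark);
            }
        }
    }

    /** Hypothetical naive generator: emits (timestamp - bound) per event, with no regression check. */
    static final class PerEventGenerator<T> implements WatermarkGenerator<T> {
        private final long bound;

        PerEventGenerator(long bound) {
            this.bound = bound;
        }

        @Override
        public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
            output.emitWatermark(eventTimestamp - bound);
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // Nothing to do: this generator only emits on events.
        }
    }

    /** Runs four events through the assigner and generator and returns the forwarded watermarks. */
    static List<Long> demo() {
        // The event itself is the timestamp, as in the lambda example from the thread.
        SerializableTimestampAssigner<Long> assigner = (event, recordTimestamp) -> event;
        WatermarkGenerator<Long> generator = new PerEventGenerator<>(2);
        MonotonicOutput output = new MonotonicOutput();

        for (long event : new long[] {10, 12, 11, 15}) {
            long timestamp = assigner.extractTimestamp(event, Long.MIN_VALUE);
            generator.onEvent(event, timestamp, output);
        }
        return output.emitted;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [8, 10, 13]
    }
}
```

The out-of-order event 11 would produce watermark 9, which the output drops because 10 was already emitted. That is the point made above about the user-supplied generator not having to guard against regressing watermarks itself.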