Hi everybody, Timo proposed our FLIP-11 a bit more than three weeks ago. I will update the status of the FLIP to accepted.
Thanks,
Fabian

2016-09-19 9:16 GMT+02:00 Timo Walther <twal...@apache.org>:

> Hi Jark,
>
> yes, I think enough time has passed. We can start implementing the changes.
> What do you think, Fabian?
>
> If there are no objections, I will create the subtasks in Jira today. For
> FLIP-11/1 I have already implemented a prototype; I just have to do some
> refactoring/documentation before opening a PR.
>
> Timo
>
>
> On 18/09/16 at 04:46, Jark Wu wrote:
>
>> Hi all,
>>
>> It seems that there are no objections to the window design. So could we
>> open subtasks to start working on it now?
>>
>> - Jark Wu
>>
>> On September 7, 2016, at 4:29 PM, Jark Wu <wuchong...@alibaba-inc.com> wrote:
>>>
>>> Hi Fabian,
>>>
>>> Thanks for sharing your ideas.
>>>
>>> They all make sense to me. Regarding reassigning timestamps, I do not
>>> have a use case. I came up with this because DataStream has a
>>> TimestampAssigner :)
>>>
>>> +1 for this FLIP.
>>>
>>> - Jark Wu
>>>
>>> On September 7, 2016, at 2:59 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> thanks for your comments and questions!
>>>> Actually, you are bringing up the points that Timo and I discussed the
>>>> most when designing the FLIP ;-)
>>>>
>>>> - We also thought about the syntactic shortcut for running aggregates
>>>> like you proposed (table.groupBy('a).select(…)). Our motivation to not
>>>> allow this shortcut is to prevent users from accidentally performing a
>>>> "dangerous" operation. The problem with unbounded sliding row-windows is
>>>> that their state never expires. If you have an evolving key space, you
>>>> will likely run into problems at some point because the operator state
>>>> grows too large. IMO, a row-window session is a better approach, because
>>>> it defines a timeout after which state can be discarded. groupBy.select
>>>> is a very common operation in batch but its semantics in streaming are
>>>> very different.
>>>> In my opinion it makes sense to make users aware of these differences
>>>> through the API.
>>>>
>>>> - Reassigning timestamps and watermarks is a very delicate issue. You
>>>> are right that Calcite exposes this field, which is necessary due to the
>>>> semantics of SQL. However, in Calcite you also cannot freely choose the
>>>> timestamp attribute for streaming queries (it must be a monotone or
>>>> quasi-monotone attribute), which is hard to reason about (and guarantee)
>>>> after a few operators have been applied. Streaming tables in Flink will
>>>> likely have a time attribute which is identical to the initial rowtime.
>>>> However, Flink does modify timestamps internally, e.g., for records that
>>>> are emitted from time windows, in order to ensure that consecutive
>>>> windows perform as expected. Modifying or reassigning timestamps in the
>>>> middle of a job can lead to unexpected results which are very hard to
>>>> reason about. Do you have a concrete use case in mind for reassigning
>>>> timestamps?
>>>>
>>>> - The idea to represent rowtime and systemtime as objects is good. Our
>>>> motivation to go for reserved Scala symbols was to have a uniform syntax
>>>> for windows over streaming and batch tables. On batch tables you can
>>>> compute time windows over basically every time attribute (they are
>>>> treated similarly to grouping attributes, with a bit of extra logic to
>>>> extract the grouping key for sliding and session windows). If you write
>>>> window(Tumble over 10.minutes on 'rowtime) on a streaming table,
>>>> 'rowtime would indicate event-time. On a batch table with a 'rowtime
>>>> attribute, the same operator would be internally converted into a group
>>>> by. By going for the object approach we would lose this compatibility
>>>> (or would need to introduce an additional column attribute to specify
>>>> the window attribute for batch tables).
>>>>
>>>> As usual some of the design decisions are based on preferences.
>>>> Do they make sense to you? Let me know what you think.
>>>>
>>>> Best, Fabian
>>>>
>>>>
>>>> 2016-09-07 5:12 GMT+02:00 Jark Wu <wuchong...@alibaba-inc.com>:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I'm on vacation for about five days, sorry to have missed this great
>>>>> FLIP.
>>>>>
>>>>> Yes, the non-windowed aggregate is a special case of the row-window,
>>>>> and the proposal looks really good. Can we have a simplified form for
>>>>> the special case? For example,
>>>>> table.groupBy('a).rowWindow(SlideRows.unboundedPreceding).select(…)
>>>>> could be simplified to table.groupBy('a).select(…). The latter would
>>>>> actually call the former.
>>>>>
>>>>> Another question is about the rowtime. As the FLIP says, DataStream and
>>>>> StreamTableSource are responsible for assigning timestamps and
>>>>> watermarks; furthermore, "rowtime" and "systemtime" are not real
>>>>> columns. IMO, this is different from Calcite's rowtime, which is a real
>>>>> column in the table. With the FLIP's approach we will lose some
>>>>> flexibility, because the timestamp column may be created after some
>>>>> transformations or a join operation rather than at the beginning. So
>>>>> why do we have to define rowtime at the beginning? (Because of
>>>>> watermarks?) Can we have a way to define rowtime after the source
>>>>> table, like TimestampAssigner?
>>>>>
>>>>> Regarding the "allowLateness" method: I came up with a trick where we
>>>>> make 'rowtime and 'systemtime Scala objects, not symbol expressions.
>>>>> The API will look like this:
>>>>>
>>>>> window(Tumble over 10.minutes on rowtime allowLateness as 'w)
>>>>>
>>>>> The implementation will look like this:
>>>>>
>>>>> class TumblingWindow(size: Expression) extends Window {
>>>>>   // returned window has an allowLateness() method
>>>>>   def on(time: rowtime.type): TumblingEventTimeWindow =
>>>>>     new TumblingEventTimeWindow(alias, 'rowtime, size)
>>>>>
>>>>>   // returned window has no allowLateness() method
>>>>>   def on(time: systemtime.type): TumblingProcessingTimeWindow =
>>>>>     new TumblingProcessingTimeWindow(alias, 'systemtime, size)
>>>>> }
>>>>> object rowtime
>>>>> object systemtime
>>>>>
>>>>> What do you think about this?
>>>>>
>>>>> - Jark Wu
>>>>>
>>>>> On September 6, 2016, at 11:00 PM, Timo Walther <twal...@apache.org> wrote:
>>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I thought about the API of the FLIP again. If we allow the
>>>>>> "systemtime" attribute, we cannot implement a nice method chaining
>>>>>> where the user can define an "allowLateness" only on event time. So
>>>>>> even if the user expressed that "systemtime" is used, we have to offer
>>>>>> an "allowLateness" method because we have to assume that this
>>>>>> attribute can also be the batch event time column, which is not very
>>>>>> nice.
>>>>>>
>>>>>> class TumblingWindow(size: Expression) extends Window {
>>>>>>   // returned window has an allowLateness() method
>>>>>>   def on(timeField: Expression): TumblingEventTimeWindow =
>>>>>>     new TumblingEventTimeWindow(alias, timeField, size)
>>>>>> }
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>> Timo
>>>>>>
>>>>>>
>>>>>> On 05/09/16 at 10:41, Fabian Hueske wrote:
>>>>>>
>>>>>>> Hi Jark,
>>>>>>>
>>>>>>> you had asked for non-windowed aggregates in the Table API a few
>>>>>>> times. FLIP-11 proposes row-window aggregates which are a
>>>>>>> generalization of running aggregates (SlideRow unboundedPreceding).
>>>>>>>
>>>>>>> Can you have a look at the FLIP and give feedback whether this is
>>>>>>> what you are looking for?
>>>>>>> Improvement suggestions are very welcome as well.
>>>>>>>
>>>>>>> Thank you,
>>>>>>> Fabian
>>>>>>>
>>>>>>> 2016-09-01 16:12 GMT+02:00 Timo Walther <twal...@apache.org>:
>>>>>>>
>>>>>>>> Hi all!
>>>>>>>>
>>>>>>>> Fabian and I worked on a FLIP for Stream Aggregations in the Table
>>>>>>>> API. You can find the FLIP-11 here:
>>>>>>>>
>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations
>>>>>>>>
>>>>>>>> Motivation for the FLIP:
>>>>>>>>
>>>>>>>> The Table API is a declarative API to define queries on static and
>>>>>>>> streaming tables. So far, only projection, selection, and union are
>>>>>>>> supported operations on streaming tables.
>>>>>>>>
>>>>>>>> This FLIP proposes to add support for different types of
>>>>>>>> aggregations on top of streaming tables. In particular, we seek to
>>>>>>>> support:
>>>>>>>>
>>>>>>>> - Group-window aggregates, i.e., aggregates which are computed for a
>>>>>>>> group of elements. A (time or row-count) window is required to bound
>>>>>>>> the infinite input stream into a finite group.
>>>>>>>>
>>>>>>>> - Row-window aggregates, i.e., aggregates which are computed for
>>>>>>>> each row, based on a window (range) of preceding and succeeding
>>>>>>>> rows.
>>>>>>>>
>>>>>>>> Each type of aggregate shall be supported on keyed/grouped or
>>>>>>>> non-keyed/grouped data streams for streaming tables as well as batch
>>>>>>>> tables.
>>>>>>>>
>>>>>>>> We are looking forward to your feedback.
>>>>>>>>
>>>>>>>> Timo
>>>>>>>>
>>>>>> --
>>>>>> Kind Regards
>>>>>>
>>>>>> Timo Walther
>>>>>>
>>>>>> Follow me: @twalthr
>>>>>> https://www.linkedin.com/in/twalthr
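
A note on the object-based proposal quoted above: the trick Jark describes relies on overloading on() by singleton type, so that the compiler picks a window class with or without allowLateness(). Below is a minimal, self-contained sketch of that idea. It is not the Flink Table API: the case classes, the Tumble.over helper, and the millisecond parameters are hypothetical stand-ins chosen only so the snippet compiles on its own.

```scala
// Hypothetical sketch only; these are NOT Flink's Table API classes.

// Marker objects standing in for the two time modes from the thread.
object rowtime
object systemtime

// Event-time window: the only variant that exposes allowLateness.
final case class TumblingEventTimeWindow(sizeMillis: Long, latenessMillis: Long = 0L) {
  def allowLateness(millis: Long): TumblingEventTimeWindow =
    copy(latenessMillis = millis)
}

// Processing-time window: deliberately has no allowLateness method.
final case class TumblingProcessingTimeWindow(sizeMillis: Long)

// Overloading `on` by singleton type lets the compiler choose the result type.
final class TumblingWindow(sizeMillis: Long) {
  def on(time: rowtime.type): TumblingEventTimeWindow =
    TumblingEventTimeWindow(sizeMillis)

  def on(time: systemtime.type): TumblingProcessingTimeWindow =
    TumblingProcessingTimeWindow(sizeMillis)
}

// Hypothetical entry point mirroring the "Tumble over ..." syntax.
object Tumble {
  def over(sizeMillis: Long): TumblingWindow = new TumblingWindow(sizeMillis)
}
```

With these definitions, Tumble.over(600000L).on(rowtime).allowLateness(60000L) type-checks, while chaining allowLateness after on(systemtime) is rejected at compile time. As Fabian points out in the thread, the cost is that on() no longer accepts an arbitrary column such as 'rowtime, so the same window expression could not be reused unchanged on a batch table.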