Hi all,

It seems that there are no objections to the window design. Could we open subtasks and start working on it now?

- Jark Wu

> On September 7, 2016, at 4:29 PM, Jark Wu <wuchong...@alibaba-inc.com> wrote:
>
> Hi Fabian,
>
> Thanks for sharing your ideas.
>
> They all make sense to me. Regarding reassigning timestamps, I do not have
> a use case. I came up with this because DataStream has a TimestampAssigner :)
>
> +1 for this FLIP.
>
> - Jark Wu
>
>> On September 7, 2016, at 2:59 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>> Hi,
>>
>> thanks for your comments and questions!
>> Actually, you are bringing up the points that Timo and I discussed the most
>> when designing the FLIP ;-)
>>
>> - We also thought about the syntactic shortcut for running aggregates like
>> you proposed (table.groupBy('a).select(…)). Our motivation to not allow
>> this shortcut is to prevent users from accidentally performing a
>> "dangerous" operation. The problem with unbounded sliding row-windows is
>> that their state never expires. If you have an evolving key space, you
>> will likely run into problems at some point because the operator state
>> grows too large. IMO, a row-window session is a better approach, because it
>> defines a timeout after which state can be discarded. groupBy.select is a
>> very common operation in batch but its semantics in streaming are very
>> different. In my opinion it makes sense to make users aware of these
>> differences through the API.
>>
>> - Reassigning timestamps and watermarks is a very delicate issue. You are
>> right that Calcite exposes this field, which is necessary due to the
>> semantics of SQL. However, even in Calcite you cannot freely choose the
>> timestamp attribute for streaming queries (it must be a monotone or
>> quasi-monotone attribute), which is hard to reason about (and guarantee)
>> after a few operators have been applied. Streaming tables in Flink will
>> likely have a time attribute which is identical to the initial rowtime.
>> However, Flink does modify timestamps internally, e.g., for records that
>> are emitted from time windows, in order to ensure that consecutive windows
>> perform as expected. Modifying or reassigning timestamps in the middle of a
>> job can lead to unexpected results which are very hard to reason about. Do
>> you have a concrete use case in mind for reassigning timestamps?
>>
>> - The idea to represent rowtime and systime as objects is good. Our
>> motivation to go for reserved Scala symbols was to have a uniform syntax
>> for windows over streaming and batch tables. On batch tables you can
>> compute time windows over basically every time attribute (they are treated
>> similarly to grouping attributes, with a bit of extra logic to extract the
>> grouping key for sliding and session windows). If you write window(Tumble
>> over 10.minutes on 'rowtime) on a streaming table, 'rowtime would indicate
>> event-time. On a batch table with a 'rowtime attribute, the same operator
>> would be internally converted into a group by. By going for the object
>> approach we would lose this compatibility (or would need to introduce an
>> additional column attribute to specify the window attribute for batch
>> tables).
>>
>> As usual some of the design decisions are based on preferences.
>> Do they make sense to you? Let me know what you think.
>>
>> Best, Fabian
>>
>>
>> 2016-09-07 5:12 GMT+02:00 Jark Wu <wuchong...@alibaba-inc.com>:
>>
>>> Hi all,
>>>
>>> I'm on vacation for about five days, sorry to have missed this great FLIP.
>>>
>>> Yes, non-windowed aggregates are a special case of row-windows. And the
>>> proposal looks really good. Can we have a simplified form for the special
>>> case? For example,
>>> table.groupBy('a).rowWindow(SlideRows.unboundedPreceding).select(…)
>>> could be simplified to table.groupBy('a).select(…). The latter would
>>> actually call the former.
>>>
>>> Another question is about the rowtime.
>>> As the FLIP said, DataStream and
>>> StreamTableSource are responsible for assigning timestamps and watermarks;
>>> furthermore, "rowtime" and "systemtime" are not real columns. IMO, this is
>>> different from Calcite's rowtime, which is a real column in the table. With
>>> the FLIP's approach, we will lose some flexibility, because the timestamp
>>> column may be created after some transformations or a join operation rather
>>> than at the beginning. So why do we have to define rowtime at the beginning?
>>> (Because of watermarks?) Can we have a way to define rowtime after the
>>> source table, like TimestampAssigner?
>>>
>>> Regarding the "allowLateness" method: I came up with a trick. We can make
>>> 'rowtime and 'systemtime Scala objects instead of symbol expressions. The
>>> API will look like this:
>>>
>>> window(Tumble over 10.minutes on rowtime allowLateness as 'w)
>>>
>>> The implementation will look like this:
>>>
>>> class TumblingWindow(size: Expression) extends Window {
>>>   def on(time: rowtime.type): TumblingEventTimeWindow =
>>>     new TumblingEventTimeWindow(alias, 'rowtime, size) // has allowLateness() method
>>>
>>>   def on(time: systemtime.type): TumblingProcessingTimeWindow =
>>>     new TumblingProcessingTimeWindow(alias, 'systemtime, size) // hasn't allowLateness() method
>>> }
>>> object rowtime
>>> object systemtime
>>>
>>> What do you think about this?
>>>
>>> - Jark Wu
>>>
>>>> On September 6, 2016, at 11:00 PM, Timo Walther <twal...@apache.org> wrote:
>>>>
>>>> Hi all,
>>>>
>>>> I thought about the API of the FLIP again. If we allow the "systemtime"
>>>> attribute, we cannot implement a nice method chaining where the user can
>>>> define an "allowLateness" only on event time. So even if the user expressed
>>>> that "systemtime" is used we have to offer an "allowLateness" method because
>>>> we have to assume that this attribute can also be the batch event time
>>>> column, which is not very nice.
>>>>
>>>> class TumblingWindow(size: Expression) extends Window {
>>>>   def on(timeField: Expression): TumblingEventTimeWindow =
>>>>     new TumblingEventTimeWindow(alias, timeField, size) // has allowLateness() method
>>>> }
>>>>
>>>> What do you think?
>>>>
>>>> Timo
>>>>
>>>>
>>>> On 05/09/16 at 10:41, Fabian Hueske wrote:
>>>>> Hi Jark,
>>>>>
>>>>> you had asked for non-windowed aggregates in the Table API a few times.
>>>>> FLIP-11 proposes row-window aggregates which are a generalization of
>>>>> running aggregates (SlideRow unboundedPreceding).
>>>>>
>>>>> Can you have a look at the FLIP and give feedback on whether this is what
>>>>> you are looking for?
>>>>> Improvement suggestions are very welcome as well.
>>>>>
>>>>> Thank you,
>>>>> Fabian
>>>>>
>>>>> 2016-09-01 16:12 GMT+02:00 Timo Walther <twal...@apache.org>:
>>>>>
>>>>>> Hi all!
>>>>>>
>>>>>> Fabian and I worked on a FLIP for Stream Aggregations in the Table API.
>>>>>> You can find the FLIP-11 here:
>>>>>>
>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations
>>>>>>
>>>>>> Motivation for the FLIP:
>>>>>>
>>>>>> The Table API is a declarative API to define queries on static and
>>>>>> streaming tables. So far, only projection, selection, and union are
>>>>>> supported operations on streaming tables.
>>>>>>
>>>>>> This FLIP proposes to add support for different types of aggregations on
>>>>>> top of streaming tables. In particular, we seek to support:
>>>>>>
>>>>>> - Group-window aggregates, i.e., aggregates which are computed for a group
>>>>>> of elements. A (time or row-count) window is required to bound the infinite
>>>>>> input stream into a finite group.
>>>>>>
>>>>>> - Row-window aggregates, i.e., aggregates which are computed for each row,
>>>>>> based on a window (range) of preceding and succeeding rows.
>>>>>> Each type of aggregate shall be supported on keyed/grouped or
>>>>>> non-keyed/grouped data streams for streaming tables as well as batch tables.
>>>>>>
>>>>>> We are looking forward to your feedback.
>>>>>>
>>>>>> Timo
>>>>>>
>>>>
>>>>
>>>> --
>>>> Freundliche Grüße / Kind Regards
>>>>
>>>> Timo Walther
>>>>
>>>> Follow me: @twalthr
>>>> https://www.linkedin.com/in/twalthr
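
For reference, the singleton-object idea discussed in this thread (overloading `on` so that only event-time windows offer allowLateness) can be tried out in isolation. The following is only a minimal sketch under stated assumptions: Tumble, EventTimeWindow, ProcessingTimeWindow and WindowSketch are hypothetical stand-ins written for this example, sizes are plain milliseconds instead of Table API expressions, and nothing here is the actual Flink implementation.

```scala
// Hypothetical stand-ins for illustration; these are NOT Flink Table API classes.
object rowtime      // marker object for event time
object systemtime   // marker object for processing time

// Event-time windows are the only ones that expose allowLateness.
final case class EventTimeWindow(sizeMillis: Long, latenessMillis: Long = 0L) {
  def allowLateness(millis: Long): EventTimeWindow = copy(latenessMillis = millis)
}

// Processing-time windows deliberately have no allowLateness method.
final case class ProcessingTimeWindow(sizeMillis: Long)

final case class Tumble(sizeMillis: Long) {
  // The overload is selected by the singleton type of the argument,
  // so the return type depends on which marker object is passed.
  def on(time: rowtime.type): EventTimeWindow = EventTimeWindow(sizeMillis)
  def on(time: systemtime.type): ProcessingTimeWindow = ProcessingTimeWindow(sizeMillis)
}

object WindowSketch {
  def main(args: Array[String]): Unit = {
    val eventWindow = Tumble(600000L).on(rowtime).allowLateness(5000L)
    val procWindow = Tumble(600000L).on(systemtime)
    println(eventWindow)
    println(procWindow)
    // The next line is rejected by the compiler, which is the point of the trick:
    // Tumble(600000L).on(systemtime).allowLateness(5000L)
  }
}
```

The sketch covers only the method-chaining aspect. It does not address the batch compatibility concern raised above, where any time attribute of a batch table may serve as the window attribute.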