Hi all,

It seems that there are no objections to the window design. Could we open
subtasks and start working on it now?

- Jark Wu 

> On September 7, 2016, at 4:29 PM, Jark Wu <wuchong...@alibaba-inc.com> wrote:
> Hi Fabian, 
> Thanks for sharing your ideas. 
> They all make sense to me. Regarding reassigning timestamps, I do not have
> a use case. I came up with this because DataStream has a TimestampAssigner :)
> +1 for this FLIP. 
> - Jark Wu 
>> On September 7, 2016, at 2:59 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>> Hi,
>> thanks for your comments and questions!
>> Actually, you are bringing up the points that Timo and I discussed the most
>> when designing the FLIP ;-)
>> - We also thought about the syntactic shortcut for running aggregates like
>> you proposed (table.groupBy('a).select(…)). Our motivation for not allowing
>> this shortcut is to prevent users from accidentally performing a
>> "dangerous" operation. The problem with unbounded sliding row-windows is
>> that their state never expires. If you have an evolving key space, you
>> will likely run into problems at some point because the operator state
>> grows too large. IMO, a row-window session is a better approach, because it
>> defines a timeout after which state can be discarded. groupBy.select is a
>> very common operation in batch but its semantics in streaming are very
>> different. In my opinion it makes sense to make users aware of these
>> differences through the API.
>> - Reassigning timestamps and watermarks is a very delicate issue. You are
>> right that Calcite exposes this field, which is necessary due to the
>> semantics of SQL. However, even in Calcite you cannot freely choose the
>> timestamp attribute for streaming queries (it must be a monotone or
>> quasi-monotone attribute) which is hard to reason about (and guarantee)
>> after a few operators have been applied. Streaming tables in Flink will
>> likely have a time attribute which is identical to the initial rowtime.
>> However, Flink does modify timestamps internally, e.g., for records that
>> are emitted from time windows, in order to ensure that consecutive windows
>> perform as expected. Modifying or reassigning timestamps in the middle of a job
>> can result in unexpected results which are very hard to reason about. Do
>> you have a concrete use case in mind for reassigning timestamps?
>> - The idea to represent rowtime and systime as objects is good. Our
>> motivation to go for reserved Scala symbols was to have a uniform syntax
>> for windows over streaming and batch tables. On batch tables you can
>> compute time windows over basically every time attribute (they are treated
>> similarly to grouping attributes, with a bit of extra logic to extract the
>> grouping key for sliding and session windows). If you write window(Tumble
>> over 10.minutes on 'rowtime) on a streaming table, 'rowtime would indicate
>> event-time. On a batch table with a 'rowtime attribute, the same operator
>> would be internally converted into a group by. By going for the object
>> approach we would lose this compatibility (or would need to introduce an
>> additional column attribute to specify the window attribute for batch
>> tables).
>> As usual some of the design decisions are based on preferences.
>> Do they make sense to you? Let me know what you think.
>> Best, Fabian
>> 2016-09-07 5:12 GMT+02:00 Jark Wu <wuchong...@alibaba-inc.com>:
>>> Hi all,
>>> I have been on vacation for about five days, sorry to have missed this great FLIP.
>>> Yes, non-windowed aggregates are a special case of row-windows, and the
>>> proposal looks really good. Can we have a simplified form for the special
>>> case? For example,
>>> table.groupBy('a).rowWindow(SlideRows.unboundedPreceding).select(…)
>>> could be simplified to table.groupBy('a).select(…). The latter would actually
>>> call the former.
>>> Another question is about rowtime. As the FLIP says, DataStream and
>>> StreamTableSource are responsible for assigning timestamps and watermarks;
>>> furthermore, “rowtime” and “systemtime” are not real columns. IMO, this is
>>> different from Calcite’s rowtime, which is a real column in the table. With
>>> the FLIP's approach, we will lose some flexibility, because the timestamp
>>> column may be created after some transformations or a join operation, not at
>>> the beginning. So why do we have to define rowtime at the beginning? (Because
>>> of watermarks?) Can we have a way to define rowtime after the source table,
>>> like TimestampAssigner?
>>> Regarding the “allowLateness” method: I came up with a trick where we make
>>> 'rowtime and 'systemtime Scala objects instead of symbol expressions. The API
>>> would look like this:
>>> window(Tumble over 10.minutes on rowtime allowLateness as 'w)
>>> The implementation would look like this:
>>> class TumblingWindow(size: Expression) extends Window {
>>>   // TumblingEventTimeWindow has an allowLateness() method
>>>   def on(time: rowtime.type): TumblingEventTimeWindow =
>>>     new TumblingEventTimeWindow(alias, 'rowtime, size)
>>>   // TumblingProcessingTimeWindow has no allowLateness() method
>>>   def on(time: systemtime.type): TumblingProcessingTimeWindow =
>>>     new TumblingProcessingTimeWindow(alias, 'systemtime, size)
>>> }
>>> object rowtime
>>> object systemtime
>>> What do you think about this?
>>> - Jark Wu
>>>> On September 6, 2016, at 11:00 PM, Timo Walther <twal...@apache.org> wrote:
>>>> Hi all,
>>>> I thought about the API of the FLIP again. If we allow the "systemtime"
>>>> attribute, we cannot implement nice method chaining where the user can
>>>> define "allowLateness" only on event time. So even if the user expressed
>>>> that "systemtime" is used, we have to offer an "allowLateness" method because
>>>> we have to assume that this attribute can also be the batch event time
>>>> column, which is not very nice.
>>>> class TumblingWindow(size: Expression) extends Window {
>>>>   // TumblingEventTimeWindow has an allowLateness() method
>>>>   def on(timeField: Expression): TumblingEventTimeWindow =
>>>>     new TumblingEventTimeWindow(alias, timeField, size)
>>>> }
>>>> What do you think?
>>>> Timo
>>>> On 05/09/16 at 10:41, Fabian Hueske wrote:
>>>>> Hi Jark,
>>>>> you had asked for non-windowed aggregates in the Table API a few times.
>>>>> FLIP-11 proposes row-window aggregates which are a generalization of
>>>>> running aggregates (SlideRow unboundedPreceding).
>>>>> Can you have a look at the FLIP and give feedback on whether this is what
>>>>> you are looking for?
>>>>> Improvement suggestions are very welcome as well.
>>>>> Thank you,
>>>>> Fabian
>>>>> 2016-09-01 16:12 GMT+02:00 Timo Walther <twal...@apache.org>:
>>>>>> Hi all!
>>>>>> Fabian and I worked on a FLIP for Stream Aggregations in the Table API.
>>>>>> You can find the FLIP-11 here:
>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations
>>>>>> Motivation for the FLIP:
>>>>>> The Table API is a declarative API to define queries on static and
>>>>>> streaming tables. So far, only projection, selection, and union are
>>>>>> supported operations on streaming tables.
>>>>>> This FLIP proposes to add support for different types of aggregations on
>>>>>> top of streaming tables. In particular, we seek to support:
>>>>>> - Group-window aggregates, i.e., aggregates which are computed for a group
>>>>>> of elements. A (time or row-count) window is required to bound the infinite
>>>>>> input stream into a finite group.
>>>>>> - Row-window aggregates, i.e., aggregates which are computed for each row,
>>>>>> based on a window (range) of preceding and succeeding rows.
>>>>>> Each type of aggregate shall be supported on keyed/grouped or
>>>>>> non-keyed/grouped data streams for streaming tables as well as batch tables.
>>>>>> We are looking forward to your feedback.
>>>>>> Timo
>>>> --
>>>> Freundliche Grüße / Kind Regards
>>>> Timo Walther
>>>> Follow me: @twalthr
>>>> https://www.linkedin.com/in/twalthr
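
For readers following the "allowLateness" discussion in this thread, here is a minimal, self-contained sketch of the singleton-object overloading trick. It is not Flink code and not the FLIP-11 implementation: every class and method below is a hypothetical stand-in, window sizes are plain milliseconds instead of Table API expressions, and the alias argument from the quoted snippets is left out. It only illustrates how overloading `on` on `rowtime.type` and `systemtime.type` lets the compiler offer `allowLateness` for event time only.

```scala
// Hypothetical stand-ins for illustration; none of these are Flink classes.
object rowtime
object systemtime

sealed trait Window { def describe: String }

// Event-time window: lateness can be configured.
final case class TumblingEventTimeWindow(sizeMillis: Long, latenessMillis: Long = 0L)
    extends Window {
  def allowLateness(millis: Long): TumblingEventTimeWindow =
    copy(latenessMillis = millis)
  def describe: String =
    s"event-time tumble($sizeMillis ms, lateness=$latenessMillis ms)"
}

// Processing-time window: deliberately has no allowLateness method.
final case class TumblingProcessingTimeWindow(sizeMillis: Long) extends Window {
  def describe: String = s"processing-time tumble($sizeMillis ms)"
}

final class Tumble(sizeMillis: Long) {
  // The overload is chosen by the singleton type of the argument.
  def on(time: rowtime.type): TumblingEventTimeWindow =
    TumblingEventTimeWindow(sizeMillis)
  def on(time: systemtime.type): TumblingProcessingTimeWindow =
    TumblingProcessingTimeWindow(sizeMillis)
}

object Tumble {
  def over(sizeMillis: Long): Tumble = new Tumble(sizeMillis)
}

object WindowSketch {
  def main(args: Array[String]): Unit = {
    val eventWindow = Tumble.over(600000L).on(rowtime).allowLateness(5000L)
    val procWindow  = Tumble.over(600000L).on(systemtime)
    println(eventWindow.describe)
    println(procWindow.describe)
    // procWindow.allowLateness(5000L) // would not compile: no such method
  }
}
```

The trade-off raised in the thread still applies to this sketch: because `on` no longer accepts an arbitrary column expression, the same call cannot name a regular time attribute of a batch table.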
