Hi everybody,

Timo proposed our FLIP-11 a bit more than three weeks ago.
I will update the status of the FLIP to accepted.

Thanks,
Fabian

2016-09-19 9:16 GMT+02:00 Timo Walther <twal...@apache.org>:

> Hi Jark,
>
> yes I think enough time has passed. We can start implementing the changes.
> What do you think Fabian?
>
> If there are no objections, I will create the subtasks in Jira today. For
> FLIP-11/1 I already have implemented a prototype, I just have to do some
> refactoring/documentation before opening a PR.
>
> Timo
>
>
> Am 18/09/16 um 04:46 schrieb Jark Wu:
>
> Hi all,
>>
>> It seems that there’s no objections to the window design. So could we
>> open subtasks to start working on it now ?
>>
>> - Jark Wu
>>
>> 在 2016年9月7日,下午4:29,Jark Wu <wuchong...@alibaba-inc.com> 写道:
>>>
>>> Hi Fabian,
>>>
>>> Thanks for sharing your ideas.
>>>
>>> They all make sense to me. Regarding to reassigning timestamp, I do not
>>> have an use case. I come up with this because DataStream has a
>>> TimestampAssigner :)
>>>
>>> +1 for this FLIP.
>>>
>>> - Jark Wu
>>>
>>> 在 2016年9月7日,下午2:59,Fabian Hueske <fhue...@gmail.com <mailto:
>>>> fhue...@gmail.com>> 写道:
>>>>
>>>> Hi,
>>>>
>>>> thanks for your comments and questions!
>>>> Actually, you are bringing up the points that Timo and I discussed the
>>>> most
>>>> when designing the FLIP ;-)
>>>>
>>>> - We also thought about the syntactic shortcut for running aggregates
>>>> like
>>>> you proposed (table.groupBy(‘a).select(…)). Our motivation to not allow
>>>> this shortcut is to prevent users from accidentally performing a
>>>> "dangerous" operation. The problem with unbounded sliding row-windows is
>>>> that their state does never expire. If you have an evolving key space,
>>>> you
>>>> will likely run into problems at some point because the operator state
>>>> grows too large. IMO, a row-window session is a better approach,
>>>> because it
>>>> defines a timeout after which state can be discarded. groupBy.select is
>>>> a
>>>> very common operation in batch but its semantics in streaming are very
>>>> different. In my opinion it makes sense to make users aware of these
>>>> differences through the API.
>>>>
>>>> - Reassigning timestamps and watermarks is a very delicate issue. You
>>>> are
>>>> right, that Calcite exposes this field which is necessary due to the
>>>> semantics of SQL. However, also in Calcite you cannot freely choose the
>>>> timestamp attribute for streaming queries (it must be a monotone or
>>>> quasi-monotone attribute) which is hard to reason about (and guarantee)
>>>> after a few operators have been applied. Streaming tables in Flink will
>>>> likely have a time attribute which is identical to the initial rowtime.
>>>> However, Flink does modify timestamps internally, e.g., for records that
>>>> are emitted from time windows, in order to ensure that consecutive
>>>> windows
>>>> perform as expected. Modify or reassign timestamps in the middle of a
>>>> job
>>>> can result in unexpected results which are very hard to reason about. Do
>>>> you have a concrete use case in mind for reassigning timestamps?
>>>>
>>>> - The idea to represent rowtime and systime as object is good. Our
>>>> motivation to go for reserved Scala symbols was to have a uniform syntax
>>>> with windows over streaming and batch tables. On batch tables you can
>>>> compute time windows basically over every time attribute (they are
>>>> treated
>>>> similar to grouping attributes with a bit of extra logic to extract the
>>>> grouping key for sliding and session windows). If you write
>>>> window(Tumble
>>>> over 10.minutes on 'rowtime) on a streaming table, 'rowtime would
>>>> indicate
>>>> event-time. On a batch table with a 'rowtime attribute, the same
>>>> operator
>>>> would be internally converted into a group by. By going for the object
>>>> approach we would lose this compatibility (or would need to introduce an
>>>> additional column attribute to specifiy the window attribute for batch
>>>> tables).
>>>>
>>>> As usual some of the design decisions are based on preferences.
>>>> Do they make sense to you? Let me know what you think.
>>>>
>>>> Best, Fabian
>>>>
>>>>
>>>> 2016-09-07 5:12 GMT+02:00 Jark Wu <wuchong...@alibaba-inc.com <mailto:
>>>> wuchong...@alibaba-inc.com>>:
>>>>
>>>> Hi all,
>>>>>
>>>>> I'm on vacation for about five days , sorry to have missed this great
>>>>> FLIP.
>>>>>
>>>>> Yes, the non-windowed aggregates is a special case of row-window. And
>>>>> the
>>>>> proposal looks really good.  Can we have a simplified form for the
>>>>> special
>>>>> case? Such as : table.groupBy(‘a).rowWindow(Sl
>>>>> ideRows.unboundedPreceding).select(…)
>>>>> can be simplified to  table.groupBy(‘a).select(…). The latter will
>>>>> actually
>>>>> call the former.
>>>>>
>>>>> Another question is about the rowtime. As the FLIP said, DataStream and
>>>>> StreamTableSource is responsible to assign timestamps and watermarks,
>>>>> furthermore “rowtime” and “systemtime” are not real column. IMO, it is
>>>>> different with Calcite’s rowtime, which is a real column in the table.
>>>>> In
>>>>> FLIP's way, we will lose some flexibility. Because the timestamp
>>>>> column may
>>>>> be created after some transformations or join operation, not created at
>>>>> beginning. So why do we have to define rowtime at beginning? (because
>>>>> of
>>>>> watermark?)     Can we have a way to define rowtime after source table
>>>>> like
>>>>> TimestampAssinger?
>>>>>
>>>>> Regarding to “allowLateness” method. I come up a trick that we can make
>>>>> ‘rowtime and ‘system to be a Scala object, not a symbol expression.
>>>>> The API
>>>>> will looks like this :
>>>>>
>>>>> window(Tumble over 10.minutes on rowtime allowLateness as ‘w)
>>>>>
>>>>> The implementation will look like this:
>>>>>
>>>>> class TumblingWindow(size: Expression) extends Window {
>>>>>   def on(time: rowtime.type): TumblingEventTimeWindow =
>>>>>       new TumblingEventTimeWindow(alias, ‘rowtime, size)        // has
>>>>> allowLateness() method
>>>>>
>>>>>   def on(time: systemtime.type): TumblingProcessingTimeWindow=
>>>>>      new TumblingProcessingTimeWindow(alias, ‘systemtime, size)
>>>>> // hasn’t allowLateness() method
>>>>> }
>>>>> object rowtime
>>>>> object systemtime
>>>>>
>>>>> What do you think about this?
>>>>>
>>>>> - Jark Wu
>>>>>
>>>>> 在 2016年9月6日,下午11:00,Timo Walther <twal...@apache.org <mailto:
>>>>>> twal...@apache.org>> 写道:
>>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I thought about the API of the FLIP again. If we allow the
>>>>>> "systemtime"
>>>>>>
>>>>> attribute, we cannot implement a nice method chaining where the user
>>>>> can
>>>>> define a "allowLateness" only on event time. So even if the user
>>>>> expressed
>>>>> that "systemtime" is used we have to offer a "allowLateness" method
>>>>> because
>>>>> we have to assume that this attribute can also be the batch event time
>>>>> column, which is not very nice.
>>>>>
>>>>>> class TumblingWindow(size: Expression) extends Window {
>>>>>> def on(timeField: Expression): TumblingEventTimeWindow =
>>>>>>    new TumblingEventTimeWindow(alias, timeField, size) // has
>>>>>>
>>>>> allowLateness() method
>>>>>
>>>>>> }
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>> Timo
>>>>>>
>>>>>>
>>>>>> Am 05/09/16 um 10:41 schrieb Fabian Hueske:
>>>>>>
>>>>>>> Hi Jark,
>>>>>>>
>>>>>>> you had asked for non-windowed aggregates in the Table API a few
>>>>>>> times.
>>>>>>> FLIP-11 proposes row-window aggregates which are a generalization of
>>>>>>> running aggregates (SlideRow unboundedPreceding).
>>>>>>>
>>>>>>> Can you have a look at the FLIP and give feedback whether this is
>>>>>>> what
>>>>>>>
>>>>>> you
>>>>>
>>>>>> are looking for?
>>>>>>> Improvement suggestions are very welcome as well.
>>>>>>>
>>>>>>> Thank you,
>>>>>>> Fabian
>>>>>>>
>>>>>>> 2016-09-01 16:12 GMT+02:00 Timo Walther <twal...@apache.org <mailto:
>>>>>>> twal...@apache.org>>:
>>>>>>>
>>>>>>> Hi all!
>>>>>>>>
>>>>>>>> Fabian and I worked on a FLIP for Stream Aggregations in the Table
>>>>>>>> API.
>>>>>>>> You can find the FLIP-11 here:
>>>>>>>>
>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-11% <
>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%>
>>>>>>>> 3A+Table+API+Stream+Aggregations
>>>>>>>>
>>>>>>>> Motivation for the FLIP:
>>>>>>>>
>>>>>>>> The Table API is a declarative API to define queries on static and
>>>>>>>> streaming tables. So far, only projection, selection, and union are
>>>>>>>> supported operations on streaming tables.
>>>>>>>>
>>>>>>>> This FLIP proposes to add support for different types of
>>>>>>>> aggregations
>>>>>>>>
>>>>>>> on
>>>>>
>>>>>> top of streaming tables. In particular, we seek to support:
>>>>>>>>
>>>>>>>> - Group-window aggregates, i.e., aggregates which are computed for a
>>>>>>>>
>>>>>>> group
>>>>>
>>>>>> of elements. A (time or row-count) window is required to bound the
>>>>>>>>
>>>>>>> infinite
>>>>>
>>>>>> input stream into a finite group.
>>>>>>>>
>>>>>>>> - Row-window aggregates, i.e., aggregates which are computed for
>>>>>>>> each
>>>>>>>>
>>>>>>> row,
>>>>>
>>>>>> based on a window (range) of preceding and succeeding rows.
>>>>>>>> Each type of aggregate shall be supported on keyed/grouped or
>>>>>>>> non-keyed/grouped data streams for streaming tables as well as batch
>>>>>>>>
>>>>>>> tables.
>>>>>
>>>>>> We are looking forward to your feedback.
>>>>>>>>
>>>>>>>> Timo
>>>>>>>>
>>>>>>>>
>>>>>> --
>>>>>> Freundliche Grüße / Kind Regards
>>>>>>
>>>>>> Timo Walther
>>>>>>
>>>>>> Follow me: @twalthr
>>>>>> https://www.linkedin.com/in/twalthr <https://www.linkedin.com/in/t
>>>>>> walthr>
>>>>>>
>>>>>
>>>>>
>>
>
> --
> Freundliche Grüße / Kind Regards
>
> Timo Walther
>
> Follow me: @twalthr
> https://www.linkedin.com/in/twalthr
>
>

Reply via email to