Thanks for your quick reply, Fabian.

I have a few minor comments&suggestions:

<GroupBy with window>
- Agree that we should consider GroupBy without window after the new SQL
proposal is settled down.

<GroupBy with window>
- For Java API, we can keep window() call, and put window alias into
Groupby clause. This can be also applied to rowwindow case.

<RowWindows> & <partitionby() for rowwindow>
-+1 to support replace groupby() by partitionby(). BTW, in the case of
over, instead of partitionby, are we going to support orderby? If yes, I
would suggest to define rowwindow as  rowwindow(PartionByParaType, OrderBy
ParaType, WindowParaType).

So
- moving windows into the groupBy() call :   +1
- making over() for rowWindow() with a single window definition.
- additionally allowing window definitions in over():  +1 yes for scala,
but use alias for java API.
- using partitionBy() instead of groupBy() for row windows?: +1, but better
to consider orderby, it may be even better to move partitionBy() into
rowwindow.

Regards,
Shaoxuan


On Thu, Oct 13, 2016 at 6:05 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi everybody,
>
> happy to see a good discussion here :-)
> I'll reply to Shaoxuan's mail first and comment on Zhangrucong question
> in a separate mail.
>
> Shaoxuan, thanks for the suggestions! I think we all agree that for SQL we
> should definitely follow the standard (batch) SQL syntax.
> In my opinion, the Table API does not necessarily have to be as close as
> possible to SQL but should try to make a few things easier and also safer
> (easier is of course subjective).
>
> - GroupBy without windows: These are currently intentionally not supported
> and also not part of FLIP-11. Our motivation for not supporting this, is to
> guard the user from defining a query that fails when being executed due to
> a very memory consuming operation. FLIP-11 provides a way to define such a
> query as a sliding row window with unbounded preceding rows. With the
> upcoming SQL proposal, queries that consume unbounded memory should be
> identified and rejected. I would be in favor of allowing groupBy without
> windows once the guarding mechanism are in place.
>
> - GroupBy with window: I think this is a question of taste. Having a
> window() call, makes the feature more explicit in my opinion. However, I'm
> not opposed to move the windows into the groupBy clause.
> Implementation-wise it should be easy to move the window definition into to
> groupBy clause for the Scala Table API. For the Java Table API we would
> need to extend the parser quite a bit because windows would need to be
> defined as Strings and not via objects.
>
> - RowWindows: The rowWindow() call mimics the standard SQL WINDOW clause
> (implemented by PostgreSQL and Calcite) which allows to have "reusable"
> window definitions. I think this is a desirable feature. In the FLIP-11
> proposal the over() clause in select() refers to the predefined windows
> with aliases. In case only one window is defined, the over() clause is
> optional and the same (and only) window is applied to all aggregates. I
> think we can make the over() call mandatory to have the windowing more
> explicit. It should also be possible to extend the over clause to directly
> accept RowWindows instead of window aliases. I would not make this a
> priority at the moment, but a feature that could be later added, because
> rowWindow() and over() cover all cases. Similar as for GroupBy with
> windows, we would need to extend the parser for the Java Table API though.
>
> Finally, I have an own suggestion:
> In FLIP-11, groupBy() is  used to define the partitioning of RowWindows. I
> think this should be changed to partitionBy() because groupBy() groups data
> and applies an aggregation to all rows of a group which is not happening
> here. In original SQL, the OVER clause features a PARTITION BY clause. We
> are moving this out of the window definition, i.e., OVER clause, to enforce
> the same partitioning for all windows (different partitionings would be a
> challenge to execute in a parallel system).
>
> @Timo and all: What do you think about:
>
> - moving windows into the groupBy() call
> - making over() for rowWindow() with a single window definition
> - additionally allowing window definitions in over()
> - using partitionBy() instead of groupBy() for row windows?
>
> Best, Fabian
>
> 2016-10-13 11:10 GMT+02:00 Zhangrucong <zhangruc...@huawei.com>:
>
>> Hi shaoxuan:
>>
>> I think,  the streamsql must be excuted in table environment. So I call
>> this table API ‘s StreamSQL. What do you call for this, stream Table API or
>> streamsql? It is fu
>>
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> val tblEnv = TableEnvironment.getTableEnvironment(env)
>> val ds: DataStream[(String,Long, Long)] = env.readTextFile("/home/demo")
>> tblEnv.registerDataStream("Order", ds, 'userID, 'count, 'num)
>>     .map(f=>(f, 1L, 1L))
>> val sql = tblEnv.sql("SELECT Stream * FROM Order WHERE userID='A'")
>>
>> So in my opinion, the grammar which is marked red should be compatible
>> with calcite's StreamSQL grammar.
>>
>> By the way,  thanks very much for telling me the modified content in
>> Flink StreamSQL. I will look the new proposal .
>>
>> Thanks!
>> 发件人: Sean Wang [mailto:wshaox...@gmail.com]
>> 发送时间: 2016年10月13日 16:29
>> 收件人: dev@flink.apache.org; Zhangrucong
>> 主题: Re: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations
>>
>> Hi  zhangrucong,
>> I am not sure what you mean by "table API'S StreamSQL", I guess you mean
>> "stream TableAPI"?
>> TableAPI should be compatible with calcite SQL. (By compatible, My
>> understanding is that both TableAPI and SQL will be translated to the same
>> logical plan - the same set of REL and REX).
>> BTW, please note that we recently have merged a change to remove STREAM
>> keyword for flink stream SQL(FLINK-4546). In our opinion, batch and stream
>> are not necessarily to be differentiated at the SQL level. The major
>> difference between batch and stream is "WHEN and HOW to emit the result".
>> We have been working on a new proposal with Fabian on this change. I
>> guess it will be sent out for review very soon.
>>
>> Regards,
>> Shaoxuan
>>
>>
>> On Thu, Oct 13, 2016 at 2:29 PM, Zhangrucong <zhangruc...@huawei.com
>> <mailto:zhangruc...@huawei.com>> wrote:
>> Hi shaoxuan:
>> Does the table API'S StreamSQL grammar is compatible with calcite's
>> StreamSQL grammar?
>>
>>
>> 1、In calcite, the tumble window is realized by using function tumble or
>> hop. And the function must be used with group by, like this:
>>
>> SELECT
>>   TUMBLE_END(rowtime, INTERVAL '30' MINUTE, TIME '0:12') AS rowtime,
>>   productId,
>>   COUNT(*) AS c,
>>   SUM(units) AS units
>> FROM Orders
>> GROUP BY TUMBLE(rowtime, INTERVAL '30' MINUTE, TIME '0:12'),
>>   productId;
>>
>> 2、 The sliding window uses keywords "window" and "over". Like this:
>>
>> SELECT  *
>> FROM (
>>   SELECT STREAM rowtime,
>>     productId,
>>     units,
>>     AVG(units) OVER product (RANGE INTERVAL '10' MINUTE PRECEDING) AS m10,
>>     AVG(units) OVER product (RANGE INTERVAL '7' DAY PRECEDING) AS d7
>>   FROM Orders
>>   WINDOW product AS (
>>     ORDER BY rowtime
>>     PARTITION BY productId))
>>
>>
>>
>> Thanks!
>>
>> -----邮件原件-----
>> 发件人: 王绍翾(大沙) [mailto:shaoxuan....@alibaba-inc.com<mailto:shaoxuan.wsx@ali
>> baba-inc.com>]
>> 发送时间: 2016年10月13日 2:03
>> 收件人: dev@flink.apache.org<mailto:dev@flink.apache.org>
>> 主题: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations
>>
>> Hi Fabian, Timo, and Jark.Thanks for kicking off this FLIP. This is a
>> really great and promising proposal. I have a few comments to the "window"
>> operator proposed in this FLIP (I am hoping it is not too late to bring up
>> this). First, window is not always needed for the stream aggregation. There
>> are cases where we want do an aggreation on a stream, while the query/emit
>> strategy decides when to emit a streaming output. Second, window is needed
>> when we want do an aggregation for a certain rage, but window is not an
>> operator. We basically use window to define the range for aggregation. In
>> tableAPI, a window should be defined together with "groupby" and "select"
>> operators, either inside a "groupby" operator or after an "over" clause in
>> "select" operator. This will make the TableAPI in the similar manner as SQL.
>> For instance,[A groupby without window]
>> <Table API>
>> val res = tab
>> .groupBy(‘a)
>> .select(‘a, ‘b.sum)
>> <SQL>
>> SELECT a, SUM(b)
>> FROM tab
>> GROUP BY a
>> [A tumble window inside groupby]
>> <Table API>val res = tab
>> .groupBy(‘a, tumble(10.minutes, ‘rowtime)) .select(‘a, ‘b.sum)
>> <SQL>SELECT a, SUM(b)FROM tab GROUP BY a, TUMBLE(10.minutes , ‘rowtime) [A
>> row tumble window after OVER] <Table API>.groupby('a) //optional
>> .select(‘a, ‘b.count over rowTumble(10.minutes, ‘rowtime))<SQL>SELECT a,
>> COUNT(b) OVER ROWTUMBLE(10.minutes, ‘rowtime)FROM tab GROUP BY a Please let
>> me know what you think.
>> Regards,Shaoxuan
>> ------------------------------------------------------------------发件人:Fabian
>> Hueske <fhue...@gmail.com<mailto:fhue...@gmail.com>>发送时间:2016年9月26日(星期一)
>> 21:13收件人:dev@flink.apache.org<mailto:dev@flink.apache.org> <
>> dev@flink.apache.org<mailto:dev@flink.apache.org>>主 题:Re: [DISCUSS]
>> FLIP-11: Table API Stream Aggregations Hi everybody,
>>
>> Timo proposed our FLIP-11 a bit more than three weeks ago.
>> I will update the status of the FLIP to accepted.
>>
>> Thanks,
>> Fabian
>>
>> 2016-09-19 9:16 GMT+02:00 Timo Walther <twal...@apache.org<mailto:twa
>> l...@apache.org>>:
>>
>> > Hi Jark,
>> >
>> > yes I think enough time has passed. We can start implementing the
>> changes.
>> > What do you think Fabian?
>> >
>> > If there are no objections, I will create the subtasks in Jira today.
>> >For
>> > FLIP-11/1 I already have implemented a prototype, I just have to do
>> >some
>> > refactoring/documentation before opening a PR.
>> >
>> > Timo
>> >
>> >
>> > Am 18/09/16 um 04:46 schrieb Jark Wu:
>> >
>> > Hi all,
>> >>
>> >> It seems that there’s no objections to the window design. So could we
>> >> open subtasks to start working on it now ?
>> >>
>> >> - Jark Wu
>> >>
>> >> 在 2016年9月7日,下午4:29,Jark Wu <wuchong...@alibaba-inc.com<mailto:
>> wuchong...@alibaba-inc.com>> 写道:
>> >>>
>> >>> Hi Fabian,
>> >>>
>> >>> Thanks for sharing your ideas.
>> >>>
>> >>> They all make sense to me. Regarding to reassigning timestamp, I do
>> >>>not
>> >>> have an use case. I come up with this because DataStream has a
>> >>> TimestampAssigner :)
>> >>>
>> >>> +1 for this FLIP.
>> >>>
>> >>> - Jark Wu
>> >>>
>> >>> 在 2016年9月7日,下午2:59,Fabian Hueske <fhue...@gmail.com<mailto:fhue
>> s...@gmail.com> <mailto:
>>
>> >>>> fhue...@gmail.com<mailto:fhue...@gmail.com>>> 写道:
>> >>>>
>> >>>> Hi,
>> >>>>
>> >>>> thanks for your comments and questions!
>> >>>> Actually, you are bringing up the points that Timo and I discussed
>> >>>>the
>> >>>> most
>> >>>> when designing the FLIP ;-)
>> >>>>
>> >>>> - We also thought about the syntactic shortcut for running
>> >>>>aggregates
>> >>>> like
>> >>>> you proposed (table.groupBy(‘a).select(…)). Our motivation to not
>> >>>>allow
>> >>>> this shortcut is to prevent users from accidentally performing a
>> >>>> "dangerous" operation. The problem with unbounded sliding
>> >>>>row-windows is
>> >>>> that their state does never expire. If you have an evolving key
>> >>>>space,
>> >>>> you
>> >>>> will likely run into problems at some point because the operator
>> >>>>state
>> >>>> grows too large. IMO, a row-window session is a better approach,
>> >>>> because it
>> >>>> defines a timeout after which state can be discarded.
>> >>>>groupBy.select is
>> >>>> a
>> >>>> very common operation in batch but its semantics in streaming are
>> >>>>very
>> >>>> different. In my opinion it makes sense to make users aware of
>> >>>>these
>> >>>> differences through the API.
>> >>>>
>> >>>> - Reassigning timestamps and watermarks is a very delicate issue.
>> >>>>You
>> >>>> are
>> >>>> right, that Calcite exposes this field which is necessary due to
>> >>>>the
>> >>>> semantics of SQL. However, also in Calcite you cannot freely choose
>> >>>>the
>> >>>> timestamp attribute for streaming queries (it must be a monotone or
>> >>>> quasi-monotone attribute) which is hard to reason about (and
>> >>>>guarantee)
>> >>>> after a few operators have been applied. Streaming tables in Flink
>> >>>>will
>> >>>> likely have a time attribute which is identical to the initial
>> rowtime.
>> >>>> However, Flink does modify timestamps internally, e.g., for records
>> >>>>that
>> >>>> are emitted from time windows, in order to ensure that consecutive
>> >>>> windows
>> >>>> perform as expected. Modify or reassign timestamps in the middle of
>> >>>>a
>> >>>> job
>> >>>> can result in unexpected results which are very hard to reason
>> >>>>about. Do
>> >>>> you have a concrete use case in mind for reassigning timestamps?
>> >>>>
>> >>>> - The idea to represent rowtime and systime as object is good. Our
>> >>>> motivation to go for reserved Scala symbols was to have a uniform
>> >>>>syntax
>> >>>> with windows over streaming and batch tables. On batch tables you
>> >>>>can
>> >>>> compute time windows basically over every time attribute (they are
>> >>>> treated
>> >>>> similar to grouping attributes with a bit of extra logic to extract
>> >>>>the
>> >>>> grouping key for sliding and session windows). If you write
>> >>>> window(Tumble
>> >>>> over 10.minutes on 'rowtime) on a streaming table, 'rowtime would
>> >>>> indicate
>> >>>> event-time. On a batch table with a 'rowtime attribute, the same
>> >>>> operator
>> >>>> would be internally converted into a group by. By going for the
>> >>>>object
>> >>>> approach we would lose this compatibility (or would need to
>> >>>>introduce an
>> >>>> additional column attribute to specifiy the window attribute for
>> >>>>batch
>> >>>> tables).
>> >>>>
>> >>>> As usual some of the design decisions are based on preferences.
>> >>>> Do they make sense to you? Let me know what you think.
>> >>>>
>> >>>> Best, Fabian
>> >>>>
>> >>>>
>> >>>> 2016-09-07 5:12 GMT+02:00 Jark Wu <wuchong...@alibaba-inc.com<ma
>> ilto:wuchong...@alibaba-inc.com> <mailto:
>>
>> >>>> wuchong...@alibaba-inc.com<mailto:wuchong...@alibaba-inc.com>>>:
>> >>>>
>> >>>> Hi all,
>> >>>>>
>> >>>>> I'm on vacation for about five days , sorry to have missed this
>> >>>>>great
>> >>>>> FLIP.
>> >>>>>
>> >>>>> Yes, the non-windowed aggregates is a special case of row-window.
>> >>>>>And
>> >>>>> the
>> >>>>> proposal looks really good.  Can we have a simplified form for the
>> >>>>> special
>> >>>>> case? Such as : table.groupBy(‘a).rowWindow(Sl
>> >>>>> ideRows.unboundedPreceding).select(…)
>> >>>>> can be simplified to  table.groupBy(‘a).select(…). The latter will
>> >>>>> actually
>> >>>>> call the former.
>> >>>>>
>> >>>>> Another question is about the rowtime. As the FLIP said,
>> >>>>>DataStream and
>> >>>>> StreamTableSource is responsible to assign timestamps and
>> >>>>>watermarks,
>> >>>>> furthermore “rowtime” and “systemtime” are not real column. IMO,
>> >>>>>it is
>> >>>>> different with Calcite’s rowtime, which is a real column in the
>> table.
>> >>>>> In
>> >>>>> FLIP's way, we will lose some flexibility. Because the timestamp
>> >>>>> column may
>> >>>>> be created after some transformations or join operation, not
>> >>>>>created at
>> >>>>> beginning. So why do we have to define rowtime at beginning?
>> >>>>>(because
>> >>>>> of
>> >>>>> watermark?)     Can we have a way to define rowtime after source
>> >>>>>table
>> >>>>> like
>> >>>>> TimestampAssinger?
>> >>>>>
>> >>>>> Regarding to “allowLateness” method. I come up a trick that we can
>> >>>>>make
>> >>>>> ‘rowtime and ‘system to be a Scala object, not a symbol expression.
>> >>>>> The API
>> >>>>> will looks like this :
>> >>>>>
>> >>>>> window(Tumble over 10.minutes on rowtime allowLateness as ‘w)
>> >>>>>
>> >>>>> The implementation will look like this:
>> >>>>>
>> >>>>> class TumblingWindow(size: Expression) extends Window {
>> >>>>>   def on(time: rowtime.type): TumblingEventTimeWindow =
>> >>>>>       new TumblingEventTimeWindow(alias, ‘rowtime, size)        //
>> >>>>>has
>> >>>>> allowLateness() method
>> >>>>>
>> >>>>>   def on(time: systemtime.type): TumblingProcessingTimeWindow=
>> >>>>>      new TumblingProcessingTimeWindow(alias, ‘systemtime, size)
>> >>>>> // hasn’t allowLateness() method
>> >>>>> }
>> >>>>> object rowtime
>> >>>>> object systemtime
>> >>>>>
>> >>>>> What do you think about this?
>> >>>>>
>> >>>>> - Jark Wu
>> >>>>>
>> >>>>> 在 2016年9月6日,下午11:00,Timo Walther <twal...@apache.org<mailto:twa
>> l...@apache.org> <mailto:
>>
>> >>>>>> twal...@apache.org<mailto:twal...@apache.org>>> 写道:
>> >>>>>>
>> >>>>>> Hi all,
>> >>>>>>
>> >>>>>> I thought about the API of the FLIP again. If we allow the
>> >>>>>> "systemtime"
>> >>>>>>
>> >>>>> attribute, we cannot implement a nice method chaining where the
>> >>>>>user
>> >>>>> can
>> >>>>> define a "allowLateness" only on event time. So even if the user
>> >>>>> expressed
>> >>>>> that "systemtime" is used we have to offer a "allowLateness"
>> >>>>>method
>> >>>>> because
>> >>>>> we have to assume that this attribute can also be the batch event
>> >>>>>time
>> >>>>> column, which is not very nice.
>> >>>>>
>> >>>>>> class TumblingWindow(size: Expression) extends Window {
>> >>>>>> def on(timeField: Expression): TumblingEventTimeWindow =
>> >>>>>>    new TumblingEventTimeWindow(alias, timeField, size) // has
>> >>>>>>
>> >>>>> allowLateness() method
>> >>>>>
>> >>>>>> }
>> >>>>>>
>> >>>>>> What do you think?
>> >>>>>>
>> >>>>>> Timo
>> >>>>>>
>> >>>>>>
>> >>>>>> Am 05/09/16 um 10:41 schrieb Fabian Hueske:
>> >>>>>>
>> >>>>>>> Hi Jark,
>> >>>>>>>
>> >>>>>>> you had asked for non-windowed aggregates in the Table API a few
>> >>>>>>> times.
>> >>>>>>> FLIP-11 proposes row-window aggregates which are a
>> >>>>>>>generalization of
>> >>>>>>> running aggregates (SlideRow unboundedPreceding).
>> >>>>>>>
>> >>>>>>> Can you have a look at the FLIP and give feedback whether this
>> >>>>>>>is
>> >>>>>>> what
>> >>>>>>>
>> >>>>>> you
>> >>>>>
>> >>>>>> are looking for?
>> >>>>>>> Improvement suggestions are very welcome as well.
>> >>>>>>>
>> >>>>>>> Thank you,
>> >>>>>>> Fabian
>> >>>>>>>
>> >>>>>>> 2016-09-01 16<tel:2016-09-01%C2%A016>:12 GMT+02:00 Timo Walther <
>> twal...@apache.org<mailto:twal...@apache.org> <mailto:
>> >>>>>>> twal...@apache.org<mailto:twal...@apache.org>>>:
>> >>>>>>>
>> >>>>>>> Hi all!
>> >>>>>>>>
>> >>>>>>>> Fabian and I worked on a FLIP for Stream Aggregations in the
>> >>>>>>>>Table
>> >>>>>>>> API.
>> >>>>>>>> You can find the FLIP-11 here:
>> >>>>>>>>
>> >>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%<h
>> ttps://cwiki.apache.org/confluence/display/FLINK/FLIP-11%25> <
>> >>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%<h
>> ttps://cwiki.apache.org/confluence/display/FLINK/FLIP-11%25>>
>>
>> >>>>>>>> 3A+Table+API+Stream+Aggregations
>> >>>>>>>>
>> >>>>>>>> Motivation for the FLIP:
>> >>>>>>>>
>> >>>>>>>> The Table API is a declarative API to define queries on static
>> >>>>>>>>and
>> >>>>>>>> streaming tables. So far, only projection, selection, and union
>> >>>>>>>>are
>> >>>>>>>> supported operations on streaming tables.
>> >>>>>>>>
>> >>>>>>>> This FLIP proposes to add support for different types of
>> >>>>>>>> aggregations
>> >>>>>>>>
>> >>>>>>> on
>> >>>>>
>> >>>>>> top of streaming tables. In particular, we seek to support:
>> >>>>>>>>
>> >>>>>>>> - Group-window aggregates, i.e., aggregates which are computed
>> >>>>>>>>for a
>> >>>>>>>>
>> >>>>>>> group
>> >>>>>
>> >>>>>> of elements. A (time or row-count) window is required to bound
>> >>>>>>the
>> >>>>>>>>
>> >>>>>>> infinite
>> >>>>>
>> >>>>>> input stream into a finite group.
>> >>>>>>>>
>> >>>>>>>> - Row-window aggregates, i.e., aggregates which are computed
>> >>>>>>>>for
>> >>>>>>>> each
>> >>>>>>>>
>> >>>>>>> row,
>> >>>>>
>> >>>>>> based on a window (range) of preceding and succeeding rows.
>> >>>>>>>> Each type of aggregate shall be supported on keyed/grouped or
>> >>>>>>>> non-keyed/grouped data streams for streaming tables as well as
>> >>>>>>>>batch
>> >>>>>>>>
>> >>>>>>> tables.
>> >>>>>
>> >>>>>> We are looking forward to your feedback.
>> >>>>>>>>
>> >>>>>>>> Timo
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>> --
>> >>>>>> Freundliche Grüße / Kind Regards
>> >>>>>>
>> >>>>>> Timo Walther
>> >>>>>>
>> >>>>>> Follow me: @twalthr
>> >>>>>> https://www.linkedin.com/in/twalthr
>> >>>>>><https://www.linkedin.com/in/t
>> >>>>>> walthr>
>> >>>>>>
>> >>>>>
>> >>>>>
>> >>
>> >
>> > --
>> > Freundliche Grüße / Kind Regards
>> >
>> > Timo Walther
>> >
>> > Follow me: @twalthr
>> > https://www.linkedin.com/in/twalthr
>> >
>> >
>>
>>
>

Reply via email to