Hi Fabian:
What is the strategy for new syntax that Calcite does not support? Will
Calcite support it? For example, the row window syntax.
Thank you very much!
-----Original Message-----
From: Fabian Hueske [mailto:[email protected]]
Sent: October 13, 2016 18:17
To: [email protected]
Cc: Sean Wang; Timo Walther
Subject: Re: RE: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations
Hi Zhangrucong,
yes, we want to use Calcite's SQL parser including its window syntax, i.e.,
- the standard SQL OVER windows (in streaming with a few restrictions, such as no
different partitionings or orders)
- the GroupBy window functions (TUMBLE, HOP, SESSION).
The GroupBy window functions are not implemented in Calcite yet. CALCITE-1345 [1]
tracks the issue.
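The three GroupBy window functions differ mainly in how a record's timestamp is mapped to windows. A minimal sketch in plain Scala (no Calcite or Flink dependency), assuming epoch-millisecond Long timestamps and windows aligned to multiples of the size or slide; the object and method names are hypothetical:

```scala
// Hypothetical helpers illustrating TUMBLE, HOP and SESSION window assignment.
// Timestamps and durations are Longs in milliseconds; windows are [start, end).
object GroupWindows {
  // TUMBLE: exactly one window, aligned to multiples of `size`.
  def tumble(ts: Long, size: Long): (Long, Long) = {
    val start = ts - Math.floorMod(ts, size)
    (start, start + size)
  }

  // HOP: every window of length `size` whose start is a multiple of `slide`
  // and that contains ts, i.e. ts - size < start <= ts.
  def hop(ts: Long, slide: Long, size: Long): Seq[(Long, Long)] = {
    val lastStart = ts - Math.floorMod(ts, slide)
    (lastStart until (ts - size) by -slide).map(start => (start, start + size))
  }

  // SESSION: each record opens [ts, ts + gap); overlapping windows are merged.
  def sessions(timestamps: Seq[Long], gap: Long): Seq[(Long, Long)] =
    timestamps.sorted.foldLeft(List.empty[(Long, Long)]) {
      case ((winStart, winEnd) :: rest, ts) if ts < winEnd =>
        (winStart, math.max(winEnd, ts + gap)) :: rest
      case (acc, ts) => (ts, ts + gap) :: acc
    }.reverse
}
```

For example, with a 10 ms slide and a 30 ms size, timestamp 25 falls into the hop windows starting at 20, 10, and 0.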
As Shaoxuan mentioned, we are not using the STREAM keyword to be SQL compliant.
Best, Fabian
[1] https://issues.apache.org/jira/browse/CALCITE-1345
2016-10-13 12:05 GMT+02:00 Fabian Hueske <[email protected]>:
> Hi everybody,
>
> happy to see a good discussion here :-) I'll reply to Shaoxuan's mail
> first and comment on Zhangrucong's question in a separate mail.
>
> Shaoxuan, thanks for the suggestions! I think we all agree that for
> SQL we should definitely follow the standard (batch) SQL syntax.
> In my opinion, the Table API does not necessarily have to be as close
> as possible to SQL but should try to make a few things easier and also
> safer (easier is of course subjective).
>
> - GroupBy without windows: These are currently intentionally not
> supported and also not part of FLIP-11. Our motivation for not
> supporting them is to guard the user from defining a query that fails
> during execution due to a very memory-consuming operation. FLIP-11
> provides a way to define such a query as a sliding row window with
> unbounded preceding rows. With the upcoming SQL proposal, queries that
> consume unbounded memory should be identified and rejected. I would be
> in favor of allowing groupBy without windows once the guarding mechanisms
> are in place.
>
> - GroupBy with window: I think this is a question of taste. Having a
> window() call makes the feature more explicit in my opinion. However,
> I'm not opposed to moving the windows into the groupBy clause.
> Implementation-wise it should be easy to move the window definition
> into the groupBy clause for the Scala Table API. For the Java Table API
> we would need to extend the parser quite a bit because windows would
> need to be defined as Strings and not via objects.
>
> - RowWindows: The rowWindow() call mimics the standard SQL WINDOW
> clause (implemented by PostgreSQL and Calcite), which allows defining
> "reusable" windows. I think this is a desirable feature. In the
> FLIP-11 proposal, the over() clause in select() refers to the
> predefined windows by their aliases. In case only one window is defined,
> the over() clause is optional and the same (and only) window is
> applied to all aggregates. I think we can make the over() call
> mandatory to make the windowing more explicit. It should also be
> possible to extend the over clause to directly accept RowWindows
> instead of window aliases. I would not make this a priority at the
> moment, but rather a feature that could be added later, because
> rowWindow() and over() cover all cases. Similar to GroupBy with
> windows, we would need to extend the parser for the Java Table API though.
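Two of the points above can be illustrated without Flink. The first object shows why a groupBy without windows, i.e. an unbounded-preceding row window per key, is a running aggregate whose state never shrinks; the second models the over() alias rule, including the single-window fallback. Both are plain-Scala sketches with hypothetical names (`RunningAggregate`, `RowWindow`, `Agg`, `resolve`), not the Table API:

```scala
// Hypothetical sketches, not the Flink Table API.
object RunningAggregate {
  // SUM(value) over an unbounded preceding row window per key: one
  // (key, runningSum) row per input row. The returned Map is the operator
  // state; it holds an entry for every key ever seen and never shrinks.
  def runningSum(rows: Seq[(String, Long)]): (Vector[(String, Long)], Map[String, Long]) =
    rows.foldLeft((Vector.empty[(String, Long)], Map.empty[String, Long])) {
      case ((out, state), (key, value)) =>
        val sum = state.getOrElse(key, 0L) + value
        (out :+ (key -> sum), state.updated(key, sum))
    }
}

object WindowAliases {
  final case class RowWindow(alias: String, precedingRows: Int)
  final case class Agg(name: String, over: Option[String])

  // Binds each aggregate to a predefined row window. over(alias) is required
  // unless exactly one window is defined.
  def resolve(windows: Seq[RowWindow], aggs: Seq[Agg]): Either[String, Seq[(String, RowWindow)]] = {
    val byAlias = windows.map(w => w.alias -> w).toMap
    val resolved: Seq[Either[String, (String, RowWindow)]] = aggs.map {
      case Agg(name, Some(alias)) =>
        byAlias.get(alias).toRight(s"unknown window '$alias'").map(w => name -> w)
      case Agg(name, None) if windows.size == 1 =>
        Right(name -> windows.head)
      case Agg(name, None) =>
        Left(s"$name needs over(): ${windows.size} windows are defined")
    }
    // fail on the first unresolved aggregate, otherwise return all bindings
    resolved.collectFirst { case Left(err) => err }
      .toLeft(resolved.collect { case Right(binding) => binding })
  }
}
```

Making over() mandatory would amount to deleting the middle `case` of `resolve`.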
>
> Finally, I have a suggestion of my own:
> In FLIP-11, groupBy() is used to define the partitioning of
> RowWindows. I think this should be changed to partitionBy() because
> groupBy() groups data and applies an aggregation to all rows of a
> group, which is not happening here. In standard SQL, the OVER clause
> features a PARTITION BY clause. We are moving this out of the window
> definition, i.e., the OVER clause, to enforce the same partitioning for
> all windows (different partitionings would be a challenge to execute in a
> parallel system).
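The difference between the two operators shows up in the shape of the result. A plain-Scala sketch with hypothetical names: a grouped aggregate returns one row per group, while a partitioned row-window aggregate (here, the current row plus up to `preceding` earlier rows of the same partition) returns one row per input row:

```scala
// Hypothetical sketch contrasting groupBy and partitionBy semantics on
// (key, value) rows that are already in time order.
object GroupVsPartition {
  // groupBy + SUM: one output row per group.
  def groupBySum(rows: Seq[(String, Long)]): Map[String, Long] =
    rows.groupBy(_._1).map { case (key, group) => key -> group.map(_._2).sum }

  // partitionBy + SUM over the current row and up to `preceding` earlier rows
  // of the same partition: one output row per input row.
  def partitionBySum(rows: Seq[(String, Long)], preceding: Int): Seq[(String, Long)] =
    rows.indices.map { i =>
      val (key, _) = rows(i)
      val partitionSoFar = rows.take(i + 1).filter(_._1 == key).map(_._2)
      key -> partitionSoFar.takeRight(preceding + 1).sum
    }
}
```

For the rows a=1, b=10, a=2, a=3, `groupBySum` yields two rows (a=6, b=10), whereas `partitionBySum` with one preceding row yields four (a=1, b=10, a=3, a=5).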
>
> @Timo and all: What do you think about:
>
> - moving windows into the groupBy() call
> - making over() mandatory for rowWindow() with a single window definition
> - additionally allowing window definitions in over()
> - using partitionBy() instead of groupBy() for row windows?
>
> Best, Fabian
>
> 2016-10-13 11:10 GMT+02:00 Zhangrucong <[email protected]>:
>
>> Hi shaoxuan:
>>
>> I think StreamSQL must be executed in a table environment, so I
>> call this the Table API's StreamSQL. What do you call it, stream
>> Table API or StreamSQL? It is fu
>>
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> val tblEnv = TableEnvironment.getTableEnvironment(env)
>> // read lines and turn each one into a (userID, count, num) tuple
>> val ds: DataStream[(String, Long, Long)] = env
>>   .readTextFile("/home/demo")
>>   .map(f => (f, 1L, 1L))
>> // register the stream as table "Order" with fields userID, count, num
>> tblEnv.registerDataStream("Order", ds, 'userID, 'count, 'num)
>> val sql = tblEnv.sql("SELECT STREAM * FROM Order WHERE userID = 'A'")
>>
>> So in my opinion, the grammar which is marked red should be
>> compatible with Calcite's StreamSQL grammar.
>>
>> By the way, thanks very much for telling me about the changes to
>> Flink StreamSQL. I will look at the new proposal.
>>
>> Thanks!
>> From: Sean Wang [mailto:[email protected]]
>> Sent: October 13, 2016 16:29
>> To: [email protected]; Zhangrucong
>> Subject: Re: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations
>>
>> Hi zhangrucong,
>> I am not sure what you mean by "table API'S StreamSQL". I guess you
>> mean "stream TableAPI"?
>> TableAPI should be compatible with Calcite SQL. (By compatible, my
>> understanding is that both TableAPI and SQL will be translated to the
>> same logical plan - the same set of REL and REX.)
>> BTW, please note that we recently merged a change to remove the
>> STREAM keyword for Flink stream SQL (FLINK-4546). In our opinion,
>> batch and stream do not necessarily need to be differentiated at the SQL
>> level. The major difference between batch and stream is "WHEN and HOW to
>> emit the result".
>> We have been working on a new proposal with Fabian on this change. I
>> guess it will be sent out for review very soon.
>>
>> Regards,
>> Shaoxuan
>>
>>
>> On Thu, Oct 13, 2016 at 2:29 PM, Zhangrucong <[email protected]> wrote:
>> Hi shaoxuan:
>> Is the Table API's StreamSQL grammar compatible with Calcite's
>> StreamSQL grammar?
>>
>>
>> 1. In Calcite, tumbling and hopping windows are realized with the TUMBLE
>> and HOP functions, which must be used in the GROUP BY clause, like this:
>>
>> SELECT
>>   TUMBLE_END(rowtime, INTERVAL '30' MINUTE, TIME '0:12') AS rowtime,
>>   productId,
>>   COUNT(*) AS c,
>>   SUM(units) AS units
>> FROM Orders
>> GROUP BY TUMBLE(rowtime, INTERVAL '30' MINUTE, TIME '0:12'),
>>   productId;
>>
>> 2. The sliding window uses the keywords WINDOW and OVER, like this:
>>
>> SELECT *
>> FROM (
>>   SELECT STREAM rowtime,
>>     productId,
>>     units,
>>     AVG(units) OVER product (RANGE INTERVAL '10' MINUTE PRECEDING) AS m10,
>>     AVG(units) OVER product (RANGE INTERVAL '7' DAY PRECEDING) AS d7
>>   FROM Orders
>>   WINDOW product AS (
>>     PARTITION BY productId
>>     ORDER BY rowtime))
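Both Calcite examples can be traced by hand. A plain-Scala sketch, assuming the third TUMBLE argument (TIME '0:12') is an alignment time that shifts 30-minute windows to run from :12 to :42 and from :42 to :12, and assuming rowtime in the second query is given in minutes; all names are hypothetical and this is not Calcite's implementation:

```scala
import java.time.{Duration, LocalTime}

// Hypothetical helpers for TUMBLE(rowtime, size, alignTime) on times of day.
object TumbleAlignment {
  def tumbleStart(t: LocalTime, size: Duration, align: LocalTime): LocalTime = {
    // distance from the alignment point, reduced modulo the window size
    val offset = Math.floorMod(Duration.between(align, t).toNanos, size.toNanos)
    t.minusNanos(offset)
  }

  def tumbleEnd(t: LocalTime, size: Duration, align: LocalTime): LocalTime =
    tumbleStart(t, size, align).plus(size)
}

// Hypothetical sketch of
//   AVG(units) OVER (PARTITION BY productId ORDER BY rowtime
//                    RANGE INTERVAL `range` PRECEDING)
// evaluated once per row; rowtime is in minutes. O(n^2), for illustration only.
object RangePreceding {
  final case class Order(rowtime: Long, productId: Int, units: Int)

  def movingAvg(orders: Seq[Order], range: Long): Seq[(Order, Double)] =
    orders.map { current =>
      // same product, rowtime in [current.rowtime - range, current.rowtime];
      // RANGE frames include peers that share the current rowtime
      val frame = orders.filter { o =>
        o.productId == current.productId &&
        o.rowtime <= current.rowtime &&
        o.rowtime >= current.rowtime - range
      }
      current -> frame.map(_.units).sum.toDouble / frame.size
    }
}
```

Under these assumptions, an order at 10:05 lands in the window 9:42 to 10:12, so TUMBLE_END returns 10:12; and with `range = 10`, a product-1 order at minute 12 averages the units at minutes 5 and 12 but not the one at minute 0.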
>>
>>
>>
>> Thanks!
>>
>> -----Original Message-----
>> From: 王绍翾(大沙) [mailto:[email protected]]
>> Sent: October 13, 2016 2:03
>> To: [email protected]
>> Subject: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations
>>
>> Hi Fabian, Timo, and Jark. Thanks for kicking off this FLIP. This is a
>> really great and promising proposal. I have a few comments on the "window"
>> operator proposed in this FLIP (I hope it is not too late to
>> bring this up). First, a window is not always needed for stream
>> aggregation. There are cases where we want to do an aggregation on a
>> stream while the query/emit strategy decides when to emit a
>> streaming output. Second, a window is needed when we want to do an
>> aggregation over a certain range, but a window is not an operator. We
>> basically use a window to define the range for aggregation. In the Table
>> API, a window should be defined together with the "groupBy" and "select"
>> operators, either inside a "groupBy" operator or after an "over"
>> clause in a "select" operator. This will make the Table API similar to SQL.
>> For instance:
>>
>> [A groupBy without window]
>> <Table API>
>> val res = tab
>>   .groupBy('a)
>>   .select('a, 'b.sum)
>> <SQL>
>> SELECT a, SUM(b)
>> FROM tab
>> GROUP BY a
>>
>> [A tumble window inside groupBy]
>> <Table API>
>> val res = tab
>>   .groupBy('a, tumble(10.minutes, 'rowtime))
>>   .select('a, 'b.sum)
>> <SQL>
>> SELECT a, SUM(b)
>> FROM tab
>> GROUP BY a, TUMBLE(10.minutes, 'rowtime)
>>
>> [A row tumble window after OVER]
>> <Table API>
>> .groupBy('a) // optional
>> .select('a, 'b.count over rowTumble(10.minutes, 'rowtime))
>> <SQL>
>> SELECT a, COUNT(b) OVER ROWTUMBLE(10.minutes, 'rowtime)
>> FROM tab
>> GROUP BY a
>>
>> Please let me know what you think.
>> Regards, Shaoxuan
>> ------------------------------------------------------------------
>> From: Fabian Hueske <[email protected]>
>> Sent: Monday, September 26, 2016 21:13
>> To: [email protected] <[email protected]>
>> Subject: Re: [DISCUSS] FLIP-11: Table API Stream Aggregations
>>
>> Hi everybody,
>>
>> Timo proposed our FLIP-11 a bit more than three weeks ago.
>> I will update the status of the FLIP to accepted.
>>
>> Thanks,
>> Fabian
>>
>> 2016-09-19 9:16 GMT+02:00 Timo Walther <[email protected]>:
>>
>> > Hi Jark,
>> >
>> > yes, I think enough time has passed. We can start implementing the
>> > changes. What do you think, Fabian?
>> >
>> > If there are no objections, I will create the subtasks in Jira today.
>> > For FLIP-11/1 I have already implemented a prototype; I just have to do
>> > some refactoring/documentation before opening a PR.
>> >
>> > Timo
>> >
>> >
>> > On 18/09/16 at 04:46, Jark Wu wrote:
>> >
>> >> Hi all,
>> >>
>> >> It seems that there are no objections to the window design. So could
>> >> we open subtasks to start working on it now?
>> >>
>> >> - Jark Wu
>> >>
>> >> On Sep 7, 2016, at 4:29 PM, Jark Wu <[email protected]> wrote:
>> >>>
>> >>> Hi Fabian,
>> >>>
>> >>> Thanks for sharing your ideas.
>> >>>
>> >>> They all make sense to me. Regarding reassigning timestamps, I
>> >>> do not have a use case. I came up with this because DataStream
>> >>> has a TimestampAssigner :)
>> >>>
>> >>> +1 for this FLIP.
>> >>>
>> >>> - Jark Wu
>> >>>
>> >>> On Sep 7, 2016, at 2:59 PM, Fabian Hueske <[email protected]> wrote:
>> >>>>
>> >>>> Hi,
>> >>>>
>> >>>> thanks for your comments and questions!
>> >>>> Actually, you are bringing up the points that Timo and I
>> >>>> discussed the most when designing the FLIP ;-)
>> >>>>
>> >>>> - We also thought about the syntactic shortcut for running
>> >>>> aggregates like you proposed (table.groupBy('a).select(…)). Our
>> >>>> motivation for not allowing this shortcut is to prevent users from
>> >>>> accidentally performing a "dangerous" operation. The problem
>> >>>> with unbounded sliding row-windows is that their state never
>> >>>> expires. If you have an evolving key space, you will likely run
>> >>>> into problems at some point because the operator state grows too
>> >>>> large. IMO, a row-window session is a better approach, because it
>> >>>> defines a timeout after which state can be discarded.
>> >>>> groupBy.select is a very common operation in batch, but its
>> >>>> semantics in streaming are very different. In my opinion it makes
>> >>>> sense to make users aware of these differences through the API.
>> >>>>
>> >>>> - Reassigning timestamps and watermarks is a very delicate issue.
>> >>>> You are right that Calcite exposes this field, which is necessary due
>> >>>> to the semantics of SQL. However, also in Calcite you cannot freely
>> >>>> choose the timestamp attribute for streaming queries (it must be
>> >>>> a monotone or quasi-monotone attribute), which is hard to reason
>> >>>> about (and guarantee) after a few operators have been applied.
>> >>>> Streaming tables in Flink will likely have a time attribute which is
>> >>>> identical to the initial rowtime.
>> >>>> However, Flink does modify timestamps internally, e.g., for
>> >>>> records that are emitted from time windows, in order to ensure
>> >>>> that consecutive windows perform as expected. Modifying or
>> >>>> reassigning timestamps in the middle of a job can lead to
>> >>>> unexpected results which are very hard to reason about. Do you
>> >>>> have a concrete use case in mind for reassigning timestamps?
>> >>>>
>> >>>> - The idea to represent rowtime and systime as objects is good.
>> >>>> Our motivation to go for reserved Scala symbols was to have a
>> >>>> uniform syntax for windows over streaming and batch tables. On
>> >>>> batch tables you can compute time windows over basically every
>> >>>> time attribute (they are treated similarly to grouping attributes,
>> >>>> with a bit of extra logic to extract the grouping key for
>> >>>> sliding and session windows). If you write window(Tumble over
>> >>>> 10.minutes on 'rowtime) on a streaming table, 'rowtime would
>> >>>> indicate event time. On a batch table with a 'rowtime attribute,
>> >>>> the same operator would be internally converted into a group
>> >>>> by. By going for the object approach we would lose this
>> >>>> compatibility (or would need to introduce an additional column
>> >>>> attribute to specify the window attribute for batch tables).
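Each of the three points can be reduced to a few lines of plain Scala. The sketch below uses hypothetical names and assumes millisecond Long timestamps and in-order input: `SessionRunningAggregate` drops a key's state after `timeout` ms of inactivity (one possible reading of a row-window session); `WindowResultTimestamps` stamps a window result with its last millisecond, end - 1, mirroring how Flink's `TimeWindow.maxTimestamp()` is defined, so the result lands in the correct window of a following operator; and `BatchTumble` evaluates a tumbling window on a batch table as a plain group-by on the window start:

```scala
// Hypothetical sketches; timestamps are Long milliseconds and rows arrive in time order.

// Per-key running SUM whose state is discarded after `timeout` ms without a row
// for that key (one possible reading of a row-window session).
object SessionRunningAggregate {
  final case class KeyState(sum: Long, lastSeen: Long)

  def run(rows: Seq[(String, Long, Long)], timeout: Long): (Vector[(String, Long)], Map[String, KeyState]) =
    rows.foldLeft((Vector.empty[(String, Long)], Map.empty[String, KeyState])) {
      case ((out, state), (key, ts, value)) =>
        // drop every key whose session timed out before this row
        val live = state.filter { case (_, s) => ts - s.lastSeen < timeout }
        val sum = live.get(key).map(_.sum).getOrElse(0L) + value
        (out :+ (key -> sum), live.updated(key, KeyState(sum, ts)))
    }
}

object WindowResultTimestamps {
  def tumbleStart(ts: Long, size: Long): Long = ts - Math.floorMod(ts, size)

  // Timestamp attached to the result of window [start, start + size): its last
  // millisecond, mirroring Flink's TimeWindow.maxTimestamp() (end - 1).
  def resultTimestamp(windowStart: Long, size: Long): Long = windowStart + size - 1
}

// window(Tumble over size on 'rowtime) on a batch table, evaluated as a plain
// group-by on the computed window start; returns the row count per window.
object BatchTumble {
  def tumbleCount(rowtimes: Seq[Long], size: Long): Map[Long, Int] =
    rowtimes
      .groupBy(ts => WindowResultTimestamps.tumbleStart(ts, size))
      .map { case (windowStart, rows) => windowStart -> rows.size }
}
```

Under these assumptions, the result of the 10 ms window [10, 20) carries timestamp 19 and falls into the 20 ms window [0, 20) downstream; stamping it with the exclusive end 20 would push it into [20, 40).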
>> >>>>
>> >>>> As usual some of the design decisions are based on preferences.
>> >>>> Do they make sense to you? Let me know what you think.
>> >>>>
>> >>>> Best, Fabian
>> >>>>
>> >>>>
>> >>>> 2016-09-07 5:12 GMT+02:00 Jark Wu <[email protected]>:
>> >>>>
>> >>>>> Hi all,
>> >>>>>
>> >>>>> I was on vacation for about five days, sorry to have missed this
>> >>>>> great FLIP.
>> >>>>>
>> >>>>> Yes, non-windowed aggregates are a special case of row-windows, and
>> >>>>> the proposal looks really good. Can we have a simplified form for
>> >>>>> this special case? For example,
>> >>>>> table.groupBy('a).rowWindow(SlideRows.unboundedPreceding).select(…)
>> >>>>> could be simplified to table.groupBy('a).select(…). The latter
>> >>>>> would actually call the former.
>> >>>>>
>> >>>>> Another question is about the rowtime. As the FLIP says,
>> >>>>> DataStream and StreamTableSource are responsible for assigning
>> >>>>> timestamps and watermarks, and furthermore "rowtime" and
>> >>>>> "systemtime" are not real columns. IMO, this is different from
>> >>>>> Calcite's rowtime, which is a real column in the table. With the
>> >>>>> FLIP's approach, we will lose some flexibility, because the timestamp
>> >>>>> column may be created after some transformations or a join operation
>> >>>>> rather than at the beginning. So why do we have to define rowtime at
>> >>>>> the beginning (because of watermarks?)? Can we have a way to define
>> >>>>> rowtime after the source table, like a TimestampAssigner?
>> >>>>>
>> >>>>> Regarding the "allowLateness" method, I came up with a trick: we can
>> >>>>> make 'rowtime and 'systemtime Scala objects instead of symbol
>> >>>>> expressions. The API would look like this:
>> >>>>>
>> >>>>> window(Tumble over 10.minutes on rowtime allowLateness as 'w)
>> >>>>>
>> >>>>> The implementation would look like this:
>> >>>>>
>> >>>>> class TumblingWindow(size: Expression) extends Window {
>> >>>>>   def on(time: rowtime.type): TumblingEventTimeWindow =
>> >>>>>     new TumblingEventTimeWindow(alias, 'rowtime, size) // has allowLateness() method
>> >>>>>
>> >>>>>   def on(time: systemtime.type): TumblingProcessingTimeWindow =
>> >>>>>     new TumblingProcessingTimeWindow(alias, 'systemtime, size) // has no allowLateness() method
>> >>>>> }
>> >>>>> object rowtime
>> >>>>> object systemtime
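This trick relies on Scala overload resolution over the singleton types `rowtime.type` and `systemtime.type`, so the compiler only offers `allowLateness` where it makes sense. A self-contained sketch with simplified, hypothetical stand-ins for the proposed classes (a Long size instead of an `Expression`, no `Window` base class or alias):

```scala
// Hypothetical, simplified stand-ins for the classes in the proposal.
object rowtime
object systemtime

final case class TumblingEventTimeWindow(size: Long, lateness: Long = 0L) {
  // only event-time windows expose allowLateness
  def allowLateness(ms: Long): TumblingEventTimeWindow = copy(lateness = ms)
}

final case class TumblingProcessingTimeWindow(size: Long)

final class TumblingWindow(size: Long) {
  // the argument's singleton type selects the overload and thus the result type
  def on(time: rowtime.type): TumblingEventTimeWindow = TumblingEventTimeWindow(size)
  def on(time: systemtime.type): TumblingProcessingTimeWindow = TumblingProcessingTimeWindow(size)
}
```

With these definitions, `new TumblingWindow(10L).on(systemtime).allowLateness(5L)` does not compile, because `TumblingProcessingTimeWindow` has no such method; the restriction is enforced by types rather than by a runtime check.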
>> >>>>>
>> >>>>> What do you think about this?
>> >>>>>
>> >>>>> - Jark Wu
>> >>>>>
>> >>>>> On Sep 6, 2016, at 11:00 PM, Timo Walther <[email protected]> wrote:
>> >>>>>>
>> >>>>>> Hi all,
>> >>>>>>
>> >>>>>> I thought about the API of the FLIP again. If we allow the
>> >>>>>> "systemtime" attribute, we cannot implement nice method chaining
>> >>>>>> where the user can define "allowLateness" only on event time. So even
>> >>>>>> if the user expressed that "systemtime" is used, we have to offer an
>> >>>>>> "allowLateness" method, because we have to assume that this attribute
>> >>>>>> can also be the batch event-time column, which is not very nice.
>> >>>>>>
>> >>>>>> class TumblingWindow(size: Expression) extends Window {
>> >>>>>>   def on(timeField: Expression): TumblingEventTimeWindow =
>> >>>>>>     new TumblingEventTimeWindow(alias, timeField, size) // has allowLateness() method
>> >>>>>> }
>> >>>>>>
>> >>>>>> What do you think?
>> >>>>>>
>> >>>>>> Timo
>> >>>>>>
>> >>>>>>
>> >>>>>> On 05/09/16 at 10:41, Fabian Hueske wrote:
>> >>>>>>
>> >>>>>>> Hi Jark,
>> >>>>>>>
>> >>>>>>> you had asked for non-windowed aggregates in the Table API a few
>> >>>>>>> times. FLIP-11 proposes row-window aggregates, which are a
>> >>>>>>> generalization of running aggregates (SlideRow unboundedPreceding).
>> >>>>>>>
>> >>>>>>> Can you have a look at the FLIP and give feedback on whether this
>> >>>>>>> is what you are looking for?
>> >>>>>>> Improvement suggestions are very welcome as well.
>> >>>>>>>
>> >>>>>>> Thank you,
>> >>>>>>> Fabian
>> >>>>>>>
>> >>>>>>> 2016-09-01 16:12 GMT+02:00 Timo Walther <[email protected]>:
>> >>>>>>>
>> >>>>>>>> Hi all!
>> >>>>>>>>
>> >>>>>>>> Fabian and I worked on a FLIP for Stream Aggregations in the Table API.
>> >>>>>>>> You can find FLIP-11 here:
>> >>>>>>>>
>> >>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations
>> >>>>>>>>
>> >>>>>>>> Motivation for the FLIP:
>> >>>>>>>>
>> >>>>>>>> The Table API is a declarative API to define queries on static and
>> >>>>>>>> streaming tables. So far, only projection, selection, and union are
>> >>>>>>>> supported operations on streaming tables.
>> >>>>>>>>
>> >>>>>>>> This FLIP proposes to add support for different types of aggregations
>> >>>>>>>> on top of streaming tables. In particular, we seek to support:
>> >>>>>>>>
>> >>>>>>>> - Group-window aggregates, i.e., aggregates which are computed for a
>> >>>>>>>> group of elements. A (time or row-count) window is required to bound
>> >>>>>>>> the infinite input stream into a finite group.
>> >>>>>>>>
>> >>>>>>>> - Row-window aggregates, i.e., aggregates which are computed for each
>> >>>>>>>> row, based on a window (range) of preceding and succeeding rows.
>> >>>>>>>>
>> >>>>>>>> Each type of aggregate shall be supported on keyed/grouped or
>> >>>>>>>> non-keyed/grouped data streams for streaming tables as well as batch
>> >>>>>>>> tables.
>> >>>>>>>>
>> >>>>>>>> We are looking forward to your feedback.
>> >>>>>>>>
>> >>>>>>>> Timo
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>> --
>> >>>>>> Kind Regards
>> >>>>>>
>> >>>>>> Timo Walther
>> >>>>>>
>> >>>>>> Follow me: @twalthr
>> >>>>>> https://www.linkedin.com/in/twalthr
>> >>>>>>
>> >>>>>
>> >>>>>
>> >>
>> >
>> >
>> >
>>
>>
>