Hi shaoxuan:
I think StreamSQL must be executed in a table environment, so I called it the
Table API's StreamSQL. What do you call it: stream Table API or StreamSQL?
It is fu
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tblEnv = TableEnvironment.getTableEnvironment(env)
// Turn each input line into a (userID, count, num) tuple before registering it.
val ds: DataStream[(String, Long, Long)] = env.readTextFile("/home/demo")
  .map(f => (f, 1L, 1L))
tblEnv.registerDataStream("Order", ds, 'userID, 'count, 'num)
val sql = tblEnv.sql("SELECT Stream * FROM Order WHERE userID='A'")
So in my opinion, the grammar which is marked red should be compatible with
calcite's StreamSQL grammar.
By the way, thanks very much for telling me what has changed in Flink
StreamSQL. I will look at the new proposal.
Thanks!
From: Sean Wang [mailto:[email protected]]
Sent: October 13, 2016, 16:29
To: [email protected]; Zhangrucong
Subject: Re: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations
Hi zhangrucong,
I am not sure what you mean by "table API's StreamSQL". I guess you mean
"stream Table API"?
The Table API should be compatible with Calcite SQL. (By compatible, my
understanding is that both the Table API and SQL will be translated to the same
logical plan, i.e., the same set of REL and REX.)
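
To make the "same logical plan" point concrete, here is a minimal plain-Scala
sketch. It does not use Calcite or Flink. The names Scan, Filter, Eq,
TableSketch, and parseTinySql are hypothetical stand-ins for REL/REX nodes and
for the two front ends, assumed only for illustration.

```scala
// Toy model only: hypothetical stand-ins for Calcite REL/REX nodes.
object PlanSketch {
  sealed trait Rex                                   // row expression
  final case class Eq(field: String, literal: String) extends Rex

  sealed trait Rel                                   // relational operator
  final case class Scan(table: String) extends Rel
  final case class Filter(input: Rel, cond: Rex) extends Rel

  // Fluent front end, loosely shaped like a Table API call chain.
  final case class TableSketch(plan: Rel) {
    def where(field: String, literal: String): TableSketch =
      TableSketch(Filter(plan, Eq(field, literal)))
  }
  def scan(table: String): TableSketch = TableSketch(Scan(table))

  // String front end that understands exactly one query shape.
  private val Query = """SELECT \* FROM (\w+) WHERE (\w+)='(\w+)'""".r
  def parseTinySql(sql: String): Option[Rel] = sql match {
    case Query(t, f, v) => Some(Filter(Scan(t), Eq(f, v)))
    case _              => None
  }

  def main(args: Array[String]): Unit = {
    val fromApi = scan("Orders").where("userID", "A").plan
    val fromSql = parseTinySql("SELECT * FROM Orders WHERE userID='A'")
    // Both front ends build the same tree, so this prints true.
    println(fromSql.contains(fromApi))
  }
}
```

In this toy model, "compatible" simply means that the two entry points produce
structurally equal plan trees.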
BTW, please note that we recently merged a change that removes the STREAM
keyword from Flink stream SQL (FLINK-4546). In our opinion, batch and stream do
not necessarily need to be differentiated at the SQL level. The major
difference between batch and stream is "WHEN and HOW to emit the result".
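
As a rough illustration of the "WHEN and HOW to emit" difference, the sketch
below runs the same per-key sum in two ways in plain Scala. It involves no
Flink code. The object EmitSketch and its methods are hypothetical, and
emitting an update for every row is only one of many possible emit strategies.

```scala
// Plain-Scala illustration; no Flink or Calcite involved.
object EmitSketch {
  // "Batch": the input is complete, so one final result per key is emitted.
  def batchSum(rows: Seq[(String, Long)]): Map[String, Long] =
    rows.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }

  // "Stream": the same aggregation, but an updated result is emitted for
  // every incoming row (an assumed emit strategy, chosen for simplicity).
  def streamSum(rows: Seq[(String, Long)]): Seq[(String, Long)] = {
    var state = Map.empty[String, Long]
    rows.map { case (k, v) =>
      val updated = state.getOrElse(k, 0L) + v
      state = state.updated(k, updated)
      k -> updated
    }
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq("A" -> 1L, "B" -> 2L, "A" -> 3L)
    println(batchSum(rows))  // one row per key
    println(streamSum(rows)) // one row per input record
  }
}
```

The query is the same in both cases. Only the emission differs.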
We have been working on a new proposal with Fabian on this change. I guess it
will be sent out for review very soon.
Regards,
Shaoxuan
On Thu, Oct 13, 2016 at 2:29 PM, Zhangrucong <[email protected]> wrote:
Hi shaoxuan:
Is the Table API's StreamSQL grammar compatible with Calcite's StreamSQL
grammar?
1. In Calcite, the tumble window is realized with the function TUMBLE or HOP.
The function must be used with GROUP BY, like this:
SELECT
  TUMBLE_END(rowtime, INTERVAL '30' MINUTE, TIME '0:12') AS rowtime,
  productId,
  COUNT(*) AS c,
  SUM(units) AS units
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '30' MINUTE, TIME '0:12'),
  productId;
2. The sliding window uses the keywords WINDOW and OVER, like this:
SELECT *
FROM (
  SELECT STREAM rowtime,
    productId,
    units,
    AVG(units) OVER product (RANGE INTERVAL '10' MINUTE PRECEDING) AS m10,
    AVG(units) OVER product (RANGE INTERVAL '7' DAY PRECEDING) AS d7
  FROM Orders
  WINDOW product AS (
    ORDER BY rowtime
    PARTITION BY productId))
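
For readers who want the semantics of the two queries spelled out, here is a
small plain-Scala model. It is only a sketch and not Calcite or Flink code. The
Order class, the use of minutes as the time unit, and the names tumbleEnd,
tumbleAgg, and rangeAvg are assumptions made for the example.

```scala
// Plain-Scala model of the two window kinds; times are minutes since midnight.
object WindowSketch {
  final case class Order(rowtime: Long, productId: Int, units: Int)

  // End of the tumbling window that contains `ts`. Windows are `size` long
  // and aligned so that one of them starts at `offset`.
  def tumbleEnd(ts: Long, size: Long, offset: Long): Long =
    ts - Math.floorMod(ts - offset, size) + size

  // Models GROUP BY TUMBLE(...), productId: one output row per
  // (window end, product), holding (COUNT(*), SUM(units)).
  def tumbleAgg(orders: Seq[Order], size: Long, offset: Long): Map[(Long, Int), (Int, Int)] =
    orders
      .groupBy(o => (tumbleEnd(o.rowtime, size, offset), o.productId))
      .map { case (key, os) => key -> ((os.size, os.map(_.units).sum)) }

  // Models AVG(units) OVER a window partitioned by productId, ordered by
  // rowtime, with RANGE <range> PRECEDING: one output row per input row,
  // averaging rows of the same product with rowtime in [rowtime - range, rowtime].
  def rangeAvg(orders: Seq[Order], range: Long): Seq[(Order, Double)] =
    orders.map { o =>
      val frame = orders.filter(p =>
        p.productId == o.productId &&
          p.rowtime >= o.rowtime - range && p.rowtime <= o.rowtime)
      val avg = frame.map(_.units).sum.toDouble / frame.size
      (o, avg)
    }
}
```

The group window collapses many rows into one row per window and key, while
the OVER window keeps every input row and attaches an aggregate to it.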
Thanks!
-----Original Message-----
From: 王绍翾(大沙) [mailto:[email protected]]
Sent: October 13, 2016, 2:03
To: [email protected]
Subject: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations
Hi Fabian, Timo, and Jark,

Thanks for kicking off this FLIP. This is a really great and promising
proposal. I have a few comments on the "window" operator proposed in this FLIP
(I hope it is not too late to bring this up).

First, a window is not always needed for stream aggregation. There are cases
where we want to do an aggregation on a stream while the query/emit strategy
decides when to emit a streaming output.

Second, a window is needed when we want to do an aggregation over a certain
range, but a window is not an operator. We basically use a window to define the
range for the aggregation. In the Table API, a window should be defined together
with the "groupby" and "select" operators, either inside a "groupby" operator
or after an "over" clause in a "select" operator. This will make the Table API
work in a manner similar to SQL.
For instance,

[A groupby without window]
<Table API>
val res = tab
  .groupBy('a)
  .select('a, 'b.sum)
<SQL>
SELECT a, SUM(b)
FROM tab
GROUP BY a

[A tumble window inside groupby]
<Table API>
val res = tab
  .groupBy('a, tumble(10.minutes, 'rowtime))
  .select('a, 'b.sum)
<SQL>
SELECT a, SUM(b)
FROM tab
GROUP BY a, TUMBLE(10.minutes, 'rowtime)

[A row tumble window after OVER]
<Table API>
  .groupby('a) //optional
  .select('a, 'b.count over rowTumble(10.minutes, 'rowtime))
<SQL>
SELECT a, COUNT(b) OVER ROWTUMBLE(10.minutes, 'rowtime)
FROM tab
GROUP BY a

Please let me know what you think.

Regards,
Shaoxuan
------------------------------------------------------------------
From: Fabian Hueske <[email protected]>
Sent: Monday, September 26, 2016, 21:13
To: [email protected]
Subject: Re: [DISCUSS] FLIP-11: Table API Stream Aggregations

Hi everybody,
Timo proposed our FLIP-11 a bit more than three weeks ago.
I will update the status of the FLIP to accepted.
Thanks,
Fabian
2016-09-19 9:16 GMT+02:00 Timo Walther <[email protected]>:
> Hi Jark,
>
> yes I think enough time has passed. We can start implementing the changes.
> What do you think Fabian?
>
> If there are no objections, I will create the subtasks in Jira today. For
> FLIP-11/1 I already have implemented a prototype, I just have to do some
> refactoring/documentation before opening a PR.
>
> Timo
>
>
> On 18/09/16 at 04:46, Jark Wu wrote:
>
>> Hi all,
>>
>> It seems that there are no objections to the window design. So could we
>> open subtasks to start working on it now?
>>
>> - Jark Wu
>>
>> On September 7, 2016, at 4:29 PM, Jark Wu <[email protected]> wrote:
>>>
>>> Hi Fabian,
>>>
>>> Thanks for sharing your ideas.
>>>
>>> They all make sense to me. Regarding reassigning timestamps, I do not
>>> have a use case. I came up with this because DataStream has a
>>> TimestampAssigner :)
>>>
>>> +1 for this FLIP.
>>>
>>> - Jark Wu
>>>
>>> On September 7, 2016, at 2:59 PM, Fabian Hueske <[email protected]> wrote:
>>>>
>>>> Hi,
>>>>
>>>> thanks for your comments and questions!
>>>> Actually, you are bringing up the points that Timo and I discussed the
>>>> most when designing the FLIP ;-)
>>>>
>>>> - We also thought about the syntactic shortcut for running aggregates
>>>> like you proposed (table.groupBy('a).select(…)). Our motivation to not
>>>> allow this shortcut is to prevent users from accidentally performing a
>>>> "dangerous" operation. The problem with unbounded sliding row-windows is
>>>> that their state never expires. If you have an evolving key space, you
>>>> will likely run into problems at some point because the operator state
>>>> grows too large. IMO, a row-window session is a better approach, because
>>>> it defines a timeout after which state can be discarded. groupBy.select
>>>> is a very common operation in batch but its semantics in streaming are
>>>> very different. In my opinion it makes sense to make users aware of these
>>>> differences through the API.
>>>>
>>>> - Reassigning timestamps and watermarks is a very delicate issue. You
>>>> are right that Calcite exposes this field, which is necessary due to the
>>>> semantics of SQL. However, also in Calcite you cannot freely choose the
>>>> timestamp attribute for streaming queries (it must be a monotone or
>>>> quasi-monotone attribute), which is hard to reason about (and guarantee)
>>>> after a few operators have been applied. Streaming tables in Flink will
>>>> likely have a time attribute which is identical to the initial rowtime.
>>>> However, Flink does modify timestamps internally, e.g., for records that
>>>> are emitted from time windows, in order to ensure that consecutive
>>>> windows perform as expected. Modifying or reassigning timestamps in the
>>>> middle of a job can lead to unexpected results which are very hard to
>>>> reason about. Do you have a concrete use case in mind for reassigning
>>>> timestamps?
>>>>
>>>> - The idea to represent rowtime and systime as objects is good. Our
>>>> motivation to go for reserved Scala symbols was to have a uniform syntax
>>>> for windows over streaming and batch tables. On batch tables you can
>>>> compute time windows basically over every time attribute (they are
>>>> treated similarly to grouping attributes, with a bit of extra logic to
>>>> extract the grouping key for sliding and session windows). If you write
>>>> window(Tumble over 10.minutes on 'rowtime) on a streaming table, 'rowtime
>>>> would indicate event-time. On a batch table with a 'rowtime attribute,
>>>> the same operator would be internally converted into a group by. By going
>>>> for the object approach we would lose this compatibility (or would need
>>>> to introduce an additional column attribute to specify the window
>>>> attribute for batch tables).
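
The state-growth concern in the first point above can be sketched in plain
Scala. This is only a model and not how Flink manages operator state. The
Entry class, the run function, and the timeout handling are hypothetical, and
the timeout is checked only when a new event arrives.

```scala
// Plain-Scala sketch of keyed state with and without a timeout; not Flink code.
object StateSketch {
  // Per-key running sum plus the time the key was last seen.
  final case class Entry(sum: Long, lastSeen: Long)

  // Processes (time, key, value) events in time order and returns the final
  // state. With timeout = None this models an unbounded running aggregate:
  // entries are never removed. With Some(t), keys idle for more than t are
  // dropped, loosely modelling the timeout of a session row-window.
  def run(events: Seq[(Long, String, Long)], timeout: Option[Long]): Map[String, Entry] =
    events.foldLeft(Map.empty[String, Entry]) { case (state, (ts, key, value)) =>
      val live = timeout match {
        case Some(t) => state.filter { case (_, e) => ts - e.lastSeen <= t }
        case None    => state
      }
      val sum = live.get(key).map(_.sum).getOrElse(0L) + value
      live.updated(key, Entry(sum, ts))
    }
}
```

With an evolving key space (a new key per event), the state without a timeout
holds one entry per key ever seen, while the state with a timeout stays
bounded by the number of recently active keys.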
>>>>
>>>> As usual some of the design decisions are based on preferences.
>>>> Do they make sense to you? Let me know what you think.
>>>>
>>>> Best, Fabian
>>>>
>>>>
>>>> 2016-09-07 5:12 GMT+02:00 Jark Wu <[email protected]>:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I have been on vacation for about five days, sorry to have missed this
>>>>> great FLIP.
>>>>>
>>>>> Yes, the non-windowed aggregate is a special case of the row-window, and
>>>>> the proposal looks really good. Can we have a simplified form for the
>>>>> special case? For example,
>>>>> table.groupBy('a).rowWindow(SlideRows.unboundedPreceding).select(…)
>>>>> could be simplified to table.groupBy('a).select(…). The latter would
>>>>> actually call the former.
>>>>>
>>>>> Another question is about the rowtime. As the FLIP says, DataStream and
>>>>> StreamTableSource are responsible for assigning timestamps and
>>>>> watermarks; furthermore, "rowtime" and "systemtime" are not real
>>>>> columns. IMO, this is different from Calcite's rowtime, which is a real
>>>>> column in the table. With the FLIP's approach, we will lose some
>>>>> flexibility, because the timestamp column may be created after some
>>>>> transformations or a join operation, not at the beginning. So why do we
>>>>> have to define rowtime at the beginning (because of watermarks)? Can we
>>>>> have a way to define rowtime after the source table, like
>>>>> TimestampAssigner?
>>>>>
>>>>> Regarding the "allowLateness" method: I came up with a trick where we
>>>>> make 'rowtime and 'systemtime Scala objects, not symbol expressions.
>>>>> The API will look like this:
>>>>>
>>>>> window(Tumble over 10.minutes on rowtime allowLateness as 'w)
>>>>>
>>>>> The implementation will look like this:
>>>>>
>>>>> class TumblingWindow(size: Expression) extends Window {
>>>>>   // has allowLateness() method
>>>>>   def on(time: rowtime.type): TumblingEventTimeWindow =
>>>>>     new TumblingEventTimeWindow(alias, 'rowtime, size)
>>>>>
>>>>>   // does not have allowLateness() method
>>>>>   def on(time: systemtime.type): TumblingProcessingTimeWindow =
>>>>>     new TumblingProcessingTimeWindow(alias, 'systemtime, size)
>>>>> }
>>>>> object rowtime
>>>>> object systemtime
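
The overload-on-singleton-object trick can be tried out in isolation with the
self-contained sketch below. It is not the FLIP's implementation. The classes
Tumble, EventTimeWindow, and ProcessingTimeWindow and the minute-based sizes
are simplified, hypothetical stand-ins for the types named above.

```scala
// Self-contained sketch; all class and method names are illustrative.
object LatenessSketch {
  object rowtime
  object systemtime

  final case class EventTimeWindow(sizeMinutes: Int, lateness: Int = 0) {
    // Only event-time windows expose allowLateness.
    def allowLateness(minutes: Int): EventTimeWindow = copy(lateness = minutes)
  }
  final case class ProcessingTimeWindow(sizeMinutes: Int)

  final case class Tumble(sizeMinutes: Int) {
    // The overload is picked at compile time from the argument's singleton type.
    def on(time: rowtime.type): EventTimeWindow = EventTimeWindow(sizeMinutes)
    def on(time: systemtime.type): ProcessingTimeWindow = ProcessingTimeWindow(sizeMinutes)
  }

  val eventWindow: EventTimeWindow = Tumble(10).on(rowtime).allowLateness(1)
  val procWindow: ProcessingTimeWindow = Tumble(10).on(systemtime)
  // Tumble(10).on(systemtime).allowLateness(1) // would not compile
}
```

Because each `on` overload returns a different type, calling allowLateness on
a processing-time window is rejected by the compiler rather than at runtime.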
>>>>>
>>>>> What do you think about this?
>>>>>
>>>>> - Jark Wu
>>>>>
>>>>> On September 6, 2016, at 11:00 PM, Timo Walther <[email protected]> wrote:
>>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I thought about the API of the FLIP again. If we allow the "systemtime"
>>>>>> attribute, we cannot implement a nice method chaining where the user
>>>>>> can define an "allowLateness" only on event time. So even if the user
>>>>>> expressed that "systemtime" is used, we have to offer an
>>>>>> "allowLateness" method because we have to assume that this attribute
>>>>>> can also be the batch event time column, which is not very nice.
>>>>>>
>>>>>> class TumblingWindow(size: Expression) extends Window {
>>>>>>   // has allowLateness() method
>>>>>>   def on(timeField: Expression): TumblingEventTimeWindow =
>>>>>>     new TumblingEventTimeWindow(alias, timeField, size)
>>>>>> }
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>> Timo
>>>>>>
>>>>>>
>>>>>> On 05/09/16 at 10:41, Fabian Hueske wrote:
>>>>>>
>>>>>>> Hi Jark,
>>>>>>>
>>>>>>> you had asked for non-windowed aggregates in the Table API a few
>>>>>>> times. FLIP-11 proposes row-window aggregates which are a
>>>>>>> generalization of running aggregates (SlideRow unboundedPreceding).
>>>>>>>
>>>>>>> Can you have a look at the FLIP and give feedback whether this is
>>>>>>> what you are looking for?
>>>>>>> Improvement suggestions are very welcome as well.
>>>>>>>
>>>>>>> Thank you,
>>>>>>> Fabian
>>>>>>>
>>>>>>> 2016-09-01 16:12 GMT+02:00 Timo Walther <[email protected]>:
>>>>>>>
>>>>>>>> Hi all!
>>>>>>>>
>>>>>>>> Fabian and I worked on a FLIP for Stream Aggregations in the Table
>>>>>>>> API. You can find the FLIP-11 here:
>>>>>>>>
>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations
>>>>>>>>
>>>>>>>> Motivation for the FLIP:
>>>>>>>>
>>>>>>>> The Table API is a declarative API to define queries on static and
>>>>>>>> streaming tables. So far, only projection, selection, and union are
>>>>>>>> supported operations on streaming tables.
>>>>>>>>
>>>>>>>> This FLIP proposes to add support for different types of aggregations
>>>>>>>> on top of streaming tables. In particular, we seek to support:
>>>>>>>>
>>>>>>>> - Group-window aggregates, i.e., aggregates which are computed for a
>>>>>>>> group of elements. A (time or row-count) window is required to bound
>>>>>>>> the infinite input stream into a finite group.
>>>>>>>>
>>>>>>>> - Row-window aggregates, i.e., aggregates which are computed for each
>>>>>>>> row, based on a window (range) of preceding and succeeding rows.
>>>>>>>>
>>>>>>>> Each type of aggregate shall be supported on keyed/grouped or
>>>>>>>> non-keyed/grouped data streams for streaming tables as well as batch
>>>>>>>> tables.
>>>>>>>>
>>>>>>>> We are looking forward to your feedback.
>>>>>>>>
>>>>>>>> Timo
>>>>>>>>
>>>>>> --
>>>>>> Freundliche Grüße / Kind Regards
>>>>>>
>>>>>> Timo Walther
>>>>>>
>>>>>> Follow me: @twalthr
>>>>>> https://www.linkedin.com/in/twalthr
>>>>>>
>>>>>
>>>>>
>>
>
> --
> Freundliche Grüße / Kind Regards
>
> Timo Walther
>
> Follow me: @twalthr
> https://www.linkedin.com/in/twalthr
>
>