Hi everyone,

I think syntax in general is a question of taste; it will be hard to make everyone happy. On the one hand, it would be great if the Table API and SQL could look consistent, but on the other hand, there are also some negative aspects:

SQL is a language that was not developed for today's needs, and Stream SQL will basically be a "hack", e.g., by using UDFs like TUMBLE, HOP, etc. The Table API, however, is a newly designed API and does not need the same hacky solutions.

The Table API should be a fluent API for both Scala and Java. If we move windows into the groupBy() call, the question is what this would look like:

.groupBy('col, tumble(12.hours, 'rowtime, 'alias))
OR
.groupBy('col, Tumble over 12.hours on 'rowtime as 'alias)

In Java, the window definitions would then be defined as a string instead of as method calls, so it is easier for the user to make mistakes, and there is no Javadoc with an explanation.
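
A minimal sketch of the trade-off described above, assuming a simplified window model. All names here (WindowSketch, TumbleWindow, Tumble.over, parse) are hypothetical illustrations and not the actual Flink Table API; the string grammar is invented for the example. The object-based variant is checked by the compiler at every step, while a typo in the string-based variant surfaces only when the string is parsed at runtime.

```scala
import scala.util.Try

// Hypothetical, simplified model of a window definition; these names are
// illustrative and are NOT the actual Flink Table API classes.
object WindowSketch {
  final case class TumbleWindow(sizeMillis: Long, timeField: String, alias: String)

  // Object-based definition: each step is a method call the compiler can check.
  final class TumbleOver(sizeMillis: Long) {
    def on(timeField: String): TumbleOn = new TumbleOn(sizeMillis, timeField)
  }
  final class TumbleOn(sizeMillis: Long, timeField: String) {
    def as(alias: String): TumbleWindow = TumbleWindow(sizeMillis, timeField, alias)
  }
  object Tumble {
    def over(sizeMillis: Long): TumbleOver = new TumbleOver(sizeMillis)
  }

  // String-based definition (invented grammar): mistakes are only detected
  // when the string is parsed at runtime.
  private val Pattern = """tumble\((\d+)\.hours,\s*(\w+),\s*(\w+)\)""".r

  def parse(expr: String): Try[TumbleWindow] = Try {
    expr.trim match {
      case Pattern(hours, field, alias) =>
        TumbleWindow(hours.toLong * 3600000L, field, alias)
      case other =>
        throw new IllegalArgumentException(s"Cannot parse window definition: $other")
    }
  }
}
```

With this sketch, WindowSketch.Tumble.over(12 * 3600000L).on("rowtime").as("w") and WindowSketch.parse("tumble(12.hours, rowtime, w)") describe the same window, but a misspelling such as "12.hour" compiles fine and only fails when parse is called.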

I think we should decide whether a window is an operator or an expression. If it is an expression, we can also allow window definitions in .over() clauses. What do you think?

I support the idea of introducing partitionBy().


On 13/10/16 at 13:04, Zhangrucong wrote:
Hi Fabian,

What is the strategy for new syntax that Calcite does not support?
Will Calcite support it? For example, the row window syntax.

Thank you very much!

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: 2016-10-13 18:17
To: dev@flink.apache.org
Cc: Sean Wang; Timo Walther
Subject: Re: Reply: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations

Hi Zhangrucong,

yes, we want to use Calcite's SQL parser including its window syntax, i.e.,

- the standard SQL OVER windows (in streaming with a few restrictions, such as no
different partitionings or orders)
- the GroupBy window functions (TUMBLE, HOP, SESSION).

The GroupBy window functions are not implemented in Calcite yet. There is
CALCITE-1345 [1] to track the issue.

As Shaoxuan mentioned, we are not using the STREAM keyword to be SQL compliant.

Best, Fabian

[1] https://issues.apache.org/jira/browse/CALCITE-1345

2016-10-13 12:05 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

Hi everybody,

happy to see a good discussion here :-) I'll reply to Shaoxuan's mail
first and comment on Zhangrucong's question in a separate mail.

Shaoxuan, thanks for the suggestions! I think we all agree that for
SQL we should definitely follow the standard (batch) SQL syntax.
In my opinion, the Table API does not necessarily have to be as close
as possible to SQL but should try to make a few things easier and also
safer (easier is of course subjective).

- GroupBy without windows: These are currently intentionally not
supported and also not part of FLIP-11. Our motivation for not
supporting this is to guard the user from defining a query that fails
when being executed due to a very memory-consuming operation. FLIP-11
provides a way to define such a query as a sliding row window with
unbounded preceding rows. With the upcoming SQL proposal, queries that
consume unbounded memory should be identified and rejected. I would be
in favor of allowing groupBy without windows once the guarding mechanisms are in

- GroupBy with window: I think this is a question of taste. Having a
window() call makes the feature more explicit in my opinion. However,
I'm not opposed to moving the windows into the groupBy clause.
Implementation-wise, it should be easy to move the window definition
into the groupBy clause for the Scala Table API. For the Java Table API,
we would need to extend the parser quite a bit because windows would
need to be defined as Strings and not via objects.

- RowWindows: The rowWindow() call mimics the standard SQL WINDOW
clause (implemented by PostgreSQL and Calcite), which allows "reusable"
window definitions. I think this is a desirable feature. In the
FLIP-11 proposal, the over() clause in select() refers to the
predefined windows by their aliases. In case only one window is defined,
the over() clause is optional and the same (and only) window is
applied to all aggregates. I think we can make the over() call
mandatory to make the windowing more explicit. It should also be
possible to extend the over clause to directly accept RowWindows
instead of window aliases. I would not make this a priority at the
moment, but rather a feature that could be added later, because
rowWindow() and over() cover all cases. As for GroupBy with
windows, we would need to extend the parser for the Java Table API though.

Finally, I have a suggestion of my own:
In FLIP-11, groupBy() is used to define the partitioning of
RowWindows. I think this should be changed to partitionBy() because
groupBy() groups data and applies an aggregation to all rows of a
group, which is not happening here. In original SQL, the OVER clause
features a PARTITION BY clause. We are moving this out of the window
definition, i.e., the OVER clause, to enforce the same partitioning for
all windows (different partitionings would be a challenge to execute in a
parallel system).
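
The semantic difference between grouping and partitioning can be sketched on plain Scala collections. This is only an illustration under simplifying assumptions: PartitionVsGroup, Order, groupSum, and runningSum are hypothetical names, and no Flink API is involved. A groupBy-style aggregation collapses each group into one result row, whereas a partitionBy-style row window keeps every input row and attaches an aggregate computed over the rows of its partition (here: unbounded preceding up to the current row).

```scala
object PartitionVsGroup {
  final case class Order(user: String, amount: Long)

  // groupBy-style aggregation: every group collapses into a single result row.
  def groupSum(rows: Seq[Order]): Map[String, Long] =
    rows.groupBy(_.user).map { case (user, rs) => user -> rs.map(_.amount).sum }

  // partitionBy-style row window (unbounded preceding up to the current row):
  // every input row is kept and paired with the running sum of its partition.
  def runningSum(rows: Seq[Order]): Seq[(Order, Long)] = {
    val totals = scala.collection.mutable.Map.empty[String, Long].withDefaultValue(0L)
    rows.map { row =>
      totals(row.user) += row.amount
      row -> totals(row.user)
    }
  }
}
```

For three input rows, groupSum returns one entry per user, while runningSum returns three rows, one per input row.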

@Timo and all: What do you think about:

- moving windows into the groupBy() call
- making over() mandatory for rowWindow() with a single window definition
- additionally allowing window definitions in over()
- using partitionBy() instead of groupBy() for row windows?

Best, Fabian

2016-10-13 11:10 GMT+02:00 Zhangrucong <zhangruc...@huawei.com>:

Hi Shaoxuan,

I think StreamSQL must be executed in a table environment, so I
call this the Table API's StreamSQL. What do you call it: stream
Table API or StreamSQL? It is fu

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tblEnv = TableEnvironment.getTableEnvironment(env)
val ds: DataStream[(String, Long, Long)] =
  env.readTextFile("/home/demo")
     .map(f => (f, 1L, 1L))
tblEnv.registerDataStream("Order", ds, 'userID, 'count, 'num)
val sql = tblEnv.sql("SELECT Stream * FROM Order WHERE userID='A'")

So in my opinion, the grammar which is marked red should be
compatible with calcite's StreamSQL grammar.

By the way, thanks very much for telling me about the changes in
Flink StreamSQL. I will look at the new proposal.

From: Sean Wang [mailto:wshaox...@gmail.com]
Sent: 2016-10-13 16:29
To: dev@flink.apache.org; Zhangrucong
Subject: Re: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations

Hi Zhangrucong,
I am not sure what you mean by "table API'S StreamSQL"; I guess you
mean "stream TableAPI"?
TableAPI should be compatible with Calcite SQL. (By compatible, my
understanding is that both TableAPI and SQL will be translated to the
same logical plan - the same set of REL and REX.)
BTW, please note that we recently merged a change to remove the
STREAM keyword from Flink stream SQL (FLINK-4546). In our opinion,
batch and stream do not necessarily need to be differentiated at the SQL
level. The major difference between batch and stream is "WHEN and HOW to emit the 
We have been working on a new proposal with Fabian on this change. I
guess it will be sent out for review very soon.


On Thu, Oct 13, 2016 at 2:29 PM, Zhangrucong <zhangruc...@huawei.com> wrote:
Hi Shaoxuan,
Is the Table API's StreamSQL grammar compatible with Calcite's
StreamSQL grammar?

1. In Calcite, the tumble window is realized by using the function TUMBLE
or HOP, and the function must be used with GROUP BY, like this:

   SELECT STREAM
     TUMBLE_END(rowtime, INTERVAL '30' MINUTE, TIME '0:12') AS rowtime,
     COUNT(*) AS c,
     SUM(units) AS units
   FROM Orders
   GROUP BY TUMBLE(rowtime, INTERVAL '30' MINUTE, TIME '0:12')

2. The sliding window uses the keywords "window" and "over", like this:

   SELECT STREAM rowtime,
     AVG(units) OVER product (RANGE INTERVAL '10' MINUTE PRECEDING) AS m10,
     AVG(units) OVER product (RANGE INTERVAL '7' DAY PRECEDING) AS d7
   FROM Orders
   WINDOW product AS (
     ORDER BY rowtime
     PARTITION BY productId)
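
To make the tumble function from item 1 concrete, here is a minimal sketch of how a row could be assigned to a 30-minute tumbling window. It assumes that the third argument (TIME '0:12') aligns the window boundaries to 12 minutes past the hour; that reading, as well as the names TumbleSketch, tumbleStart, tumbleEnd, and countPerWindow, are assumptions made for illustration and not Calcite or Flink code. All times are expressed as minutes since midnight.

```scala
object TumbleSketch {
  // Start of the tumbling window containing `ts`, for windows of length `size`
  // whose boundaries are shifted by `align` (all values in minutes since midnight).
  def tumbleStart(ts: Long, size: Long, align: Long): Long =
    ts - Math.floorMod(ts - align, size)

  // End of that window; this is the value a TUMBLE_END-like function would report.
  def tumbleEnd(ts: Long, size: Long, align: Long): Long =
    tumbleStart(ts, size, align) + size

  // COUNT(*) per window: group the row timestamps by window end and count them.
  def countPerWindow(rowtimes: Seq[Long], size: Long, align: Long): Map[Long, Int] =
    rowtimes.groupBy(ts => tumbleEnd(ts, size, align)).map { case (end, rs) => end -> rs.size }
}
```

Under these assumptions, a row with rowtime 10:15 (615 minutes) falls into the window [10:12, 10:42), so tumbleEnd(615, 30, 12) yields 642, i.e., 10:42.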


From: 王绍翾(大沙)
Sent: 2016-10-13 2:03
To: dev@flink.apache.org
Subject: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations

Hi Fabian, Timo, and Jark,

Thanks for kicking off this FLIP. This is a really great and promising
proposal. I have a few comments on the "window" operator proposed in this
FLIP (I hope it is not too late to bring this up).

First, a window is not always needed for stream aggregation. There are
cases where we want to do an aggregation on a stream, while the
query/emit strategy decides when to emit a streaming output.

Second, a window is needed when we want to do an aggregation over a
certain range, but a window is not an operator. We basically use a window
to define the range for the aggregation. In the Table API, a window should
be defined together with the "groupby" and "select" operators, either
inside a "groupby" operator or after an "over" clause in a "select"
operator. This will make the Table API work in a similar manner to SQL.

For instance,

[A groupby without window]
<Table API>
val res = tab
.select('a, 'b.sum)
FROM tab

[A tumble window inside groupby]
<Table API>
val res = tab
.groupBy('a, tumble(10.minutes, 'rowtime))
.select('a, 'b.sum)
<SQL>
SELECT a, SUM(b)
FROM tab
GROUP BY a, TUMBLE(10.minutes, 'rowtime)

[A row tumble window after OVER]
<Table API>
.groupby('a) //optional
.select('a, 'b.count over rowTumble(10.minutes, 'rowtime))
<SQL>
SELECT a, COUNT(b) OVER ROWTUMBLE(10.minutes, 'rowtime)
FROM tab
GROUP BY a

Please let me know what you think.
21:13 To: dev@flink.apache.org
Subject: Re: [DISCUSS] FLIP-11: Table API Stream Aggregations

Hi everybody,

Timo proposed our FLIP-11 a bit more than three weeks ago.
I will update the status of the FLIP to accepted.


2016-09-19 9:16 GMT+02:00 Timo Walther <twal...@apache.org>:

Hi Jark,

yes I think enough time has passed. We can start implementing the
What do you think Fabian?

If there are no objections, I will create the subtasks in Jira today.
For FLIP-11/1, I have already implemented a prototype; I just have to do
some refactoring/documentation before opening a PR.


On 18/09/16 at 04:46, Jark Wu wrote:

Hi all,
It seems that there are no objections to the window design. So could
we open subtasks to start working on it now?

- Jark Wu

On Sep 7, 2016, at 4:29 PM, Jark Wu <wuchong...@alibaba-inc.com> wrote:
Hi Fabian,

Thanks for sharing your ideas.

They all make sense to me. Regarding reassigning timestamps, I
do not have a use case. I came up with this because DataStream
has a TimestampAssigner :)

+1 for this FLIP.

- Jark Wu

On Sep 7, 2016, at 2:59 PM, Fabian Hueske <fhue...@gmail.com> wrote:


thanks for your comments and questions!
Actually, you are bringing up the points that Timo and I
discussed the most when designing the FLIP ;-)

- We also thought about the syntactic shortcut for running
aggregates like you proposed (table.groupBy('a).select(…)). Our
motivation for not allowing this shortcut is to prevent users from
accidentally performing a "dangerous" operation. The problem
with unbounded sliding row-windows is that their state
never expires. If you have an evolving key space, you will
likely run into problems at some point because the operator state
grows too large. IMO, a row-window session is a better approach,
because it defines a timeout after which state can be discarded.
groupBy.select is a very common operation in batch, but its
semantics in streaming are very different. In my opinion, it makes
sense to make users aware of these differences through the API.

- Reassigning timestamps and watermarks is a very delicate issue.
right, that Calcite exposes this field which is necessary due to
the  semantics of SQL. However, also in Calcite you cannot freely
choose the  timestamp attribute for streaming queries (it must be
a monotone or  quasi-monotone attribute) which is hard to reason
about (and
after a few operators have been applied. Streaming tables in
Flink will  likely have a time attribute which is identical to
the initial
However, Flink does modify timestamps internally, e.g., for
records that are emitted from time windows, in order to ensure
that consecutive windows perform as expected. Modifying or
reassigning timestamps in the middle of a job can lead to
unexpected results which are very hard to reason about. Do you
have a concrete use case in mind for reassigning timestamps?

- The idea to represent rowtime and systime as objects is good.
Our motivation to go for reserved Scala symbols was to have a
uniform syntax with windows over streaming and batch tables. On
batch tables you can compute time windows basically over every
time attribute (they are treated similarly to grouping attributes,
with a bit of extra logic to extract the grouping key for
sliding and session windows). If you write window(Tumble over
10.minutes on 'rowtime) on a streaming table, 'rowtime would
indicate event-time. On a batch table with a 'rowtime attribute,
the same operator would be internally converted into a group
by. By going for the object approach we would lose this
compatibility (or would need to introduce an additional column
attribute to specify the window attribute for batch tables).

As usual some of the design decisions are based on preferences.
Do they make sense to you? Let me know what you think.

Best, Fabian

2016-09-07 5:12 GMT+02:00 Jark Wu <wuchong...@alibaba-inc.com>:

Hi all,
I was on vacation for about five days, sorry to have missed this
great FLIP.

Yes, the non-windowed aggregates are a special case of row-window.
proposal looks really good. Can we have a simplified form for
the special case? Such as: table.groupBy('a).rowWindow(Sl
can be simplified to table.groupBy('a).select(…). The latter
will actually call the former.

Another question is about the rowtime. As the FLIP says,
DataStream and StreamTableSource are responsible for assigning
timestamps and watermarks; furthermore, "rowtime" and
"systemtime" are not real columns. IMO, this is different from
Calcite's rowtime, which is a real column in the
FLIP's way, we will lose some flexibility. Because the timestamp
column may
be created after some transformations or join operation, not
created at
beginning. So why do we have to define rowtime at beginning?
watermark?)     Can we have a way to define rowtime after source

Regarding to “allowLateness” method. I come up a trick that we can
‘rowtime and ‘system to be a Scala object, not a symbol expression.
will looks like this :

window(Tumble over 10.minutes on rowtime allowLateness as 'w)

The implementation will look like this:

class TumblingWindow(size: Expression) extends Window {
  def on(time: rowtime.type): TumblingEventTimeWindow =
    new TumblingEventTimeWindow(alias, 'rowtime, size) // has allowLateness() method

  def on(time: systemtime.type): TumblingProcessingTimeWindow =
    new TumblingProcessingTimeWindow(alias, 'systemtime, size) // has no allowLateness() method
}

object rowtime
object systemtime
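
The snippet above depends on Table API classes that are not shown. A self-contained sketch of the same idea follows, with stand-in types that are purely hypothetical (LatenessSketch, EventTimeWindow, ProcessingTimeWindow, and Tumble are not Flink classes): overloading on() on the singleton types of two marker objects lets the compiler pick the window kind, so allowLateness() is only reachable after on(rowtime).

```scala
object LatenessSketch {
  // Marker objects standing in for the two time modes (hypothetical names
  // mirroring the mail above, not Flink classes).
  object rowtime
  object systemtime

  final case class EventTimeWindow(sizeMillis: Long, latenessMillis: Long = 0L) {
    // Only event-time windows expose allowLateness.
    def allowLateness(millis: Long): EventTimeWindow = copy(latenessMillis = millis)
  }

  // Processing-time windows deliberately have no allowLateness method.
  final case class ProcessingTimeWindow(sizeMillis: Long)

  final class Tumble(sizeMillis: Long) {
    // Overload resolution on the singleton types selects the window kind at compile time.
    def on(time: rowtime.type): EventTimeWindow = EventTimeWindow(sizeMillis)
    def on(time: systemtime.type): ProcessingTimeWindow = ProcessingTimeWindow(sizeMillis)
  }
}
```

With these definitions, new LatenessSketch.Tumble(600000L).on(LatenessSketch.rowtime).allowLateness(5000L) compiles, while calling allowLateness after on(LatenessSketch.systemtime) is rejected by the compiler because ProcessingTimeWindow has no such method.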

What do you think about this?

- Jark Wu

On Sep 6, 2016, at 11:00 PM, Timo Walther <twal...@apache.org> wrote:

Hi all,

I thought about the API of the FLIP again. If we allow the

attribute, we cannot implement a nice method chaining where the
define a "allowLateness" only on event time. So even if the user
that "systemtime" is used we have to offer a "allowLateness"
we have to assume that this attribute can also be the batch event
column, which is not very nice.

class TumblingWindow(size: Expression) extends Window {
  def on(timeField: Expression): TumblingEventTimeWindow =
    new TumblingEventTimeWindow(alias, timeField, size) // has allowLateness() method
}


What do you think?


On 05/09/16 at 10:41, Fabian Hueske wrote:

Hi Jark,

you had asked for non-windowed aggregates in the Table API a few
FLIP-11 proposes row-window aggregates which are a
generalization of
running aggregates (SlideRow unboundedPreceding).

Can you have a look at the FLIP and give feedback whether this

are looking for?
Improvement suggestions are very welcome as well.

Thank you,

2016-09-01 16:12 GMT+02:00 Timo Walther <twal...@apache.org>:

Hi all!
Fabian and I worked on a FLIP for Stream Aggregations in the
You can find the FLIP-11 here:

ttps://cwiki.apache.org/confluence/display/FLINK/FLIP-11%25> <

Motivation for the FLIP:

The Table API is a declarative API to define queries on static
streaming tables. So far, only projection, selection, and union
supported operations on streaming tables.

This FLIP proposes to add support for different types of

top of streaming tables. In particular, we seek to support:
- Group-window aggregates, i.e., aggregates which are computed
for a

of elements. A (time or row-count) window is required to bound
input stream into a finite group.
- Row-window aggregates, i.e., aggregates which are computed

based on a window (range) of preceding and succeeding rows.
Each type of aggregate shall be supported on keyed/grouped or
non-keyed/grouped data streams for streaming tables as well as

We are looking forward to your feedback.

Freundliche Grüße / Kind Regards

Timo Walther

Follow me: @twalthr

