Thanks everybody for the input.

I updated the FLIP-11 document [1] and made the following changes:

- made over() mandatory also for single row windows
- removed allowedLateness() from windows, since supporting it would mean adding some kind of retraction output mode. This is out of scope for FLIP-11 and is addressed in the Stream SQL design doc [2].
- removed the 'systemtime keyword to distinguish event-time and processing-time on a syntactical level (proposed by Timo in PR #2562 [3]).

I did *not* make the following changes that were discussed in this thread:

- change groupBy() to partitionBy() for row windows
- allow window specification in groupBy()
- allow row window specification in over()
- allow groupBy() without group window on streaming tables

Best, Fabian

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations
[2] https://docs.google.com/document/d/1qVVt_16kdaZQ8RTfA_f4konQPW4tnl8THw6rzGUdaqU
[3] https://github.com/apache/flink/pull/2562

2016-11-02 4:49 GMT+01:00 Sean Wang <wshaox...@gmail.com>:

> Hi Stephan and Fabian,
> Thanks for the replies, and very happy to see that my questions have raised a good discussion here. This is about more than just FLIP-11: it concerns the future of Flink SQL (glad all of us agree to use the standard compliant SQL interface) and the role of the Table API.
> The syntax of the TableAPI is just a matter of preference, but IMO, the tradeoff between SQL syntax and TableAPI flexibility should be well considered during the design. I hope to see a powerful, concise, and compatible TableAPI.
> I agree with your other comments on FLIP-11.
>
> Regards,
> Shaoxuan
>
> On Fri, Oct 28, 2016 at 11:09 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> > Thanks for bringing up this point Stephan.
> >
> > I think it is a good decision to have a standard compliant SQL interface that SQL developers and tools can use. The SQL proposal [1] I posted a few days ago follows this approach.
> >
> > However, SQL was not designed with stream processing in mind and many important streaming concepts are either cumbersome to express or not at all expressible with plain SQL (see Tyler Akidau's [2] proposal for instance).
> > I think the Table API has the chance to improve a lot of these issues by not following the SQL standard too closely.
> >
> > +1 for defining window aggregates with the explicit window() and rowWindow() clauses.
> >
> > +1 for sticking to groupBy() instead of partitionBy().
> >
> > There are a few more ideas that can make the Table API better suited for streaming such as:
> >
> > - have built-in emission and refinement strategies (see StreamSQL doc [1])
> > - easier joins against time-variant tables (Temporal table proposal of Julian Hyde [3])
> > - support for DataStream-like UDFs
> >
> > These issues should be discussed separately though.
> >
> > Best,
> > Fabian
> >
> > [1] https://docs.google.com/document/d/1qVVt_16kdaZQ8RTfA_f4konQPW4tnl8THw6rzGUdaqU
> > [2] https://docs.google.com/document/d/1tSey4CeTrbb4VjWvtSA78OcU6BERXXDZ3t0HzSLij9Q
> > [3] https://docs.google.com/document/d/1RvnLEEQK92axdAaZ9XIU5szpkbGqFMBtzYiIY4dHe0Q
> >
> > 2016-10-28 10:05 GMT+02:00 Jark Wu <wuchong...@alibaba-inc.com>:
> >
> > > Hi,
> > >
> > > I agree that the Table API should be a SQL-like API that does not need to be strictly consistent with SQL.
> > >
> > > > - Not put the window definition into the groupBy clause.
> > >
> > > +1 Putting the window clause outside of groupBy will be easy to learn and easy to discover for users.
> > >
> > > > I like the idea of having separate ".window()" and ".rowWindow()" clauses.
> > >
> > > +1
> > >
> > > > I would prefer to not have a "partitionBy" statement.
> > >
> > > As far as I know, partitionBy plays the same role as groupBy in the Table API.
> > > So maybe groupBy is enough, and there is no need to introduce a new clause.
> > >
> > > - Jark Wu
> > >
> > > > On Oct 27, 2016, at 2:22 AM, Stephan Ewen <se...@apache.org> wrote:
> > > >
> > > > Hi all!
> > > >
> > > > I think that in order to get a better hold on how we want to build the Table API, we need to *decide what the role of the Table API should be*. We touched on that a few times, but I think we still have different ideas about that.
> > > >
> > > > To get there, let me take a step back and look at the design of Stream SQL again. There were basically two competing approaches:
> > > >
> > > > (1) Keep SQL as it is and make it run on infinite streams via introducing dynamic tables
> > > > (2) Do a new language that is similar to SQL, but designed with streaming concepts in mind (first class support for time and windows)
> > > >
> > > > Both approaches had good points. The Stream SQL design doc posted followed approach (1) - keep SQL as it is.
> > > >
> > > > Now, for the Table API, we seem to be having a similar discussion again.
> > > >
> > > > (1) Let the Table API be as similar to SQL as possible, simply make it feel "fluently embedded" in Scala.
> > > > (2) Define the Table API as one would define a new and clean DSL for streaming. SQL inspired, of course. Where SQL syntax feels natural, use the SQL syntax, but make it very accessible to Java/Scala (non-SQL) programmers.
> > > >
> > > > I am personally more in favor of variant (2) for the following reasons:
> > > >
> > > > - We already have SQL compliant with the standards and tools.
> > > >
> > > > - Mirroring SQL too closely into the Table API has the marginal benefit that someone close to SQL will find it a bit more familiar.
> > > >   Not sure if that is even the case, as they have to re-learn the fluent DSL and Scala concepts.
> > > >
> > > > - We are making it more difficult for all those that come from a more Scala/Java DataStream background and simply want to move "a layer up", getting schema and more optimizations into the equation.
> > > >
> > > > What would that mean for the specific issues that are discussed in FLIP-11?
> > > > Based on interpreting the Table API as a re-imagined streaming DSL, I would suggest to
> > > >
> > > > - Not put the window definition into the groupBy clause. It just is unexpected for all that are not super familiar with SQL and hard to discover in the IDE. A separate window clause is simpler for users coming from the DataStream background (or other streaming APIs) and it is more discoverable in the IDE.
> > > >
> > > > - I like the idea of having separate ".window()" and ".rowWindow()" clauses. Makes it more explicit that very different things will happen.
> > > >
> > > > - I would prefer to not have a "partitionBy" statement. When we restrain the Table API at least initially to having one partitioning for the windows, we should be able to express the partitioning by simply adding it to the fields in the "groupBy" clause. That would make the API more easily accessible to users that are not SQL power users.
> > > >
> > > > What do others think?
> > > >
> > > > Greetings,
> > > > Stephan
> > > >
> > > > On Sat, Oct 15, 2016 at 1:02 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> > > >
> > > >> Thanks for your reply Shaoxuan!
> > > >>
> > > >> Please see my replies below.
> > > >>
> > > >> Best, Fabian
> > > >>
> > > >> 2016-10-14 11:34 GMT+02:00 Sean Wang <wshaox...@gmail.com>:
> > > >>
> > > >>> Thanks for your quick reply, Fabian.
> > > >>>
> > > >>> I have a few minor comments & suggestions:
> > > >>>
> > > >>> <GroupBy with window>
> > > >>> - Agree that we should consider GroupBy without window after the new SQL proposal is settled down.
> > > >>
> > > >> OK, so we keep this as it is for now. GroupBy without windows will be supported later when we are able to "guard" the memory requirements of that operation.
> > > >>
> > > >>> <GroupBy with window>
> > > >>> - For Java API, we can keep window() call, and put window alias into Groupby clause. This can be also applied to rowwindow case.
> > > >>
> > > >> Referring to the window alias in the groupBy clause would require to invert the methods, i.e., groupBy().window() -> window().groupBy(). I am not sure if that is more intuitive. Also, Scala and Java are using the same class (Table) but different methods (Java uses String parameter, Scala Expression parameters). In my opinion it makes sense to have both APIs closely synced. So I would either keep window() (after groupBy) for Scala and Java or remove it for both.
> > > >>
> > > >>> <RowWindows> & <partitionby() for rowwindow>
> > > >>> -+1 to support replace groupby() by partitionby(). BTW, in the case of over, instead of partitionby, are we going to support orderby? If yes, I would suggest to define rowwindow as rowwindow(PartionByParaType, OrderByParaType, WindowParaType).
> > > >>
> > > >> The current FLIP-11 proposal supports defining both partitionBy and orderBy (with a few restrictions).
> > > >> PartitionBy is defined for all windows alike by calling
> > > >>
> > > >> table.partitionBy(...).rowWindow(Window1 as w1, Window2 as w2).select(count() over w1).
> > > >>
> > > >> Allowing windows with different partitioning would mean that data is shuffled to different nodes and that we need a distributed join to assemble the result rows. In principle this could be done but would be very expensive to execute (applies to batch and streaming). In my opinion, we should not support this case.
> > > >>
> > > >> OrderBy is implicitly supported by the on() clause of RowWindows, e.g.,
> > > >>
> > > >> rowWindow(TumbleRows over 10.minutes on 'rowtime as 'w)
> > > >>
> > > >> says that the window is ordered on the rowtime, i.e., event-time, attribute. For streaming we can only allow event-time order (or processing-time order which is always given). Orders on other attributes would not be possible (for infinite dynamic input tables) or very expensive (memory and computation wise) to maintain (for finite dynamic input tables). For queries on batch tables, in principle all orders are possible.
> > > >> With the current proposal only count windows are supported for arbitrary attribute types and time windows for timestamp attributes.
> > > >>
> > > >> So
> > > >>> - moving windows into the groupBy() call : +1
> > > >>
> > > >> +1
> > > >>
> > > >>> - making over() for rowWindow() with a single window definition.
> > > >>
> > > >> +1
> > > >>
> > > >>> - additionally allowing window definitions in over(): +1 yes for scala, but use alias for java API.
> > > >>
> > > >> If we have the parser code for Java group windows in groupBy() it should be easy to adapt this for over(). But we should also keep the rowWindow method to define aliases.
> > > >>
> > > >>> - using partitionBy() instead of groupBy() for row windows?: +1, but better to consider orderby, it may be even better to move partitionBy() into rowwindow.
> > > >>
> > > >> +1 to change groupBy() to partitionBy().
> > > >> I would not move partitionBy() into the RowWindow definition but keep it outside to ensure only one partitioning is defined. The orderBy definition is already fluently included in the RowWindow via the on() method.
> > > >>
> > > >>> Regards,
> > > >>> Shaoxuan
> > > >>>
> > > >>> On Thu, Oct 13, 2016 at 6:05 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> > > >>>
> > > >>>> Hi everybody,
> > > >>>>
> > > >>>> happy to see a good discussion here :-)
> > > >>>> I'll reply to Shaoxuan's mail first and comment on Zhangrucong's question in a separate mail.
> > > >>>>
> > > >>>> Shaoxuan, thanks for the suggestions! I think we all agree that for SQL we should definitely follow the standard (batch) SQL syntax.
> > > >>>> In my opinion, the Table API does not necessarily have to be as close as possible to SQL but should try to make a few things easier and also safer (easier is of course subjective).
> > > >>>>
> > > >>>> - GroupBy without windows: These are currently intentionally not supported and also not part of FLIP-11. Our motivation for not supporting this is to guard the user from defining a query that fails when being executed due to a very memory consuming operation. FLIP-11 provides a way to define such a query as a sliding row window with unbounded preceding rows. With the upcoming SQL proposal, queries that consume unbounded memory should be identified and rejected.
> > > >>>>   I would be in favor of allowing groupBy without windows once the guarding mechanisms are in place.
> > > >>>>
> > > >>>> - GroupBy with window: I think this is a question of taste. Having a window() call makes the feature more explicit in my opinion. However, I'm not opposed to moving the windows into the groupBy clause.
> > > >>>>   Implementation-wise it should be easy to move the window definition into the groupBy clause for the Scala Table API. For the Java Table API we would need to extend the parser quite a bit because windows would need to be defined as Strings and not via objects.
> > > >>>>
> > > >>>> - RowWindows: The rowWindow() call mimics the standard SQL WINDOW clause (implemented by PostgreSQL and Calcite) which allows to have "reusable" window definitions. I think this is a desirable feature. In the FLIP-11 proposal the over() clause in select() refers to the predefined windows with aliases. In case only one window is defined, the over() clause is optional and the same (and only) window is applied to all aggregates. I think we can make the over() call mandatory to have the windowing more explicit. It should also be possible to extend the over clause to directly accept RowWindows instead of window aliases. I would not make this a priority at the moment, but a feature that could be added later, because rowWindow() and over() cover all cases. Similar as for GroupBy with windows, we would need to extend the parser for the Java Table API though.
> > > >>>>
> > > >>>> Finally, I have a suggestion of my own:
> > > >>>> In FLIP-11, groupBy() is used to define the partitioning of RowWindows.
> > > >>>> I think this should be changed to partitionBy() because groupBy() groups data and applies an aggregation to all rows of a group which is not happening here. In original SQL, the OVER clause features a PARTITION BY clause. We are moving this out of the window definition, i.e., OVER clause, to enforce the same partitioning for all windows (different partitionings would be a challenge to execute in a parallel system).
> > > >>>>
> > > >>>> @Timo and all: What do you think about:
> > > >>>>
> > > >>>> - moving windows into the groupBy() call
> > > >>>> - making over() for rowWindow() with a single window definition
> > > >>>> - additionally allowing window definitions in over()
> > > >>>> - using partitionBy() instead of groupBy() for row windows?
> > > >>>>
> > > >>>> Best, Fabian
> > > >>>>
> > > >>>> 2016-10-13 11:10 GMT+02:00 Zhangrucong <zhangruc...@huawei.com>:
> > > >>>>
> > > >>>>> Hi shaoxuan:
> > > >>>>>
> > > >>>>> I think the StreamSQL must be executed in the table environment. So I call this the table API's StreamSQL. What do you call this, stream Table API or streamsql? It is fu
> > > >>>>>
> > > >>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
> > > >>>>> val tblEnv = TableEnvironment.getTableEnvironment(env)
> > > >>>>> val ds: DataStream[(String, Long, Long)] = env.readTextFile("/home/demo")
> > > >>>>>   .map(f => (f, 1L, 1L))
> > > >>>>> tblEnv.registerDataStream("Order", ds, 'userID, 'count, 'num)
> > > >>>>> val sql = tblEnv.sql("SELECT Stream * FROM Order WHERE userID='A'")
> > > >>>>>
> > > >>>>> So in my opinion, the grammar which is marked red should be compatible with Calcite's StreamSQL grammar.
> > > >>>>>
> > > >>>>> By the way, thanks very much for telling me about the modified content in Flink StreamSQL. I will look at the new proposal.
> > > >>>>>
> > > >>>>> Thanks!
> > > >>>>>
> > > >>>>> From: Sean Wang [mailto:wshaox...@gmail.com]
> > > >>>>> Sent: October 13, 2016 16:29
> > > >>>>> To: dev@flink.apache.org; Zhangrucong
> > > >>>>> Subject: Re: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations
> > > >>>>>
> > > >>>>> Hi zhangrucong,
> > > >>>>> I am not sure what you mean by "table API'S StreamSQL", I guess you mean "stream TableAPI"?
> > > >>>>> TableAPI should be compatible with Calcite SQL. (By compatible, my understanding is that both TableAPI and SQL will be translated to the same logical plan - the same set of REL and REX).
> > > >>>>> BTW, please note that we recently have merged a change to remove the STREAM keyword for Flink stream SQL (FLINK-4546). In our opinion, batch and stream do not necessarily need to be differentiated at the SQL level. The major difference between batch and stream is "WHEN and HOW to emit the result".
> > > >>>>> We have been working on a new proposal with Fabian on this change. I guess it will be sent out for review very soon.
> > > >>>>>
> > > >>>>> Regards,
> > > >>>>> Shaoxuan
> > > >>>>>
> > > >>>>> On Thu, Oct 13, 2016 at 2:29 PM, Zhangrucong <zhangruc...@huawei.com> wrote:
> > > >>>>> Hi shaoxuan:
> > > >>>>> Is the table API's StreamSQL grammar compatible with Calcite's StreamSQL grammar?
> > > >>>>>
> > > >>>>> 1. In Calcite, the tumble window is realized by using the function tumble or hop.
> > > >>>>> And the function must be used with group by, like this:
> > > >>>>>
> > > >>>>> SELECT
> > > >>>>>   TUMBLE_END(rowtime, INTERVAL '30' MINUTE, TIME '0:12') AS rowtime,
> > > >>>>>   productId,
> > > >>>>>   COUNT(*) AS c,
> > > >>>>>   SUM(units) AS units
> > > >>>>> FROM Orders
> > > >>>>> GROUP BY TUMBLE(rowtime, INTERVAL '30' MINUTE, TIME '0:12'),
> > > >>>>>   productId;
> > > >>>>>
> > > >>>>> 2. The sliding window uses the keywords "window" and "over". Like this:
> > > >>>>>
> > > >>>>> SELECT *
> > > >>>>> FROM (
> > > >>>>>   SELECT STREAM rowtime,
> > > >>>>>     productId,
> > > >>>>>     units,
> > > >>>>>     AVG(units) OVER product (RANGE INTERVAL '10' MINUTE PRECEDING) AS m10,
> > > >>>>>     AVG(units) OVER product (RANGE INTERVAL '7' DAY PRECEDING) AS d7
> > > >>>>>   FROM Orders
> > > >>>>>   WINDOW product AS (
> > > >>>>>     ORDER BY rowtime
> > > >>>>>     PARTITION BY productId))
> > > >>>>>
> > > >>>>> Thanks!
> > > >>>>>
> > > >>>>> -----Original Message-----
> > > >>>>> From: 王绍翾(大沙) [mailto:shaoxuan....@alibaba-inc.com]
> > > >>>>> Sent: October 13, 2016 2:03
> > > >>>>> To: dev@flink.apache.org
> > > >>>>> Subject: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations
> > > >>>>>
> > > >>>>> Hi Fabian, Timo, and Jark.
> > > >>>>> Thanks for kicking off this FLIP. This is a really great and promising proposal. I have a few comments on the "window" operator proposed in this FLIP (I am hoping it is not too late to bring this up). First, a window is not always needed for stream aggregation. There are cases where we want to do an aggregation on a stream, while the query/emit strategy decides when to emit a streaming output. Second, a window is needed when we want to do an aggregation for a certain range, but a window is not an operator.
> > > >>>>> We basically use a window to define the range for aggregation. In the TableAPI, a window should be defined together with the "groupby" and "select" operators, either inside a "groupby" operator or after an "over" clause in the "select" operator. This will make the TableAPI work in a similar manner to SQL.
> > > >>>>> For instance,
> > > >>>>>
> > > >>>>> [A groupby without window]
> > > >>>>> <Table API>
> > > >>>>> val res = tab
> > > >>>>>   .groupBy('a)
> > > >>>>>   .select('a, 'b.sum)
> > > >>>>> <SQL>
> > > >>>>> SELECT a, SUM(b)
> > > >>>>> FROM tab
> > > >>>>> GROUP BY a
> > > >>>>>
> > > >>>>> [A tumble window inside groupby]
> > > >>>>> <Table API>
> > > >>>>> val res = tab
> > > >>>>>   .groupBy('a, tumble(10.minutes, 'rowtime))
> > > >>>>>   .select('a, 'b.sum)
> > > >>>>> <SQL>
> > > >>>>> SELECT a, SUM(b)
> > > >>>>> FROM tab
> > > >>>>> GROUP BY a, TUMBLE(10.minutes, 'rowtime)
> > > >>>>>
> > > >>>>> [A row tumble window after OVER]
> > > >>>>> <Table API>
> > > >>>>> .groupby('a) //optional
> > > >>>>> .select('a, 'b.count over rowTumble(10.minutes, 'rowtime))
> > > >>>>> <SQL>
> > > >>>>> SELECT a, COUNT(b) OVER ROWTUMBLE(10.minutes, 'rowtime)
> > > >>>>> FROM tab
> > > >>>>> GROUP BY a
> > > >>>>>
> > > >>>>> Please let me know what you think.
> > > >>>>> Regards,
> > > >>>>> Shaoxuan
> > > >>>>> ------------------------------------------------------------------
> > > >>>>> From: Fabian Hueske <fhue...@gmail.com>
> > > >>>>> Sent: Monday, September 26, 2016 21:13
> > > >>>>> To: dev@flink.apache.org
> > > >>>>> Subject: Re: [DISCUSS] FLIP-11: Table API Stream Aggregations
> > > >>>>>
> > > >>>>> Hi everybody,
> > > >>>>>
> > > >>>>> Timo proposed our FLIP-11 a bit more than three weeks ago.
> > > >>>>> I will update the status of the FLIP to accepted.
> > > >>>>>
> > > >>>>> Thanks,
> > > >>>>> Fabian
> > > >>>>>
> > > >>>>> 2016-09-19 9:16 GMT+02:00 Timo Walther <twal...@apache.org>:
> > > >>>>>
> > > >>>>>> Hi Jark,
> > > >>>>>>
> > > >>>>>> yes I think enough time has passed. We can start implementing the changes.
> > > >>>>>> What do you think Fabian?
> > > >>>>>>
> > > >>>>>> If there are no objections, I will create the subtasks in Jira today.
> > > >>>>>> For FLIP-11/1 I already have implemented a prototype, I just have to do some refactoring/documentation before opening a PR.
> > > >>>>>>
> > > >>>>>> Timo
> > > >>>>>>
> > > >>>>>> On 18/09/16 at 04:46, Jark Wu wrote:
> > > >>>>>>
> > > >>>>>>> Hi all,
> > > >>>>>>>
> > > >>>>>>> It seems that there are no objections to the window design. So could we open subtasks to start working on it now?
> > > >>>>>>>
> > > >>>>>>> - Jark Wu
> > > >>>>>>>
> > > >>>>>>> On Sep 7, 2016, at 4:29 PM, Jark Wu <wuchong...@alibaba-inc.com> wrote:
> > > >>>>>>>>
> > > >>>>>>>> Hi Fabian,
> > > >>>>>>>>
> > > >>>>>>>> Thanks for sharing your ideas.
> > > >>>>>>>>
> > > >>>>>>>> They all make sense to me. Regarding reassigning timestamps, I do not have a use case. I came up with this because DataStream has a TimestampAssigner :)
> > > >>>>>>>>
> > > >>>>>>>> +1 for this FLIP.
> > > >>>>>>>>
> > > >>>>>>>> - Jark Wu
> > > >>>>>>>>
> > > >>>>>>>> On Sep 7, 2016, at 2:59 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>> Hi,
> > > >>>>>>>>>
> > > >>>>>>>>> thanks for your comments and questions!
> > > >>>>>>>>> Actually, you are bringing up the points that Timo and I discussed the most when designing the FLIP ;-)
> > > >>>>>>>>>
> > > >>>>>>>>> - We also thought about the syntactic shortcut for running aggregates like you proposed (table.groupBy('a).select(…)). Our motivation to not allow this shortcut is to prevent users from accidentally performing a "dangerous" operation. The problem with unbounded sliding row-windows is that their state never expires. If you have an evolving key space, you will likely run into problems at some point because the operator state grows too large. IMO, a row-window session is a better approach, because it defines a timeout after which state can be discarded. groupBy.select is a very common operation in batch but its semantics in streaming are very different. In my opinion it makes sense to make users aware of these differences through the API.
> > > >>>>>>>>>
> > > >>>>>>>>> - Reassigning timestamps and watermarks is a very delicate issue. You are right that Calcite exposes this field, which is necessary due to the semantics of SQL. However, also in Calcite you cannot freely choose the timestamp attribute for streaming queries (it must be a monotone or quasi-monotone attribute), which is hard to reason about (and guarantee) after a few operators have been applied.
> > > >>>>>>>>>   Streaming tables in Flink will likely have a time attribute which is identical to the initial rowtime.
> > > >>>>>>>>>   However, Flink does modify timestamps internally, e.g., for records that are emitted from time windows, in order to ensure that consecutive windows perform as expected. Modifying or reassigning timestamps in the middle of a job can lead to unexpected results which are very hard to reason about. Do you have a concrete use case in mind for reassigning timestamps?
> > > >>>>>>>>>
> > > >>>>>>>>> - The idea to represent rowtime and systime as objects is good. Our motivation to go for reserved Scala symbols was to have a uniform syntax with windows over streaming and batch tables. On batch tables you can compute time windows basically over every time attribute (they are treated similar to grouping attributes with a bit of extra logic to extract the grouping key for sliding and session windows). If you write window(Tumble over 10.minutes on 'rowtime) on a streaming table, 'rowtime would indicate event-time. On a batch table with a 'rowtime attribute, the same operator would be internally converted into a group by. By going for the object approach we would lose this compatibility (or would need to introduce an additional column attribute to specify the window attribute for batch tables).
> > > >>>>>>>>>
> > > >>>>>>>>> As usual some of the design decisions are based on preferences.
> > > >>>>>>>>> Do they make sense to you? Let me know what you think.
> > > >>>>>>>>>
> > > >>>>>>>>> Best, Fabian
> > > >>>>>>>>>
> > > >>>>>>>>> 2016-09-07 5:12 GMT+02:00 Jark Wu <wuchong...@alibaba-inc.com>:
> > > >>>>>>>>>
> > > >>>>>>>>>> Hi all,
> > > >>>>>>>>>>
> > > >>>>>>>>>> I was on vacation for about five days, sorry to have missed this great FLIP.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Yes, non-windowed aggregates are a special case of row-windows. And the proposal looks really good. Can we have a simplified form for the special case? Such as: table.groupBy('a).rowWindow(SlideRows.unboundedPreceding).select(…) can be simplified to table.groupBy('a).select(…). The latter will actually call the former.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Another question is about the rowtime. As the FLIP said, DataStream and StreamTableSource are responsible for assigning timestamps and watermarks, furthermore "rowtime" and "systemtime" are not real columns. IMO, this is different from Calcite's rowtime, which is a real column in the table. In the FLIP's way, we will lose some flexibility, because the timestamp column may be created after some transformations or join operations, not at the beginning. So why do we have to define rowtime at the beginning?
> > > >>>>>>>>>> (because of watermark?) Can we have a way to define rowtime after the source table, like TimestampAssigner?
> > > >>>>>>>>>>
> > > >>>>>>>>>> Regarding the "allowLateness" method: I came up with a trick, that we can make 'rowtime and 'system a Scala object, not a symbol expression. The API will look like this:
> > > >>>>>>>>>>
> > > >>>>>>>>>> window(Tumble over 10.minutes on rowtime allowLateness as 'w)
> > > >>>>>>>>>>
> > > >>>>>>>>>> The implementation will look like this:
> > > >>>>>>>>>>
> > > >>>>>>>>>> class TumblingWindow(size: Expression) extends Window {
> > > >>>>>>>>>>   def on(time: rowtime.type): TumblingEventTimeWindow =
> > > >>>>>>>>>>     new TumblingEventTimeWindow(alias, 'rowtime, size) // has allowLateness() method
> > > >>>>>>>>>>
> > > >>>>>>>>>>   def on(time: systemtime.type): TumblingProcessingTimeWindow =
> > > >>>>>>>>>>     new TumblingProcessingTimeWindow(alias, 'systemtime, size) // hasn't allowLateness() method
> > > >>>>>>>>>> }
> > > >>>>>>>>>> object rowtime
> > > >>>>>>>>>> object systemtime
> > > >>>>>>>>>>
> > > >>>>>>>>>> What do you think about this?
> > > >>>>>>>>>>
> > > >>>>>>>>>> - Jark Wu
> > > >>>>>>>>>>
> > > >>>>>>>>>> On Sep 6, 2016, at 11:00 PM, Timo Walther <twal...@apache.org> wrote:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Hi all,
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> I thought about the API of the FLIP again.
> > > >>>>>>>>>>> If we allow the "systemtime" attribute, we cannot implement a
> > > >>>>>>>>>>> nice method chaining where the user can define an
> > > >>>>>>>>>>> "allowLateness" only on event time. So even if the user
> > > >>>>>>>>>>> expressed that "systemtime" is used, we have to offer an
> > > >>>>>>>>>>> "allowLateness" method because we have to assume that this
> > > >>>>>>>>>>> attribute can also be the batch event time column, which is
> > > >>>>>>>>>>> not very nice.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> class TumblingWindow(size: Expression) extends Window {
> > > >>>>>>>>>>>   def on(timeField: Expression): TumblingEventTimeWindow =
> > > >>>>>>>>>>>     new TumblingEventTimeWindow(alias, timeField, size)
> > > >>>>>>>>>>>     // has allowLateness() method
> > > >>>>>>>>>>> }
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> What do you think?
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Timo
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> On 05/09/16 at 10:41, Fabian Hueske wrote:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> Hi Jark,
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> you had asked for non-windowed aggregates in the Table API a
> > > >>>>>>>>>>>> few times. FLIP-11 proposes row-window aggregates which are
> > > >>>>>>>>>>>> a generalization of running aggregates (SlideRow
> > > >>>>>>>>>>>> unboundedPreceding).
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Can you have a look at the FLIP and give feedback whether
> > > >>>>>>>>>>>> this is what you are looking for?
> > > >>>>>>>>>>>> Improvement suggestions are very welcome as well.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Thank you,
> > > >>>>>>>>>>>> Fabian
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> 2016-09-01 16:12 GMT+02:00 Timo Walther <twal...@apache.org>:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> Hi all!
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Fabian and I worked on a FLIP for Stream Aggregations in the
> > > >>>>>>>>>>>>> Table API. You can find the FLIP-11 here:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Motivation for the FLIP:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> The Table API is a declarative API to define queries on
> > > >>>>>>>>>>>>> static and streaming tables. So far, only projection,
> > > >>>>>>>>>>>>> selection, and union are supported operations on streaming
> > > >>>>>>>>>>>>> tables.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> This FLIP proposes to add support for different types of
> > > >>>>>>>>>>>>> aggregations on top of streaming tables. In particular, we
> > > >>>>>>>>>>>>> seek to support:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> - Group-window aggregates, i.e., aggregates which are
> > > >>>>>>>>>>>>>   computed for a group of elements.
> > > >>>>>>>>>>>>>   A (time or row-count) window is required to bound the
> > > >>>>>>>>>>>>>   infinite input stream into a finite group.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> - Row-window aggregates, i.e., aggregates which are computed
> > > >>>>>>>>>>>>>   for each row, based on a window (range) of preceding and
> > > >>>>>>>>>>>>>   succeeding rows.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Each type of aggregate shall be supported on keyed/grouped
> > > >>>>>>>>>>>>> or non-keyed/grouped data streams for streaming tables as
> > > >>>>>>>>>>>>> well as batch tables.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> We are looking forward to your feedback.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Timo
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>> --
> > > >>>>>>>>>>> Kind Regards
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Timo Walther
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Follow me: @twalthr
> > > >>>>>>>>>>> https://www.linkedin.com/in/twalthr
> > > >>>>>>>>>>>
> > > >>>>>> --
> > > >>>>>> Kind Regards
> > > >>>>>>
> > > >>>>>> Timo Walther
> > > >>>>>>
> > > >>>>>> Follow me: @twalthr
> > > >>>>>> https://www.linkedin.com/in/twalthr
> > > >>>>>>
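For readers following the allowLateness() discussion quoted above: Jark's suggestion is to overload on() on the singleton types of two marker objects, so that only the event-time window type exposes allowLateness(). The following is a minimal, self-contained sketch of that idea, not the FLIP-11 implementation. Everything in it is an assumption made for illustration: the wrapper object LatenessSketch, the millisecond Long parameters (the thread uses Expression and an alias field), and the simplified window classes, which are not Flink's real classes.

```scala
// Hypothetical sketch; none of these types are Flink's actual Table API classes.
object LatenessSketch {
  // Marker objects: their singleton types select the overload of on().
  object rowtime
  object systemtime

  // Event-time window: the only window type that offers allowLateness().
  final case class TumblingEventTimeWindow(sizeMillis: Long, latenessMillis: Long = 0L) {
    def allowLateness(millis: Long): TumblingEventTimeWindow =
      copy(latenessMillis = millis)
  }

  // Processing-time window: deliberately has no allowLateness().
  final case class TumblingProcessingTimeWindow(sizeMillis: Long)

  final class TumblingWindow(sizeMillis: Long) {
    // Chosen when the argument is the rowtime object.
    def on(time: rowtime.type): TumblingEventTimeWindow =
      TumblingEventTimeWindow(sizeMillis)

    // Chosen when the argument is the systemtime object.
    def on(time: systemtime.type): TumblingProcessingTimeWindow =
      TumblingProcessingTimeWindow(sizeMillis)
  }

  object Tumble {
    def over(sizeMillis: Long): TumblingWindow = new TumblingWindow(sizeMillis)
  }

  // Event time: lateness can be configured.
  val eventWindow: TumblingEventTimeWindow =
    Tumble.over(600000L).on(rowtime).allowLateness(60000L)

  // Processing time: the result type has no allowLateness() member.
  val processingWindow: TumblingProcessingTimeWindow =
    Tumble.over(600000L).on(systemtime)

  // Tumble.over(600000L).on(systemtime).allowLateness(60000L) // rejected by the compiler

  def main(args: Array[String]): Unit = {
    println(eventWindow)
    println(processingWindow)
  }
}
```

The point of the design is that the misuse is caught at compile time rather than at runtime. The trade-off raised in the thread still applies: a plain Expression argument, as in Timo's variant, cannot make this distinction.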