Hi Timo, I had a look at your branch. Thanks for all the refactoring and work you put into this.
I like the proposal a lot. I think indicating the time attribute in the schema is a good idea. I'm not sure if we should support the rowtime expression for batch tables though. For batch, any long or timestamp attribute can be used as a time attribute, so marking a single one as the time attribute is not really necessary, IMO. If others think that this is needed to make the batch and stream cases identical, I would be OK with having it. OTOH, I would not consider the schema to be part of the query. I think we should continue with this work as it is likely to take more time until we can merge it into the master. @Timo: would the next step be to open a PR for this? Best, Fabian

2017-03-20 14:15 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:
> Hi,
>
> I am not sure if it is not about setting the timestamp within the query, but you can imagine that there are examples where you have different timestamps, as mentioned. Take for example the case when we do a purchase online. You have:
> - time of purchase (when the payment was input/triggered)
> - time of executing the transaction at the bank (when the payment is processed from the account)
> - time of settlement (when the payment is executed at the merchant bank - when the money is received by the seller)
>
> In such a scenario you can imagine that over the same stream of online payments you might want to run different queries, each driven by one of these times. Supporting such a scenario would mean that we have one input stream that enters the Flink engine via a table source, and then in the query we can start running different queries:
> e.g. SELECT SUM(amount) ORDER BY rowtime(time_purchase) LIMIT 10 // you want the amount over your last 10 orders
> e.g.
SELECT SUM(amount) ORDER BY rowtime(time_settlement) LIMIT 10 //you > want the amount over your last 10 income > > Best regards, > > > -----Original Message----- > From: Timo Walther [mailto:twal...@apache.org] > Sent: Monday, March 20, 2017 2:05 PM > To: dev@flink.apache.org > Subject: Re: FW: [DISCUSS] Table API / SQL indicators for event and > processing time > > Yes, you are right. In the current design the user cannot assign > timestamp and watermarks in a table program. Operators (such as windows) > might adapt the metatimestamp, if this is the case this adaption might > need to be expressed in the query itself too. > > E.g. for a tumbling windows we could limit the select part to > table.select('rowtime.ceil(DAY) as 'newRowtime) (so that logical rowtime > matches the physical metatimestamp) > > Do you have a good example use case that needs the assignment of rowtime > within a query? > > Am 20/03/17 um 13:39 schrieb Radu Tudoran: > > Hi, > > > > As suggested by Timo - I am forwarding this to the mailing list. Sorry > for not having the conversation directly here - I initially thought it > might not be of interest... > > > > @Timo - thanks for the clarification. I get the main point now which is > that the rowtime is encoded within the metadata of the record. I think > this is key. My view on the matter was maybe a bit updated in the sense > that I saw the processing pipeline as an input source (as you exemplify - a > table scan) and from there you have a timestamp and water mark assigner > before the processing actually starts. So by overriding the timestamp > extractor you match the field that carries the eventtime/rowtime with the > mechanism from flink. But as far as I understand this would not be the case > anymore...am I right? In case the assignment of the rowtime to the metadata > of the record is done differently - what would be the way to do it? > > > > > > Dr. 
Radu Tudoran > > Senior Research Engineer - Big Data Expert > > IT R&D Division > > > > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH > > European Research Center > > Riesstrasse 25, 80992 München > > > > E-mail: radu.tudo...@huawei.com > > Mobile: +49 15209084330 > > Telephone: +49 891588344173 > > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH > > Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com > > Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063, > > Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN > > Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063, > > Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN > > This e-mail and its attachments contain confidential information from > HUAWEI, which is intended only for the person or entity whose address is > listed above. Any use of the information contained herein in any way > (including, but not limited to, total or partial disclosure, reproduction, > or dissemination) by persons other than the intended recipient(s) is > prohibited. If you receive this e-mail in error, please notify the sender > by phone or email immediately and delete it! > > > > > > -----Original Message----- > > From: Timo Walther [mailto:twal...@apache.org] > > Sent: Monday, March 20, 2017 12:29 PM > > To: Radu Tudoran > > Subject: Re: [DISCUSS] Table API / SQL indicators for event and > processing time > > > > You are not bothering me, it is very interesting to compare the design > > with real world use cases. > > > > In your use case we would create table like: tEnv.toTable('date, 'time1, > > 'time2, 'data, 'myrowtime.rowtime) > > > > We would not "overwrite" an actual attribute of the record but only add > > logical "myrowtime". In general, just to make it clear again, the > > rowtime must be in the metatimestamp of the record (by using a timestamp > > extractor before). The Table API assumes that records that enter the > > Table API are timestamped correctly. 
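Timo's requirement above - the rowtime must already be in the record's metatimestamp, assigned by a timestamp extractor before the stream enters the Table API - can be sketched standalone. This is plain Python, not Flink code; the payment fields come from Radu's example earlier in the thread, and the out-of-orderness bound is an invented illustration value. The watermark semantics mirror bounded out-of-orderness extraction (watermark trails the maximum timestamp seen so far by a fixed bound).

```python
# Standalone sketch (not Flink code) of assigning the record metatimestamp
# from one chosen time field before the stream enters the Table API.
# Watermark semantics: wm = max timestamp seen so far - out-of-orderness bound.

def assign_timestamps(events, time_field, out_of_orderness_ms=5000):
    """Yield (event, timestamp, current_watermark) triples."""
    max_ts = float("-inf")
    for event in events:
        ts = event[time_field]          # the field chosen as the rowtime
        max_ts = max(max_ts, ts)
        watermark = max_ts - out_of_orderness_ms
        yield event, ts, watermark

payments = [
    {"amount": 10, "time_purchase": 1000, "time_settlement": 4000},
    {"amount": 20, "time_purchase": 3000, "time_settlement": 3500},
]

# The same stream could be timestamped on either time field, but only one
# choice is in effect per stream -- the Table API then sees a single rowtime.
for event, ts, wm in assign_timestamps(payments, "time_purchase", 500):
    print(ts, wm)
```

Re-running the same extractor with `"time_settlement"` would produce a differently-timestamped stream, which is exactly why the current design only supports one event-time field per stream.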
So in your use case, you would create your own TableSource, extract the timestamp based on your 3 time fields, and define an attribute that represents the rowtime logically. In the current design we want the Table API to rely on Flink's time handling, because time handling can be very tricky. So we only support one event-time field.
> >
> > But would it be possible to post our discussion on the ML? It might be interesting for others as well. If yes, can you forward our conversation to the ML?
> >
> > Timo
> >
> > Am 20/03/17 um 12:11 schrieb Radu Tudoran:
> >> Thanks for the replies.
> >>
> >> Regarding the ""It might be sometimes that this is not explicit to be guessed" That is why I added the RelTimeConverter. After this conversion step it should be as explicit as possible (by using the special types). And we can add special handling of functions (i.e. ceil) that preserve the monotonicity."
> >>
> >> ...maybe I am missing something, so sorry if I just bother you for nothing (it is just to make sure we think of all cases beforehand). I saw examples of applications where you have multiple fields of the same type. For example, an event can have 3 time fields of TIMESTAMP, 1 of DATE and 2 of TIME (this is actually from a real application with some sort of standard communication schema). I was referring to such cases: it is unclear to me how the code will identify the exact field to use as rowtime, for example. This is what I meant about how we are passing indicators to spot the rowtime field, as well as what would happen with the code in such a situation as it can identify multiple time fields.
> >>
> >> -----Original Message-----
> >> From: Timo Walther [mailto:twal...@apache.org]
> >> Sent: Monday, March 20, 2017 12:00 PM
> >> To: Radu Tudoran
> >> Subject: Re: [DISCUSS] Table API / SQL indicators for event and processing time
> >>
> >> Hi Radu,
> >>
> >> you are right. It is hard to give line specific comments without having a PR. I will open a work-in-progress PR.
> >>
> >> Regarding the visit functions: Yes, you are right. We will need to define how time is handled for every new operator and override/implement visit functions for it.
> >>
> >> The RexTimeIndicatorMaterializer wraps concrete time accesses in a materialization function. This changes the field type to a regular timestamp.
The CodeGenerator then knows that it needs to access the getTimestamp() method of the ProcessFunction. Sorry for the bad documentation. This code is still work-in-progress, that's why I didn't add many comments so far.
> >>
> >> The RexInputRefUpdater basically updates the references when a logical row type is converted to a physical row type. I haven't written tests for this code yet, so there might be bugs in it.
> >>
> >> We have to think about how we integrate join etc. in follow-up issues. What this code ensures is that operators know what is time and what is just a "regular field". A join operator knows from which side the time comes and which type it has (proc or rowtime).
> >>
> >> "It might be sometimes that this is not explicit to be guessed" That is why I added the RelTimeConverter. After this conversion step it should be as explicit as possible (by using the special types). And we can add special handling of functions (i.e. ceil) that preserve the monotonicity.
> >>
> >> "Also in the same class, the 2 function look the same" there is a difference in the last argument ;-)
> >>
> >> Yes, you are right, this is a large change, but we should integrate it in 1.3 as it is also API breaking and a necessary concept for future PRs. The longer we wait, the harder it is to rebase it on top of all the windows in the pipeline. But of course we need to discuss this with the community.
> >>
> >> Thanks for looking into the code!
> >>
> >> Timo
> >>
> >> Am 20/03/17 um 11:28 schrieb Radu Tudoran:
> >>> Hi Timo,
> >>>
> >>> As I did not see a pull request, I did not know exactly what is the best way to give you feedback on the code... so as a last resort I thought to write you an email. If you can recommend better tools for the future, please do so.
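Timo's description of the RexInputRefUpdater - remapping field references when the logical row type (which includes time indicators) is converted to the physical row type (which does not) - can be sketched standalone. This is a plausible reading with invented data, not the prototype's actual code: a reference's physical index is its logical index minus the number of indicator fields in front of it, which is why the prototype subtracts rather than adds.

```python
# Sketch (invented names/data) of reference remapping between logical and
# physical row types: time indicators exist only in the logical row type,
# so fields after an indicator shift DOWN when converting to the physical
# row type -- hence index minus the indicator count, not plus.

def to_physical_index(logical_fields, logical_index):
    """Map a logical field index to its index in the physical row."""
    indicators_before = sum(
        1 for f in logical_fields[:logical_index] if f.endswith("*")
    )
    return logical_index - indicators_before

logical = ["a", "proctime*", "b", "rowtime*", "c"]  # '*' marks indicators

# 'b' sits at logical index 2 but physical index 1; 'c' at 4 -> 2
assert to_physical_index(logical, 2) == 1
assert to_physical_index(logical, 4) == 2
```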
> >>> In your class org.apache.flink.table.api.scala.stream.table.AggregationsITCase.scala => I guess there are several other overrides to be done for the visit function for other types of logical operators (join, window, ...). Does this mean that from now on, when we implement a new operator, we need to add the corresponding implementation here as well?
> >>>
> >>> In the RexTimeIndicatorMaterializer class I am not sure I understand the logic of materializing the time field. From my basic understanding this should mean that if we had, before materializing it, 4 fields for example, we need to add a 5th one for the time. It is a bit hard to follow at first glance how this is implemented. If you could add some more comments it would be great.
> >>>
> >>> In RexInputRefUpdater.scala I would expect that you add the number of times to be materialized to the input rather than removing it:
> >>> new RexInputRef(inputRef.getIndex - countTimeIndicators, inputRef.getType)
> >>> My question is: shouldn't it be "+"? (I guess you tested it and it works - in which case please take my remark more in the direction of what would be the logic of having it with minus and not plus... maybe not really the kind of feedback you needed)
> >>> => new RexInputRef(inputRef.getIndex + countTimeIndicators, inputRef.getType)
> >>> Also - I see that you iterate over the input references to look for time. Is it possible to have more than one? Can we have/should we have, for example, both proctime and rowtime... or multiple rowtimes? Or perhaps this is for the case of Join/Unions where each side of the biRel comes with its own time. In this case how do you want to support this?
Will you preserve both or create one single new one that you preserve? (I would argue for this latter option, as regardless of whether the events worked before on proctime or rowtime - after the union/join a new event is created and the timestamp should be reflected as such, based on the moment of creation.)
> >>>
> >>> In FlinkTypeFactory.scala, in def createRowtimeIndicatorType(): shouldn't you somehow access the field that is indicated to contain the rowtime from the event? It might be sometimes that this is not explicit to be guessed. Or do you make some implicit assumption that this is uniquely identifiable by some indicators (name, type, ...)?
> >>>
> >>> Also in the same class, the 2 functions look the same. Is this a mistake? ...I would assume there must be some differentiation...
> >>> def createProctimeIndicatorType(): RelDataType = {
> >>>   val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
> >>>   new TimeIndicatorRelDataType(getTypeSystem, originalType.asInstanceOf[BasicSqlType], false)
> >>> }
> >>>
> >>> def createRowtimeIndicatorType(): RelDataType = {
> >>>   val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
> >>>   new TimeIndicatorRelDataType(getTypeSystem, originalType.asInstanceOf[BasicSqlType], true)
> >>> }
> >>>
> >>> I saw in buildRowDataType the thing I expected: adding to the event type fields corresponding to proctime and rowtime. I am still a bit confused what was happening before :)
> >>>
> >>> It looks like there are many modifications that would be affected by this change. I would propose this to be included only after/right before the feature freeze, as it would create a lot of rebasing and reimplementation for the ongoing working features.
> >>>
> >>> ...not sure if this was helpful...
> >>>
> >>> -----Original Message-----
> >>> From: Timo Walther [mailto:twal...@apache.org]
> >>> Sent: Monday, March 20, 2017 10:48 AM
> >>> To: dev@flink.apache.org
> >>> Subject: Re: [DISCUSS] Table API / SQL indicators for event and processing time
> >>>
> >>> Hi Radu,
> >>>
> >>> we differentiate rowtime and processing time fields by their field types. Both indicators extend the timestamp type. In my prototype I added the functions FlinkTypeFactory.isRowtime() and FlinkTypeFactory.isProctime() for checking this. If a time indicator has been materialized (e.g. long.cast(STRING)), it becomes a regular timestamp (or in this case a string after evaluation). So we cannot differentiate between rowtime and proctime anymore.
However, we can add some exceptions for certain functions (e.g. for ceil() in combination with windows) that preserve the time attributes.
> >>>
> >>> Count windows have to be defined over a time attribute. If you take a look at the tests of org.apache.flink.table.api.scala.stream.table.AggregationsITCase, you can see that countWindows are still supported as before. As I said, most of the user-facing API does not change. It only tries to make time more explicit.
> >>>
> >>> Timo
> >>>
> >>> Am 20/03/17 um 10:34 schrieb Radu Tudoran:
> >>>> Hi Timo,
> >>>>
> >>>> I have some questions regarding your implementation:
> >>>>
> >>>> "The timestamp (not an indicator anymore) becomes part of the physical row. E.g. long.cast(STRING) would require a materialization"
> >>>> => If we have this, how are we going to make a difference between rowtime and processtime? For supporting some queries/operators you only need to use these time indications as markers, to have something like below. If you do not get access to any sort of unique markers to indicate these, then we will have a hard time supporting many implementations. What would be the option to support this condition in your implementation?
> >>>> if(rowtime)
> >>>>   ...
> >>>> else if(proctime)
> >>>>   ... some other implementation
> >>>>
> >>>> "- Windows are only valid if they work on time indicators."
> >>>> => Does this mean we can no longer work with count windows? There are a lot of queries where windows would be defined based on the cardinality of elements.
> >>>>
> >>>> -----Original Message-----
> >>>> From: Timo Walther [mailto:twal...@apache.org]
> >>>> Sent: Monday, March 20, 2017 10:08 AM
> >>>> To: dev@flink.apache.org
> >>>> Subject: Re: [DISCUSS] Table API / SQL indicators for event and processing time
> >>>>
> >>>> Hi everyone,
> >>>>
> >>>> for the last two weeks I worked on a solution for the time indicator issue.
I have implemented a prototype [1] which shows how we can express, track, and access time in a consistent way for batch and stream tables.
> >>>>
> >>>> Main changes of my current solution:
> >>>>
> >>>> - Processing time and rowtime indicators can be named arbitrarily.
> >>>> - They can be defined as follows: stream.toTable(tEnv, 'long, 'int, 'string, 'proctime.proctime) or stream.toTable(tEnv, 'long.rowtime, 'int, 'string)
> >>>> - In a streaming environment: if the "long" field is already defined in the record, it will not be read by the runtime. "long" always represents the timestamp of the row.
> >>>> - In a batch environment: "long" must be present in the record and will be read by the runtime.
> >>>> - The table definition looks equivalent in both batch and streaming (better unification than the current state).
> >>>> - Internally, row types are split up into a logical and a physical row type.
> >>>> - The logical row type contains time indicators; the physical row type never contains time indicators (the pure "long" will never be in a record).
> >>>> - After validation and query decorrelation, a special time indicator converter traverses the RelNodes and analyzes if a time indicator is accessed or only forwarded.
> >>>> - An access to a time indicator means that we need to materialize the rowtime using a ProcessFunction (not yet implemented). The timestamp (not an indicator anymore) becomes part of the physical row. E.g. long.cast(STRING) would require a materialization.
> >>>> - Forwarding of time indicators does not materialize the rowtime. It remains a logical attribute. E.g. .select('long)
> >>>> - Windows are only valid if they work on time indicators.
> >>>>
> >>>> There are still a lot of open questions that we can discuss and/or fix in future PRs. For now it would be great if you could give some feedback about the current implementation. With some exceptions, my branch can be built successfully.
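Timo's logical/physical row split and the forward-vs-access rule can be modeled in a standalone Python sketch. The names and data here are invented for illustration and are not the prototype's actual classes: the logical schema carries a rowtime indicator, the physical record never stores the long, and only an access materializes the metatimestamp into a real field.

```python
# Standalone model (invented names) of the logical/physical row split:
# the logical schema carries a rowtime indicator, the physical record does
# not store the long; forwarding keeps the indicator logical, while an
# access materializes the metatimestamp into the row.

LOGICAL = ["rowtime*", "int", "string"]   # '*' marks the time indicator
PHYSICAL = [42, "hello"]                   # no long stored in the record

def forward(schema, row):
    # .select('rowtime)-style forwarding: nothing is materialized
    return schema, row

def access(schema, row, meta_timestamp):
    # e.g. 'rowtime.cast(STRING): the timestamp (no longer an indicator)
    # becomes part of the physical row
    new_schema = [f.rstrip("*") for f in schema]
    return new_schema, [meta_timestamp] + row

schema, row = access(LOGICAL, PHYSICAL, meta_timestamp=1490000000000)
assert schema == ["rowtime", "int", "string"]   # indicator became regular
assert row == [1490000000000, 42, "hello"]       # timestamp now in the row
```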
> >>>> Regards,
> >>>> Timo
> >>>>
> >>>> [1] https://github.com/twalthr/flink/tree/FLINK-5884
> >>>>
> >>>> Am 02/03/17 um 07:22 schrieb jincheng sun:
> >>>>> Hi,
> >>>>> @Timo, thanks for your reply, and congratulations on your job.
> >>>>> @Fabian, no matter how it is achieved, as long as the field attributes are identified when the table is generated or created, that is what we want. I think at this point we are on the same page. We can go ahead.
> >>>>> And I am very glad to hear that `the 'rowtime keyword would be removed`, which is a very important step for keeping Stream and Batch consistent.
> >>>>>
> >>>>> Best,
> >>>>> SunJincheng
> >>>>>
> >>>>> 2017-03-01 17:24 GMT+08:00 Fabian Hueske <fhue...@gmail.com>:
> >>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> @Xingcan
> >>>>>> Yes, that is right. It is not (easily) possible to change the watermarks of a stream. All attributes which are used as event-time timestamps must be aligned with these watermarks. These are only attributes which are derived from the original rowtime attribute, i.e., the one that was specified when the Table was created.
> >>>>>>
> >>>>>> @SunJincheng
> >>>>>> Regarding your points:
> >>>>>>
> >>>>>> 1. Watermarks can only be generated for (almost) sorted attributes. Since a stream has only one sort order and cannot be sorted before it is converted into a Table, there will hardly be a case where n > 1 is possible. The only possibility I see are two attributes which are in almost the same order but with a certain distance (think of orderDate and shipDate, but values would always be 1 day apart). However, this requirement is very limiting and, to be honest, I don't see how assigning different watermarks for different attributes would work reliably in practice.
The ORDER BY clause in an OVER window can only be used because the stream is already sorted on that attribute (that's also why it is restricted to rowtime and proctime in streaming).
> >>>>>>
> >>>>>> 2. Since a stream can only have one sort order, we so far assumed that streams would already have watermarks and timestamps assigned. I think this is a fair assumption, because a stream can only have one order and hence only one timestamped & watermarked attribute (except for the corner case I discussed above). As Timo said, .rowtime would only add an attribute which refers to the already assigned timestamp of a row.
> >>>>>>
> >>>>>> 3. I completely agree that the difference between batch and streaming should be overcome. This is actually the goal of Timo's work. So yes, the 'rowtime keyword would be removed because any attribute can be marked as an event-time attribute (by calling 't.rowtime).
> >>>>>>
> >>>>>> Btw. a table source could still make the watermark configurable by offering a respective interface. However, I'm not yet convinced that this needs to be part of the Table API.
> >>>>>>
> >>>>>> What do you think?
> >>>>>>
> >>>>>> Best, Fabian
> >>>>>>
> >>>>>> 2017-03-01 7:55 GMT+01:00 jincheng sun <sunjincheng...@gmail.com>:
> >>>>>>
> >>>>>>> Hi Fabian,
> >>>>>>>
> >>>>>>> Thanks for your attention to this discussion. Let me share some ideas about this. :)
> >>>>>>>
> >>>>>>> 1. Yes, the solution I have proposed can indeed be extended to support multi-watermarks. A single watermark is a special case of multiple watermarks (n = 1). I agree that, to keep the realization simple, we currently only support a single watermark. Our ideas are consistent.
> >>>>>>>
> >>>>>>> BTW.
I think even if we only use one attribute to generate the watermark, we still need to sort, because in an OVER window (event-time) we must know the exact data order. Is that right?
> >>>>>>>
> >>>>>>> 2. I think our difference is how to register the watermark? Now we see two ways:
> >>>>>>> A. t.rowtime:
> >>>>>>> If I understand correctly, in the current design when we use the expression 'rowtime, the system defaults to exporting timestamps based on the user data;
> >>>>>>> B. registeredWatermarks('t, waterMarkFunction1):
> >>>>>>> We explicitly register how to generate watermarks and extract timestamps in user-defined ways;
> >>>>>>>
> >>>>>>> These two ways are characterized by:
> >>>>>>> Approach A: The system defaults to exporting the value of the t field as a timestamp, which is simple for the system.
> >>>>>>> Approach B: The user can define the logic for extracting the timestamp, which is very flexible for the user. For example: if the field `t` is a complex field (value: `xxx#20170302111129#yyy`), the user can apply certain logic to extract the timestamp (20170302111129).
> >>>>>>>
> >>>>>>> So I tend toward approach B. What do you think?
> >>>>>>>
> >>>>>>> 3.
We are very concerned about the unity of Stream and Batch, as in the current Table API:
> >>>>>>> Batch:
> >>>>>>> Table
> >>>>>>>   .window(Tumble over 2.rows on 'long as 'w) // 'long is a normal field
> >>>>>>>   .groupBy('w)
> >>>>>>>   .select('int.count)
> >>>>>>>
> >>>>>>> Stream:
> >>>>>>> Table
> >>>>>>>   .window(Tumble over 5.milli on 'rowtime as 'w) // 'rowtime is the keyword
> >>>>>>>   .groupBy('w)
> >>>>>>>   .select('int.count)
> >>>>>>>
> >>>>>>> As mentioned above, the two examples are event-time aggregation windows, but they are not written the same way: for batch we have a specific column, while stream needs the 'rowtime keyword. I think we need to try to eliminate this difference. What do you think?
> >>>>>>>
> >>>>>>> In the current Google doc I see `table.window(tumble over 1.hour on 't as 'w).groupBy('a, 'w).select('w.start, 'b.count)`. Does this mean that FLINK-5884 will remove the Table API 'rowtime keyword?
> >>>>>>>
> >>>>>>> So regarding the event-time indicators in SQL, with the attributes registered on the table's columns, does this mean that batch and stream SQL will be written and used the same way?
> >>>>>>>
> >>>>>>> Very appreciative of your feedback.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> SunJincheng
> >>>>>>>
> >>>>>>> 2017-03-01 10:40 GMT+08:00 Xingcan Cui <xingc...@gmail.com>:
> >>>>>>>
> >>>>>>>> Hi all,
> >>>>>>>>
> >>>>>>>> I have a question about designating the time for `rowtime`. The current design does this during the DataStream-to-Table conversion. Does this mean that `rowtime` is only valid for the source streams and cannot be designated after a subquery?
(That's why I considered using alias > >>>>>>>> to dynamically designate it in a SQL before) > >>>>>>>> > >>>>>>>> Best, > >>>>>>>> Xingcan > >>>>>>>> > >>>>>>>> On Wed, Mar 1, 2017 at 5:35 AM, Fabian Hueske <fhue...@gmail.com> > >>>>>> wrote: > >>>>>>>>> Hi Jincheng Sun, > >>>>>>>>> > >>>>>>>>> registering watermark functions for different attributes to allow > >>>>>> each > >>>>>>> of > >>>>>>>>> them to be used in a window is an interesting idea. > >>>>>>>>> > >>>>>>>>> However, watermarks only work well if the streaming data is > >>>>>>>>> (almost) > >>>>>> in > >>>>>>>>> timestamp order. Since it is not possible to sort a stream, all > >>>>>>>> attributes > >>>>>>>>> that would qualify as event-time attributes need to be in almost > >>>>>>>>> the > >>>>>>> same > >>>>>>>>> order. I think this limits the benefits of having multiple > >>>>>>>>> watermark functions quite significantly. But maybe you have a > good > >>>>>>>>> use case > >>>>>> that > >>>>>>>> you > >>>>>>>>> can share where multiple event-time attributes would work well. > >>>>>>>>> > >>>>>>>>> So far our approach has been that a DataStream which is converted > >>>>>> into > >>>>>>> a > >>>>>>>>> Table has already timestamps and watermarks assigned. We also > >>>>>>>>> assumed > >>>>>>>> that > >>>>>>>>> a StreamTableSource would provide watermarks and timestamps and > >>>>>>> indicate > >>>>>>>>> the name of the attribute that carries the timestamp. > >>>>>>>>> > >>>>>>>>> @Stefano: That's great news. I'd suggest to open a pull request > >>>>>>>>> and > >>>>>>> have > >>>>>>>> a > >>>>>>>>> look at PR #3397 which handles the (partitioned) unbounded case. > >>>>>> Would > >>>>>>> be > >>>>>>>>> good to share some code between these approaches. 
> >>>>>>>>> > >>>>>>>>> Thanks, Fabian > >>>>>>>>> > >>>>>>>>> 2017-02-28 18:17 GMT+01:00 Stefano Bortoli < > >>>>>> stefano.bort...@huawei.com > >>>>>>>> : > >>>>>>>>>> Hi all, > >>>>>>>>>> > >>>>>>>>>> I have completed a first implementation that works for the SQL > >>>>>> query > >>>>>>>>>> SELECT a, SUM(b) OVER (PARTITION BY c ORDER BY a RANGE BETWEEN 2 > >>>>>>>>>> PRECEDING) AS sumB FROM MyTable > >>>>>>>>>> > >>>>>>>>>> I have SUM, MAX, MIN, AVG, COUNT implemented but I could test it > >>>>>> just > >>>>>>>> on > >>>>>>>>>> simple queries such as the one above. Is there any specific case > >>>>>>>>>> I > >>>>>>>> should > >>>>>>>>>> be looking at? > >>>>>>>>>> > >>>>>>>>>> Regards, > >>>>>>>>>> Stefano > >>>>>>>>>> > >>>>>>>>>> -----Original Message----- > >>>>>>>>>> From: jincheng sun [mailto:sunjincheng...@gmail.com] > >>>>>>>>>> Sent: Tuesday, February 28, 2017 12:26 PM > >>>>>>>>>> To: dev@flink.apache.org > >>>>>>>>>> Subject: Re: [DISCUSS] Table API / SQL indicators for event and > >>>>>>>>> processing > >>>>>>>>>> time > >>>>>>>>>> > >>>>>>>>>> Hi everyone, thanks for sharing your thoughts. I really like > >>>>>>>>>> Timo’s proposal, and I have a few thoughts want to share. > >>>>>>>>>> > >>>>>>>>>> We want to keep the query same for batch and streaming. IMO. > >>>>>> “process > >>>>>>>>> time” > >>>>>>>>>> is something special to dataStream while it is not a well > defined > >>>>>>> term > >>>>>>>>> for > >>>>>>>>>> batch query. So it is kind of free to create something new for > >>>>>>>>> processTime. > >>>>>>>>>> I think it is a good idea to add a proctime as a reserved > keyword > >>>>>> for > >>>>>>>>> SQL. > >>>>>>>>>> Regarding to “event time”, it is well defined for batch > query. > >>>>>>>>>> So > >>>>>>> IMO, > >>>>>>>>> we > >>>>>>>>>> should keep the way of defining a streaming window exactly same > >>>>>>>>>> as > >>>>>>>> batch > >>>>>>>>>> window. 
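Stefano's test query, quoted above (SELECT a, SUM(b) OVER (PARTITION BY c ORDER BY a RANGE BETWEEN 2 PRECEDING) AS sumB FROM MyTable), can be sketched standalone. This assumes standard SQL frame semantics - RANGE BETWEEN 2 PRECEDING includes the rows of the same partition whose order-by value lies within [a - 2, a] - and uses invented sample data:

```python
# Standalone sketch of:
#   SELECT a, SUM(b) OVER (PARTITION BY c ORDER BY a
#                          RANGE BETWEEN 2 PRECEDING) AS sumB FROM MyTable
# RANGE 2 PRECEDING = rows of the same partition with an a-value in [a-2, a].

def over_range_sum(rows):
    """rows: list of (a, b, c) tuples; returns (a, sumB) per input row."""
    out = []
    for a, b, c in rows:
        sum_b = sum(b2 for a2, b2, c2 in rows if c2 == c and a - 2 <= a2 <= a)
        out.append((a, sum_b))
    return out

rows = [(1, 10, "x"), (2, 20, "x"), (4, 40, "x"), (2, 5, "y")]
print(over_range_sum(rows))
# -> [(1, 10), (2, 30), (4, 60), (2, 5)]
```

For example, the row (4, 40, "x") sums b over partition "x" rows with a in [2, 4], i.e. 20 + 40 = 60; the single "y" row only sees itself.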
Therefore, the row for event time is nothing special, > but > >>>>>>> just > >>>>>>>> a > >>>>>>>>>> normal column. The major difference between batch and stream is > >>>>>> that > >>>>>>> in > >>>>>>>>>> dataStream the event time column must be associated with a > >>>>>> watermark > >>>>>>>>>> function. I really like the way Timo proposed, that we can > select > >>>>>> any > >>>>>>>>>> column as rowtime. But I think instead of just clarify a column > >>>>>>>>>> is > >>>>>> a > >>>>>>>>>> rowtime (actually I do not think we need this special rowtime > >>>>>>> keyword), > >>>>>>>>> it > >>>>>>>>>> is better to register/associate the waterMark function to this > >>>>>> column > >>>>>>>>> when > >>>>>>>>>> creating the table. For dataStream, we will validate a rowtime > >>>>>> column > >>>>>>>>> only > >>>>>>>>>> if it has been associated with the waterMark function. A > >>>>>>>>>> prototype > >>>>>>> code > >>>>>>>>> to > >>>>>>>>>> explain how it looks like is shown as below: > >>>>>>>>>> > >>>>>>>>>> TableAPI: > >>>>>>>>>> toTable(tEnv, 'a, 'b, 'c) > >>>>>>>>>> .registeredWatermarks('a, waterMarkFunction1) > >>>>>>>>>> > >>>>>>>>>> batchOrStreamTable > >>>>>>>>>> .window(Tumble over 5.milli on 'a as 'w) > >>>>>>>>>> .groupBy('w, 'b) > >>>>>>>>>> .select('b, 'a.count as cnt1, 'c.sum as cnt2) > >>>>>>>>>> > >>>>>>>>>> SQL: > >>>>>>>>>> addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c) > >>>>>>>>>> .registeredWatermarks('a, waterMarkFunction1) > >>>>>>>>>> > >>>>>>>>>> SELECT a, SUM(b) OVER (PARTITION BY c ORDER BY a RANGE > >>>>>>>>>> BETWEEN > >>>>>> 2 > >>>>>>>>>> PRECEDING) AS sumB FROM MyTable > >>>>>>>>>> > >>>>>>>>>> What do you think ? > >>>>>>>>>> > >>>>>>>>>> 2017-02-22 23:44 GMT+08:00 Timo Walther <twal...@apache.org>: > >>>>>>>>>> > >>>>>>>>>>> Hi everyone, > >>>>>>>>>>> > >>>>>>>>>>> I have create an issue [1] to track the progress of this topic. 
I have written a little design document [2] on how we could implement the indicators and which parts have to be touched. I would suggest implementing a prototype, also to see what is possible and can be integrated both in Flink and Calcite. Feedback is welcome.

Regards,
Timo

[1] https://issues.apache.org/jira/browse/FLINK-5884
[2] https://docs.google.com/document/d/1JRXm09x_wKst6z6UXdCGF9tgF1ueOAsFiQwahR72vbc/edit?usp=sharing

Am 21/02/17 um 15:06 schrieb Fabian Hueske:
Hi Xingcan,

thanks for your thoughts. In principle you are right that the monotone attribute property would be sufficient; however, there are more aspects to consider than that. Flink is a parallel stream processing engine, which means that data is processed in separate processes and shuffled across them. Maintaining a strict order when merging parallel streams would be prohibitively expensive. Flink's watermark mechanism helps operators to deal with out-of-order data (due to out-of-order input or shuffles). I don't think we can separate the discussion about time attributes from watermarks if we want to use Flink as the processing engine and not reimplement large parts from scratch.

When transforming a time attribute, we have to either align it with existing watermarks or generate new watermarks. If we want to allow all kinds of monotone transformations, we have to adapt the watermarks, which is not trivial. Instead, I think we should initially only allow very few monotone transformations which are aligned with the existing watermarks. We might later relax this condition if we see that users request this feature.

You are right that we need to track which attribute can be used as a time attribute (i.e., is increasing and guarded by watermarks). For that we need to expose the time attribute when a Table is created (either when a DataStream is converted, as in stream.toTable(tEnv, 'a, 'b, 't.rowtime), or in a StreamTableSource) and track how it is used in queries. I am not sure if the monotone property would be the right choice here, since the data is only quasi-monotone, and a monotone annotation might trigger some invalid optimizations which change the semantics of a query. Right now, Calcite does not offer a quasi-monotone property (at least I haven't found it).

Best, Fabian

2017-02-21 4:41 GMT+01:00 Xingcan Cui <xingc...@gmail.com>:
Hi all,

As I said in another thread, the main difference between stream and table is that a stream is an ordered list while a table is an unordered set.
Without considering the out-of-order problem in practice, either event time or processing time can be taken as just a monotonically increasing field, and that's why the given query [1] would work. In other words, we must guarantee that the "SELECT MAX(t22.rowtime) ..." subquery returns a single value that can be retrieved from the cached dynamic table, since it's dangerous to join two un-windowed streams.

Under these circumstances, I suggest adding a "monotonic hint" (INC or DEC) to a field of a (generalized) table (maybe using an annotation on the registerDataXX method) that can be used to indicate whether a field is monotonically increasing or decreasing. Then, by taking rowtime as a common (monotonically increasing) field, there are several benefits:

1) This can unify table and stream by introducing a total ordering relation on an unordered set.

2) These fields can be modified arbitrarily as long as they keep the declared monotonic property, and the watermark problem does not exist any more.

3) The monotonic hint will be useful in the query optimization process.

What do you think?

Best,
Xingcan

[1]
SELECT t1.amount, t2.rate FROM
  table1 AS t1,
  table2 AS t2
WHERE
  t1.currency = t2.currency AND
  t2.rowtime = (
    SELECT MAX(t22.rowtime) FROM table2 AS t22
    WHERE t22.rowtime <= t1.rowtime)

On Tue, Feb 21, 2017 at 2:52 AM, Fabian Hueske <fhue...@gmail.com> wrote:
Hi everybody,

When Timo wrote to the Calcite mailing list, Julian Hyde replied, gave good advice, and explained why a system attribute for event time would be a problem [1]. I thought about this and agree with Julian.

Here is a document describing the problem, the constraints in Flink, and a proposal for how to handle processing time and event time in Table API and SQL:
-> https://docs.google.com/document/d/1MDGViWA_TCqpaVoWub7u_GY4PMFSbT8TuaNl-EpbTHQ

Please have a look, comment, and ask questions.

Thank you,
Fabian

[1] https://lists.apache.org/thread.html/6397caf0ca37f97f2cd27d96f7a12c6fa845d6fd0870214fdce18d96@%3Cdev.calcite.apache.org%3E

2017-02-16 1:18 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
Thanks everybody for the comments.
Actually, I think we do not have much choice when deciding whether to use attributes or functions. Consider the following join query:

SELECT t1.amount, t2.rate FROM
  table1 AS t1,
  table2 AS t2
WHERE
  t1.currency = t2.currency AND
  t2.rowtime = (
    SELECT MAX(t22.rowtime) FROM table2 AS t22
    WHERE t22.rowtime <= t1.rowtime)

The query joins two streaming tables. Table 1 is a streaming table with amounts in a certain currency. Table 2 is a (slowly changing) streaming table of currency exchange rates. We want to join the amounts stream with the exchange rate of the corresponding currency that is valid (i.e., the last received value -> MAX(rowtime)) at the rowtime of the amounts row. In order to specify the query, we need to refer to the rowtime of the different tables. Hence, we need a way to relate the rowtime expression (or marker) to a table. This is not possible with a parameterless scalar function.

I'd like to comment on the concerns regarding performance: in fact, the columns could be completely virtual and only exist during query parsing and validation.
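To make the intended semantics of this join concrete, here is a minimal self-contained sketch in plain Python (not Flink; the function name and data are illustrative): for each amount row it picks the rate of the same currency with the largest rowtime that is not after the amount's rowtime.

```python
# Illustrative sketch of the join semantics above -- plain Python, not Flink.
# The data is made up.
amounts = [  # (rowtime, currency, amount)
    (2, "EUR", 10), (5, "EUR", 20), (7, "USD", 30),
]
rates = [  # (rowtime, currency, rate)
    (1, "EUR", 1.10), (4, "EUR", 1.12), (1, "USD", 1.00), (6, "USD", 0.98),
]

def join_with_latest_rate(amounts, rates):
    result = []
    for a_time, a_cur, amount in amounts:
        # rates of the same currency with rate.rowtime <= amount.rowtime,
        # i.e. the candidates for MAX(t22.rowtime)
        valid = [r for r in rates if r[1] == a_cur and r[0] <= a_time]
        if valid:
            _, _, rate = max(valid, key=lambda r: r[0])
            result.append((amount, rate))
    return result

print(join_with_latest_rate(amounts, rates))  # [(10, 1.1), (20, 1.12), (30, 0.98)]
```

Each amount is paired with the last rate received before (or at) its own rowtime, which is exactly what the MAX(rowtime) subquery expresses.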
During execution, we can directly access the rowtime metadata of a Flink streaming record (which is present anyway) or look up the current processing time from the machine clock. So the processing overhead would actually be the same as with a marker function.

Regarding the question of what should be allowed with a system attribute: IMO, it could be used like any other attribute. We need it at least in GROUP BY, ORDER BY, and WHERE to define windows and joins. We could also allow access to it in SELECT if we want to give users access to rowtime and processing time. So @Haohui, your query could be supported. However, what would not be allowed is to modify the value of the rows, e.g., by naming another column rowtime: "SELECT sometimestamp AS rowtime" would not be allowed, because Flink does not support modifying the event time of a row (for good reasons), and processing time should not be modifiable anyway.

@Timo:
I think the approach to only use the system columns during parsing and validation and to convert them to expressions afterwards makes a lot of sense. The question is how this approach could be nicely integrated with Calcite.

Best, Fabian

2017-02-15 16:50 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:
Hi,

My initial thought would be that it makes more sense to have procTime() and rowTime() only as functions which in fact are to be used as markers. Having the value (even from special system attributes) does not make sense in some scenarios, such as the ones for creating windows. E.g., if you have SELECT Count(*) OVER (ORDER BY procTime()...), getting the value of procTime does not help, as you need the marker to know how to construct the window logic.

However, your final idea of "implement some rule/logic that translates the attributes to special RexNodes internally" I believe is good and gives a solution to both problems. On the one hand, for those scenarios where you need the value you can access the value, while for others you can see the special type of the RexNode and use it as a marker.

Regarding keeping this data in a table... I am not sure, as that would mean we need to augment the data with two fields whether needed or not... this is not necessarily very efficient.

Dr. Radu Tudoran
Senior Research Engineer - Big Data Expert
IT R&D Division

HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH, Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063, Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063, Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, which is intended only for the person or entity whose address is listed above. Any use of the information contained herein in any way (including, but not limited to, total or partial disclosure, reproduction, or dissemination) by persons other than the intended recipient(s) is prohibited. If you receive this e-mail in error, please notify the sender by phone or email immediately and delete it!
-----Original Message-----
From: Timo Walther [mailto:twal...@apache.org]
Sent: Wednesday, February 15, 2017 9:33 AM
To: dev@flink.apache.org
Subject: Re: [DISCUSS] Table API / SQL indicators for event and processing time

Hi all,

at first I also thought that built-in functions (rowtime() and proctime()) are the easiest solution. However, I think that to be future-proof we should make them system attributes, especially to relate them to a corresponding table in case of multiple tables. Logically they are attributes of each row, which is already how the Table API treats them.

I will ask on the Calcite ML if there is a good way of integrating system attributes. Right now, I would propose the following implementation:

- we introduce a custom row type (extending RelDataType)
- in a streaming environment every row has two attributes by default (rowtime and proctime)
- we do not allow creating a row type with those attributes (this should already prevent `SELECT field AS rowtime FROM ...`)
- we need to ensure that these attributes are not part of expansions like `SELECT * FROM ...`
- we implement some rule/logic that translates the attributes to special RexNodes internally, such that the optimizer does not modify these attributes

What do you think?

Regards,
Timo

Am 15/02/17 um 03:36 schrieb Xingcan Cui:
Hi all,

thanks for this thread.

@Fabian If I didn't miss the point, the main difference between the two approaches is whether or not to treat these time attributes as common table fields that are directly available to users. Either way, these time attributes should be attached to records (right?), and the discussion lies in whether to give them public qualifiers like other common fields, or private qualifiers and related get/set methods.
The former (system attributes) approach would be more compatible with existing SQL read-only operations (e.g., select, join), but we need to add restrictions on SQL modification operations (like what?). I think there is no need to forbid users from modifying these attributes via Table APIs (like a map function); just inform them about these special attribute names, like the built-in aggregator names in iterations. As for the built-in function approach, I don't know if, for now, there are functions applied on a single row (maybe the value access functions like COMPOSITE.get(STRING)?). It seems that most of the built-in functions work on a single field or on columns, and thus it would be mountains of work if we wanted to add a new kind of function to SQL; maybe all existing operations would have to be modified to support it.

All in all, if there is existing support for single-row functions, I prefer the built-in function approach. Otherwise the system attributes approach would be better. After all, there are not that many modification operations in SQL, and maybe we can use aliases to support setting time attributes (just a hypothesis, not sure if it's feasible).

@Haohui I think the given query is valid if we add an aggregate function around (PROCTIME() - ROWTIME()) / 1000, and it should be executed efficiently.

Best,
Xingcan

On Wed, Feb 15, 2017 at 6:17 AM, Haohui Mai <ricet...@gmail.com> wrote:
Hi,

Thanks for starting the discussion. I can see there are multiple trade-offs in these two approaches. One question I have is to what extent Flink wants to open its APIs to allow users to access both processing and event time.

Before we talk about joins, my understanding of the two approaches you mentioned is essentially (1) treating the value of event / processing time as a first-class field of each row, and (2) limiting the scope of time indicators to only specifying windows. Take the following query as an example:

SELECT (PROCTIME() - ROWTIME()) / 1000 AS latency FROM table GROUP BY FLOOR(PROCTIME() TO MINUTES)

There are several questions we can ask:

(1) Is it a valid query?
(2) How efficient will the query be?

For this query I can see arguments from both sides.
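The gist of the latency query above can be simulated outside of SQL; a minimal plain-Python sketch (not Flink; the timestamps and the choice of average as the aggregate are illustrative assumptions) groups rows by the minute of their processing time and reports the processing-minus-event-time lag in seconds:

```python
# Illustrative sketch of the latency query above -- plain Python, not Flink.
# Timestamps are made-up epoch milliseconds.
rows = [  # (rowtime_ms, proctime_ms)
    (60_000, 62_000), (61_000, 65_000), (120_000, 121_000),
]

def latency_per_minute(rows):
    buckets = {}
    for rowtime, proctime in rows:
        minute = proctime // 60_000          # FLOOR(PROCTIME() TO MINUTES)
        lag_s = (proctime - rowtime) / 1000  # (PROCTIME() - ROWTIME()) / 1000
        buckets.setdefault(minute, []).append(lag_s)
    # average latency per bucket -- the SQL above needs some aggregate here,
    # as Xingcan points out
    return {m: sum(v) / len(v) for m, v in buckets.items()}

print(latency_per_minute(rows))  # {1: 3.0, 2: 1.0}
```

This also illustrates why the query needs an aggregate around the latency expression: each minute bucket contains many rows, so a per-bucket result must reduce them.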
I think at the end of the day it really comes down to what Flink wants to support. After working on FLINK-5624 I'm more inclined to support the second approach (i.e., built-in functions). The main reason is that the APIs of Flink are designed to separate times from the real payloads. It probably makes sense for the Table / SQL APIs to have the same design.

For joins I don't have a clear answer on top of my head. Flink requires two streams to be put in the same window before doing the join. This is essentially a subset of what SQL can express. I don't know what would be the best approach here.

Regards,
Haohui

On Tue, Feb 14, 2017 at 12:26 AM Fabian Hueske <fhue...@gmail.com> wrote:
Hi,

It would be as in the query I gave as an example before:

SELECT
  a,
  SUM(b) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumB
FROM myStream

Here "proctime" would be a system attribute of the table "myStream". The table would also have another system attribute called "rowtime" which would be used to indicate event-time semantics. These attributes would always be present in tables which are derived from streams. Because we still require that streams have timestamps and watermarks assigned (either by the StreamTableSource or somewhere downstream in the DataStream program) when they are converted into a table, there is no need to register anything.

Does that answer your questions?

Best, Fabian

2017-02-14 2:04 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:
Hi Fabian,

Thanks for starting the discussion. Before I give my thoughts on this, can you please give some examples of how you would see the option of using "system attributes"? Do you use this when you register the stream as a table, do you use it when you call an SQL query, do you use it when you translate a table back to a stream / write it to a dynamic table?

Dr. Radu Tudoran
Senior Research Engineer - Big Data Expert
IT R&D Division
HUAWEI TECHNOLOGIES Duesseldorf GmbH

-----Original Message-----
From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Tuesday, February 14, 2017 1:01 AM
To: dev@flink.apache.org
Subject: [DISCUSS] Table API / SQL indicators for event and processing time

Hi,

I'd like to start a discussion about how Table API / SQL queries indicate whether an operation is done in event or processing time.

1) Why do we need to indicate the time mode?

We need to distinguish event-time and processing-time mode for operations in queries in order to have the semantics of a query fully defined. This cannot be done globally in the TableEnvironment, because some queries explicitly request an expression, such as the ORDER BY clause of an OVER window with PRECEDING / FOLLOWING clauses. So we need a way to specify something like the following query:

SELECT
  a,
  SUM(b) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumB
FROM myStream

where "proctime" indicates processing time. Equivalently, "rowtime" would indicate event time.
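The semantics of the OVER window above can be sketched in a few lines of plain Python (not Flink; the data is made up): per partition key c, a running sum over the current row and the two rows preceding it in arrival (processing-time) order.

```python
# Illustrative sketch of ROWS BETWEEN 2 PRECEDING AND CURRENT ROW,
# partitioned by c -- plain Python, not Flink. Data is made up.
rows = [  # (c, b) in arrival order
    ("x", 1), ("x", 2), ("y", 10), ("x", 3), ("x", 4), ("y", 20),
]

def over_sum_rows(rows, preceding=2):
    history = {}  # partition key -> values seen so far, in order
    out = []
    for c, b in rows:
        part = history.setdefault(c, [])
        part.append(b)
        # current row plus up to `preceding` earlier rows of the same partition
        out.append((c, sum(part[-(preceding + 1):])))
    return out

print(over_sum_rows(rows))
# [('x', 1), ('x', 3), ('y', 10), ('x', 6), ('x', 9), ('y', 30)]
```

Note that one result row is emitted per input row, which is what distinguishes an OVER aggregation from a grouped window aggregation.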
2) Current state

The current master branch implements time support only for grouping windows in the Table API. Internally, the Table API converts a 'rowtime symbol (which looks like a regular attribute) into a special expression which indicates event time. For example:

table
  .window(Tumble over 5.milli on 'rowtime as 'w)
  .groupBy('a, 'w)
  .select(...)

defines a tumbling event-time window. Processing time is indicated by omitting the time attribute (table.window(Tumble over 5.milli as 'w)).

3) How can we do that in SQL?

In SQL we cannot add special expressions without touching the parser, which we don't want to do because we want to stick to the SQL standard. Therefore, I see only two options: adding system attributes or (parameterless) built-in functions. I list some pros and cons of the approaches below:

1. System attributes:
+ most natural way to access a property of a record
+ work with joins, because time attributes can be related to tables
- we need to ensure the attributes are not writable and always present in streaming tables (i.e., they should be system-defined attributes)
- need to adapt existing Table API expressions (will not change the API, but some parts of the internal translation)
- the event-time value must be set when the stream is converted; processing time is evaluated on the fly

2. Built-in functions:
+ users cannot modify the time attributes, which is simply not possible with functions
- do not work with joins, because we need to address different relations
- not a natural way to access a property of a record

I think the only viable choice is system attributes, because built-in functions cannot be used for joins. However, system attributes are the more complex solution, because they need a better integration with Calcite's SQL validator (preventing user attributes which are named rowtime, for instance).
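The validator integration mentioned above boils down to a reserved-name check plus implicit attribute injection; a minimal plain-Python sketch of the idea (the function and names are assumptions for illustration, not Calcite's or Flink's actual API) could look like:

```python
# Illustrative sketch of the validation the system-attribute approach needs:
# reject user-defined columns that collide with the reserved time attributes,
# and append the system attributes implicitly. Names are assumptions.
RESERVED = {"rowtime", "proctime"}

def validate_schema(field_names):
    clashes = [f for f in field_names if f.lower() in RESERVED]
    if clashes:
        raise ValueError(f"reserved time attribute(s) used as column name: {clashes}")
    # the system attributes are appended implicitly, not declared by the user
    return list(field_names) + ["rowtime", "proctime"]

print(validate_schema(["a", "b", "c"]))  # ['a', 'b', 'c', 'rowtime', 'proctime']
```

This is the check that would make `SELECT sometimestamp AS rowtime` fail at validation time while still letting every query read the two system attributes.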
Since there are currently several contributions on the way (such as the SQL OVER windows, FLINK-5653 to FLINK-5658) that need time indicators, we need a solution soon to be able to make progress. There are two PRs, #3252 and #3271, which implement the built-in marker functions proctime() and rowtime() and which could serve as a temporary solution (since we do not work on joins yet). I would like to suggest using these functions as a starting point (once the PRs are merged) and later switching to the system-attribute solution, which needs a bit more time to be implemented.

I talked with Timo today about this issue and he said he would like to investigate how we can implement this as system functions properly integrated with Calcite and the SQL validator.

What do others think?

Best, Fabian