Hi Timo,

I had a look at your branch. Thanks for all the refactoring and work you
put into this.

I like the proposal a lot. I think indicating the time attribute in the
schema is a good idea. I'm not sure if we should support the rowtime
expression for batch tables, though. For batch, any long or timestamp
attribute can be used as a time attribute, so marking a single one as the
time attribute is not really necessary, IMO. If others think that this is
needed to make the batch and stream cases identical, I would be OK with
having it. OTOH, I would not consider the schema to be part of the query.
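
To make the batch argument concrete, here is a minimal sketch in plain Scala
(not the Table API). `Order`, `tumble`, and the field names are made up for
illustration, and timestamps are assumed to be non-negative. It only shows
that on a bounded data set a tumbling window can be computed over whichever
long attribute we pick, without any dedicated marker:

```scala
// Plain-Scala sketch, not Flink code. All names here are hypothetical.
final case class Order(orderTime: Long, shipTime: Long, amount: Int)

object BatchTimeAttributeSketch {
  // Group a bounded data set into tumbling windows of `size` milliseconds
  // over the attribute selected by `timeOf`, and sum the amounts per window.
  // The map key is the window start. Assumes non-negative timestamps.
  def tumble(rows: Seq[Order], size: Long)(timeOf: Order => Long): Map[Long, Int] =
    rows
      .groupBy(r => timeOf(r) - (timeOf(r) % size))
      .map { case (start, rs) => start -> rs.map(_.amount).sum }

  def main(args: Array[String]): Unit = {
    val orders = Seq(Order(1L, 12L, 10), Order(4L, 13L, 20), Order(11L, 25L, 5))
    println(tumble(orders, 10L)(_.orderTime)) // windows over orderTime
    println(tumble(orders, 10L)(_.shipTime))  // same data, windows over shipTime
  }
}
```

With the sample data, windowing on orderTime yields the sums 30 (window
start 0) and 5 (window start 10), while windowing on shipTime yields 30
(window start 10) and 5 (window start 20).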

I think we should continue with this work as it is likely to take more time
until we can merge it into the master.
@Timo: would the next step be to open a PR for this?

Best, Fabian


2017-03-20 14:15 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:

> Hi,
>
> I am not sure whether this is about setting the timestamp within the query,
> but you can imagine that there are examples where you have several different
> timestamps, as mentioned. Take, for example, an online purchase. You have:
> - time of purchase (when the payment was input/triggered)
> - time of executing the transaction at the bank (when the payment is
>   processed from the account)
> - time of settlement (when the payment is executed at the merchant's bank,
>   i.e., when the money is received by the seller)
>
> In such a scenario you might want to run different queries over the same
> stream of online payments, each driven by one of these times. Supporting
> this would mean that we have one input stream that enters the Flink engine
> via a table source, and then we can run different queries on it:
> e.g. SELECT SUM(amount) ORDER BY rowtime(time_purchase) LIMIT 10   // you
> want the amount over your last 10 orders
> e.g. SELECT SUM(amount) ORDER BY rowtime(time_settlement) LIMIT 10 // you
> want the amount over your last 10 incoming payments
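
A minimal sketch of this scenario in plain Scala, over a bounded collection
rather than a stream. `Payment`, `sumOfLast`, and the field names are
hypothetical and not part of any Flink API. On a bounded collection we can
sort by whichever timestamp we like; on an unbounded stream, as discussed
further down in this thread, only an attribute that is aligned with the
watermarks can play this role.

```scala
// Plain-Scala sketch with hypothetical names; this is not Flink code.
final case class Payment(
    timePurchase: Long,
    timeBank: Long,
    timeSettlement: Long,
    amount: Long)

object MultiTimestampSketch {
  // Sum the amounts of the `n` most recent payments, where "recent" is
  // decided by whichever timestamp `timeOf` selects.
  def sumOfLast(payments: Seq[Payment], n: Int)(timeOf: Payment => Long): Long =
    payments.sortBy(timeOf).takeRight(n).map(_.amount).sum

  def main(args: Array[String]): Unit = {
    val payments = Seq(
      Payment(1L, 2L, 9L, 100L),
      Payment(2L, 3L, 5L, 10L),
      Payment(3L, 4L, 7L, 1L))
    println(sumOfLast(payments, 2)(_.timePurchase))   // driven by purchase time
    println(sumOfLast(payments, 2)(_.timeSettlement)) // driven by settlement time
  }
}
```

The same three payments give different answers depending on the driving
timestamp, which is exactly why the choice of time attribute matters.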
>
> Best regards,
>
>
> -----Original Message-----
> From: Timo Walther [mailto:twal...@apache.org]
> Sent: Monday, March 20, 2017 2:05 PM
> To: dev@flink.apache.org
> Subject: Re: FW: [DISCUSS] Table API / SQL indicators for event and
> processing time
>
> Yes, you are right. In the current design the user cannot assign
> timestamps and watermarks in a table program. Operators (such as windows)
> might adapt the metatimestamp; if this is the case, this adaptation might
> need to be expressed in the query itself, too.
>
> E.g., for tumbling windows we could limit the select part to
> table.select('rowtime.ceil(DAY) as 'newRowtime) (so that the logical rowtime
> matches the physical metatimestamp)
>
> Do you have a good example use case that needs the assignment of rowtime
> within a query?
>
> On 20/03/17 at 13:39, Radu Tudoran wrote:
> > Hi,
> >
> > As suggested by Timo, I am forwarding this to the mailing list. Sorry for
> > not having the conversation directly here. I initially thought it might
> > not be of interest...
> >
> > @Timo - thanks for the clarification. I get the main point now, which is
> > that the rowtime is encoded within the metadata of the record. I think
> > this is key. My view on the matter was maybe a bit outdated, in the sense
> > that I saw the processing pipeline as an input source (as you exemplify, a
> > table scan) followed by a timestamp and watermark assigner before the
> > processing actually starts. So by overriding the timestamp extractor you
> > match the field that carries the eventtime/rowtime with Flink's mechanism.
> > But as far as I understand, this would not be the case anymore... am I
> > right? In case the assignment of the rowtime to the metadata of the record
> > is done differently, what would be the way to do it?
> >
> >
> > Dr. Radu Tudoran
> > Senior Research Engineer - Big Data Expert
> > IT R&D Division
> >
> >
> > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > European Research Center
> > Riesstrasse 25, 80992 München
> >
> > E-mail: radu.tudo...@huawei.com
> > Mobile: +49 15209084330
> > Telephone: +49 891588344173
> >
> > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> > Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> > Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
> > Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> > Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
> > This e-mail and its attachments contain confidential information from
> HUAWEI, which is intended only for the person or entity whose address is
> listed above. Any use of the information contained herein in any way
> (including, but not limited to, total or partial disclosure, reproduction,
> or dissemination) by persons other than the intended recipient(s) is
> prohibited. If you receive this e-mail in error, please notify the sender
> by phone or email immediately and delete it!
> >
> >
> > -----Original Message-----
> > From: Timo Walther [mailto:twal...@apache.org]
> > Sent: Monday, March 20, 2017 12:29 PM
> > To: Radu Tudoran
> > Subject: Re: [DISCUSS] Table API / SQL indicators for event and
> > processing time
> >
> > You are not bothering me, it is very interesting to compare the design
> > with real world use cases.
> >
> > In your use case we would create a table like: tEnv.toTable('date, 'time1,
> > 'time2, 'data, 'myrowtime.rowtime)
> >
> > We would not "overwrite" an actual attribute of the record but only add a
> > logical "myrowtime". In general, just to make it clear again, the
> > rowtime must be in the metatimestamp of the record (by using a timestamp
> > extractor before). The Table API assumes that records that enter the
> > Table API are timestamped correctly. So in your use case, you would
> > create your own TableSource, extract the timestamp based on your 3 time
> > fields, and define an attribute that represents the rowtime logically. In
> > the current design we want the Table API to rely on Flink's time
> > handling, because time handling can be very tricky. So we only support
> > one event-time time field.
> >
> > But would it be possible to post our discussion on the ML? It might be
> > interesting for others as well. If yes, can you forward our conversation
> > to the ML?
> >
> > Timo
> >
> >
> >
> > On 20/03/17 at 12:11, Radu Tudoran wrote:
> >> Thanks for the replies.
> >>
> >> Regarding "'It might be sometimes that this is not explicit to be
> >> guessed' That is why I added the RelTimeConverter. After this conversion
> >> step it should be as explicit as possible (by using the special types).
> >> And we can add special handling of functions (i.e. ceil) that preserve
> >> the monotonicity."
> >>
> >> ...maybe I am missing something, so sorry if I bother you for nothing
> >> (it is just to make sure we think of all cases beforehand). I have seen
> >> applications where you have multiple fields of the same type. For
> >> example, an event can have 3 time fields of type TIMESTAMP, 1 of DATE,
> >> and 2 of TIME (this is actually from a real application with some sort
> >> of standard communication schema). I was referring to such cases: it is
> >> unclear to me how the code will identify the exact field to use as
> >> rowtime. This is what I meant by asking how we pass indicators to spot
> >> the rowtime field, and what would happen with the code in such a
> >> situation, since it can identify multiple time fields.
> >>
> >> Dr. Radu Tudoran
> >>
> >>
> >> -----Original Message-----
> >> From: Timo Walther [mailto:twal...@apache.org]
> >> Sent: Monday, March 20, 2017 12:00 PM
> >> To: Radu Tudoran
> >> Subject: Re: [DISCUSS] Table API / SQL indicators for event and
> >> processing time
> >>
> >> Hi Radu,
> >>
> >> you are right. It is hard to give line specific comments without having
> >> a PR. I will open a work-in-progress PR.
> >>
> >> Regarding the visit functions: Yes you are right. We will need to define
> >> how time is handled for every new operator and override/implement visit
> >> functions for it.
> >>
> >> The RexTimeIndicatorMaterializer wraps concrete time accesses in a
> >> materialization function. This changes the field type to a regular
> >> timestamp. The CodeGenerator then knows that it needs to access the
> >> getTimestamp() method of the ProcessFunction.
> >> Sorry for the bad documentation. This code is still work-in-progress,
> >> that's why I haven't added many comments so far.
> >>
> >> The RexInputRefUpdater basically updates the references when a logical
> >> row type is converted to a physical row type. I haven't written tests for
> >> this code yet, so there might be bugs in it.
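
A minimal sketch of the index arithmetic in question, in plain Scala rather
than Calcite types. It assumes (this is not confirmed by the branch) that
`countTimeIndicators` counts the time indicators located before the
referenced field, and that time indicators are absent from the physical row.
Under those assumptions the subtraction follows naturally, because the
physical row has fewer fields than the logical one. `physicalIndex` and
`isTimeIndicator` are hypothetical names.

```scala
// Plain-Scala sketch; hypothetical names, not the code on the branch.
object InputRefShiftSketch {
  // `isTimeIndicator(i)` tells whether logical field i is a time indicator.
  // Returns the physical index of logical field `logicalIndex`, assuming
  // time indicators simply do not exist in the physical row.
  def physicalIndex(isTimeIndicator: Vector[Boolean], logicalIndex: Int): Int = {
    require(!isTimeIndicator(logicalIndex), "time indicators have no physical field")
    // Every time indicator in front of the field shifts it one slot left.
    val countTimeIndicators = isTimeIndicator.take(logicalIndex).count(identity)
    logicalIndex - countTimeIndicators
  }

  def main(args: Array[String]): Unit = {
    // Logical row for ('long.rowtime, 'int, 'string): only field 0 is a
    // time indicator.
    val logicalRow = Vector(true, false, false)
    println(physicalIndex(logicalRow, 1)) // 'int
    println(physicalIndex(logicalRow, 2)) // 'string
  }
}
```

In this example 'int moves from logical index 1 to physical index 0 and
'string from 2 to 1, so a "+" would point past the end of the physical row.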
> >>
> >> We have to think about how we integrate joins etc. in follow-up issues.
> >> What this code ensures is that operators know what is time and what is
> >> just a "regular field". A join operator knows which side the time
> >> comes from and which type it has (proctime or rowtime).
> >>
> >> "It might be sometimes that this is not explicit to be guessed" That is
> >> why I added the RelTimeConverter. After this conversion step it should
> >> be as explicit as possible (by using the special types). And we can add
> >> special handling of functions (e.g. ceil) that preserve the
> >> monotonicity.
> >>
> >> "Also in the same class, the 2 function look the same" there is a
> >> difference in the last argument ;-)
> >>
> >> Yes, you are right, this is a large change, but we should integrate it in
> >> 1.3 as it is also API-breaking and a necessary concept for future PRs.
> >> The longer we wait, the harder it is to rebase it on top of all the
> >> windows in the pipeline. But of course we need to discuss this with the
> >> community.
> >>
> >> Thanks for looking into the code!
> >>
> >> Timo
> >>
> >>
> >> On 20/03/17 at 11:28, Radu Tudoran wrote:
> >>> Hi Timo,
> >>>
> >>> As I did not see a pull request, I did not know what the best way is
> >>> to give you feedback on the code... so as a last resort I thought I
> >>> would write you an email. If you can recommend better tools for the
> >>> future, please do so.
> >>>
> >>> In your class
> >>> org.apache.flink.table.api.scala.stream.table.AggregationsITCase.scala
> >>> => I guess there are several other overrides of the visit function to be
> >>> done for other types of logical operators (join, window, ...). Does this
> >>> mean that from now on, when we implement a new operator, we need to add
> >>> the corresponding implementation here as well?
> >>>
> >>> In the RexTimeIndicatorMaterializer class I am not sure I understand
> >>> the logic of materializing the time field. From my basic understanding,
> >>> this should mean that if we had, for example, 4 fields before
> >>> materializing it, we need to add a 5th one for the time. It is a bit
> >>> hard to follow at first glance how this is implemented. If you could add
> >>> some more comments, it would be great.
> >>>
> >>> In RexInputRefUpdater.scala I would expect that you add the number of
> >>> time fields to be materialized to the input index rather than
> >>> subtracting it:
> >>> new RexInputRef(inputRef.getIndex - countTimeIndicators, inputRef.getType)
> >>> My question is: shouldn't it be "+"? (I guess you tested it and it
> >>> works, in which case please take my remark more as a question about the
> >>> logic of having minus and not plus... maybe not really the kind of
> >>> feedback you needed.)
> >>> => new RexInputRef(inputRef.getIndex + countTimeIndicators, inputRef.getType)
> >>> Also, I see that you iterate over the input references to look for time
> >>> indicators. Is it possible to have more than one? Can we have/should we
> >>> have, for example, both proctime and rowtime... or multiple rowtimes? Or
> >>> perhaps this is for the case of joins/unions where each side of the
> >>> biRel comes with its own time. In this case, how do you want to support
> >>> this? Will you preserve both, or create one single new one that you
> >>> preserve? (I would argue for the latter option: regardless of whether
> >>> the events worked on proctime or rowtime before, after the union/join a
> >>> new event is created and the timestamp should reflect the moment of
> >>> creation.)
> >>>
> >>> In FlinkTypeFactory.scala, in def createRowtimeIndicatorType():
> >>> Shouldn't you somehow access the field of the event that is indicated
> >>> to contain the rowtime? Sometimes it might not be possible to guess
> >>> this. Or do you make some implicit assumption that it is uniquely
> >>> identifiable by some indicators (name, type, ...)?
> >>>
> >>> Also in the same class, the 2 functions look the same. Is this a
> >>> mistake? I would assume there must be some differentiation...
> >>>   def createProctimeIndicatorType(): RelDataType = {
> >>>     val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
> >>>     new TimeIndicatorRelDataType(
> >>>       getTypeSystem, originalType.asInstanceOf[BasicSqlType], false)
> >>>   }
> >>>
> >>>   def createRowtimeIndicatorType(): RelDataType = {
> >>>     val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
> >>>     new TimeIndicatorRelDataType(
> >>>       getTypeSystem, originalType.asInstanceOf[BasicSqlType], true)
> >>>   }
> >>>
> >>>
> >>> In buildRowDataType I saw what I was expecting: fields corresponding to
> >>> proctime and rowtime are added to the event type. I am still a bit
> >>> confused about what was happening before :)
> >>>
> >>>
> >>> It looks like many modifications would be affected by this change. I
> >>> would propose to include it only after/right before the feature freeze,
> >>> as it would cause a lot of rebasing and reimplementation for the
> >>> features currently in progress.
> >>>
> >>>
> >>> ...not sure if this was helpful...
> >>>
> >>> Dr. Radu Tudoran
> >>>
> >>>
> >>> -----Original Message-----
> >>> From: Timo Walther [mailto:twal...@apache.org]
> >>> Sent: Monday, March 20, 2017 10:48 AM
> >>> To: dev@flink.apache.org
> >>> Subject: Re: [DISCUSS] Table API / SQL indicators for event and
> >>> processing time
> >>>
> >>> Hi Radu,
> >>>
> >>> we differentiate rowtime and processing time fields by their field
> >>> types. Both indicators extend the timestamp type. In my prototype I
> >>> added the functions FlinkTypeFactory.isRowtime() and
> >>> FlinkTypeFactory.isProctime() for checking this. If a time indicator
> >>> has been materialized (e.g. long.cast(STRING)), it becomes a regular
> >>> timestamp (or in this case a string after evaluation). So we cannot
> >>> differentiate between rowtime and proctime anymore. However, we can add
> >>> some exceptions for certain functions (e.g. for ceil() in combination
> >>> with windows) that preserve the time attributes.
> >>>
> >>> Count windows have to be defined over a time attribute. If you take a
> >>> look at the tests of
> >>> org.apache.flink.table.api.scala.stream.table.AggregationsITCase,
> >>> you can see that countWindows are still supported as before. As I said,
> >>> most of the user-facing API does not change. It only tries to make time
> >>> more explicit.
> >>>
> >>> Timo
> >>>
> >>>
> >>> On 20/03/17 at 10:34, Radu Tudoran wrote:
> >>>> Hi Timo,
> >>>>
> >>>> I have some questions regarding your implementation:
> >>>>
> >>>> "The timestamp (not an indicator anymore) becomes part of the
> >>>> physical row. E.g. long.cast(STRING) would require a materialization"
> >>>> => If we have this, how are we going to differentiate between rowtime
> >>>> and proctime? To support some queries/operators you only need to use
> >>>> these time indicators as markers, to have something like below. If you
> >>>> do not get access to any sort of unique markers to indicate these,
> >>>> then we will have a hard time supporting many implementations. What
> >>>> would be the option to support this condition in your implementation?
> >>>>       if(rowtime)
> >>>>    ...
> >>>>       else if(proctime)
> >>>>    ...some other implementation
> >>>>
> >>>> "- Windows are only valid if they work on time indicators."
> >>>> => Does this mean we can no longer work with count windows? There are
> >>>> a lot of queries where windows would be defined based on the
> >>>> cardinality of elements.
> >>>>
> >>>>
> >>>>
> >>>> -----Original Message-----
> >>>> From: Timo Walther [mailto:twal...@apache.org]
> >>>> Sent: Monday, March 20, 2017 10:08 AM
> >>>> To: dev@flink.apache.org
> >>>> Subject: Re: [DISCUSS] Table API / SQL indicators for event and
> >>>> processing time
> >>>>
> >>>> Hi everyone,
> >>>>
> >>>> for the last two weeks I worked on a solution for the time indicator
> >>>> issue. I have implemented a prototype[1] which shows how we can
> >>>> express, track, and access time in a consistent way for batch and
> >>>> stream tables.
> >>>>
> >>>> Main changes of my current solution:
> >>>>
> >>>> - Processing-time and rowtime indicators can be named arbitrarily
> >>>> - They can be defined as follows: stream.toTable(tEnv, 'long, 'int,
> >>>>   'string, 'proctime.proctime) or stream.toTable(tEnv, 'long.rowtime,
> >>>>   'int, 'string)
> >>>> - In a streaming environment: if the "long" field is already defined
> >>>>   in the record, it will not be read by the runtime. "long" always
> >>>>   represents the timestamp of the row.
> >>>> - In a batch environment: "long" must be present in the record and will
> >>>>   be read by the runtime.
> >>>> - The table definition looks equivalent in both batch and streaming
> >>>>   (better unification than the current state)
> >>>> - Internally, row types are split up into a logical and a physical row
> >>>>   type.
> >>>> - The logical row type contains time indicators; the physical row type
> >>>>   never contains time indicators (the pure "long" will never be in a
> >>>>   record)
> >>>> - After validation and query decorrelation, a special time indicator
> >>>>   converter traverses the RelNodes and analyzes whether a time indicator
> >>>>   is accessed or only forwarded.
> >>>> - An access to a time indicator means that we need to materialize the
> >>>>   rowtime using a ProcessFunction (not yet implemented). The timestamp
> >>>>   (not an indicator anymore) becomes part of the physical row. E.g.
> >>>>   long.cast(STRING) would require a materialization
> >>>> - Forwarding of time indicators does not materialize the rowtime. It
> >>>>   remains a logical attribute. E.g. .select('long)
> >>>> - Windows are only valid if they work on time indicators.
> >>>>
> >>>> There are still a lot of open questions that we can discuss and/or fix
> >>>> in future PRs. For now it would be great if you could give some
> >>>> feedback about the current implementation. With some exceptions, my
> >>>> branch can be built successfully.
> >>>>
> >>>> Regards,
> >>>> Timo
> >>>>
> >>>>
> >>>> [1] https://github.com/twalthr/flink/tree/FLINK-5884
> >>>>
> >>>>
> >>>> On 02/03/17 at 07:22, jincheng sun wrote:
> >>>>> Hi,
> >>>>> @Timo, thanks for your reply, and congratulations on your work.
> >>>>> @Fabian, no matter how it is achieved, as long as the field
> >>>>> attributes are identified when the table is generated or created,
> >>>>> that is what we want.
> >>>>> I think at this point we are on the same page. We can go ahead.
> >>>>> And I am very glad to hear that `the 'rowtime keyword would be
> >>>>> removed`, which is a very important step for keeping stream and batch
> >>>>> consistent.
> >>>>>
> >>>>> Best,
> >>>>> SunJincheng
> >>>>>
> >>>>>
> >>>>> 2017-03-01 17:24 GMT+08:00 Fabian Hueske <fhue...@gmail.com>:
> >>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> @Xingcan
> >>>>>> Yes, that is right. It is not (easily) possible to change the
> >>>>>> watermarks of a stream. All attributes which are used as event-time
> >>>>>> timestamps must be aligned with these watermarks. These are only
> >>>>>> attributes which are derived from the original rowtime attribute,
> >>>>>> i.e., the one that was specified when the Table was created.
> >>>>>>
> >>>>>> @SunJincheng
> >>>>>> Regarding your points:
> >>>>>>
> >>>>>> 1. Watermarks can only be generated for (almost) sorted attributes.
> >>>>>> Since a stream has only one sort order and cannot be sorted before
> >>>>>> it is converted into a Table, there will hardly be a case where
> >>>>>> n > 1 is possible. The only possibility I see is two attributes
> >>>>>> which are in almost the same order but with a certain distance
> >>>>>> (think of orderDate and shipDate, but values would always be 1 day
> >>>>>> apart). However, this requirement is very limiting and, to be
> >>>>>> honest, I don't see how assigning different watermarks for
> >>>>>> different attributes would work reliably in practice.
> >>>>>> The ORDER BY clause in an OVER window can only be used because the
> >>>>>> stream is already sorted on that attribute (that's also why it is
> >>>>>> restricted to rowtime and proctime in streaming).
> >>>>>>
> >>>>>> 2. Since a stream can only have one sort order, we so far assumed
> >>>>>> that streams would already have watermarks and timestamps assigned.
> >>>>>> I think this is a fair assumption, because a stream can only have
> >>>>>> one order and hence only one timestamped & watermarked attribute
> >>>>>> (except for the corner case I discussed above). As Timo said,
> >>>>>> .rowtime would only add an attribute which refers to the already
> >>>>>> assigned timestamp of a row.
> >>>>>>
> >>>>>> 3. I completely agree that the difference between batch and
> >>>>>> streaming should be overcome. This is actually the goal of Timo's
> >>>>>> work. So yes, the 'rowtime keyword would be removed because any
> >>>>>> attribute can be marked as event-time attribute (by calling
> >>>>>> 't.rowtime).
> >>>>>>
> >>>>>> Btw. A table source could still make the watermark configurable by
> >>>>>> offering a respective interface. However, I'm not yet convinced that
> >>>>>> this needs to be part of the Table API.
> >>>>>>
> >>>>>> What do you think?
> >>>>>>
> >>>>>> Best, Fabian
> >>>>>>
> >>>>>> 2017-03-01 7:55 GMT+01:00 jincheng sun <sunjincheng...@gmail.com>:
> >>>>>>
> >>>>>>> Hi,Fabian,
> >>>>>>>
> >>>>>>> Thanks for your attention to this discussion. Let me share some
> >>>>>>> ideas about this. :)
> >>>>>>>
> >>>>>>> 1. Yes, the solution I have proposed can indeed be extended to
> >>>>>>> support multiple watermarks. A single watermark is a special case
> >>>>>>> of multiple watermarks (n = 1). I agree that, to keep the
> >>>>>>> implementation simple, we currently only support a single
> >>>>>>> watermark. Our ideas are consistent.
> >>>>>>>
> >>>>>>> BTW, I think even if we only use one attribute to generate the
> >>>>>>> watermark, we also need to sort, because in an OVER window
> >>>>>>> (event-time) we must know the exact data order. Is that right?
> >>>>>>>
> >>>>>>> 2. I think our difference is in how to register the watermark.
> >>>>>>> Now we see two ways:
> >>>>>>>   A. t.rowtime:
> >>>>>>>      If I understand correctly, in the current design, when we use
> >>>>>>>      the expression 'rowtime, the system extracts timestamps from
> >>>>>>>      the user data by default;
> >>>>>>>   B. registeredWatermarks('t, waterMarkFunction1):
> >>>>>>>      We explicitly register how to generate watermarks and extract
> >>>>>>>      timestamps in a user-defined way;
> >>>>>>>
> >>>>>>> These two ways are characterized by:
> >>>>>>>   Approach A: The system by default uses the value of the t field
> >>>>>>>   as the timestamp, which is simple for the system.
> >>>>>>>   Approach B: The user can define the logic for extracting the
> >>>>>>>   timestamp, which is very flexible for the user. For example: if
> >>>>>>>   the field `t` is a complex field with a value such as
> >>>>>>>   `xxx # 20170302111129 # yyy`, the user can extract the timestamp
> >>>>>>>   (20170302111129) with custom logic.
> >>>>>>>
> >>>>>>> So I tend towards approach B. What do you think?
> >>>>>>>
> >>>>>>> 3. We are very concerned about the unification of stream and
> >>>>>>> batch. Take the current Table API:
> >>>>>>>   Batch:
> >>>>>>>     Table
> >>>>>>>       .window(Tumble over 2.rows on 'long as 'w) // 'long is a normal field
> >>>>>>>       .groupBy('w)
> >>>>>>>       .select('int.count)
> >>>>>>>
> >>>>>>>   Stream:
> >>>>>>>     Table
> >>>>>>>       .window(Tumble over 5.milli on 'rowtime as 'w) // 'rowtime is the keyword
> >>>>>>>       .groupBy('w)
> >>>>>>>       .select('int.count)
> >>>>>>>
> >>>>>>> As mentioned above, the two examples are both event-time
> >>>>>>> aggregation windows, but they are not written the same way: in
> >>>>>>> batch we have a specific column, while stream needs the 'rowtime
> >>>>>>> keyword. I think we need to try to eliminate this difference. What
> >>>>>>> do you think?
> >>>>>>>
> >>>>>>> In the current Google doc I see `table.window(Tumble over 1.hour
> >>>>>>> on 't as 'w).groupBy('a, 'w).select('w.start, 'b.count)`. Does
> >>>>>>> this mean that FLINK-5884 will remove the Table API 'rowtime
> >>>>>>> keyword?
> >>>>>>>
> >>>>>>> So for the event-time indicators in SQL that I am currently
> >>>>>>> discussing, with the column attributes registered on the table,
> >>>>>>> does this mean that batch and stream SQL are written and used in
> >>>>>>> the same way?
> >>>>>>>
> >>>>>>> Thanks a lot for your feedback.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> SunJincheng
> >>>>>>>
> >>>>>>> 2017-03-01 10:40 GMT+08:00 Xingcan Cui <xingc...@gmail.com>:
> >>>>>>>
> >>>>>>>> Hi all,
> >>>>>>>>
> >>>>>>>> I have a question about the point in time at which `rowtime` is
> >>>>>>>> designated. The current design does this during the DataStream to
> >>>>>>>> Table conversion. Does this mean that `rowtime` is only valid for
> >>>>>>>> the source streams and cannot be designated after a subquery?
> >>>>>>>> (That's why I considered using an alias to dynamically designate
> >>>>>>>> it in SQL before.)
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Xingcan
> >>>>>>>>
> >>>>>>>> On Wed, Mar 1, 2017 at 5:35 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> >>>>>>>>> Hi Jincheng Sun,
> >>>>>>>>>
> >>>>>>>>> registering watermark functions for different attributes to
> >>>>>>>>> allow each of them to be used in a window is an interesting idea.
> >>>>>>>>>
> >>>>>>>>> However, watermarks only work well if the streaming data is
> >>>>>>>>> (almost) in timestamp order. Since it is not possible to sort a
> >>>>>>>>> stream, all attributes that would qualify as event-time
> >>>>>>>>> attributes need to be in almost the same order. I think this
> >>>>>>>>> limits the benefits of having multiple watermark functions quite
> >>>>>>>>> significantly. But maybe you have a good use case that you can
> >>>>>>>>> share where multiple event-time attributes would work well.
> >>>>>>>>>
> >>>>>>>>> So far our approach has been that a DataStream which is converted
> >>>>>>>>> into a Table already has timestamps and watermarks assigned. We
> >>>>>>>>> also assumed that a StreamTableSource would provide watermarks
> >>>>>>>>> and timestamps and indicate the name of the attribute that
> >>>>>>>>> carries the timestamp.
> >>>>>>>>>
> >>>>>>>>> @Stefano: That's great news. I'd suggest opening a pull request
> >>>>>>>>> and having a look at PR #3397, which handles the (partitioned)
> >>>>>>>>> unbounded case. It would be good to share some code between these
> >>>>>>>>> approaches.
> >>>>>>>>>
> >>>>>>>>> Thanks, Fabian
> >>>>>>>>>
> >>>>>>>>> 2017-02-28 18:17 GMT+01:00 Stefano Bortoli <stefano.bort...@huawei.com>:
> >>>>>>>>>> Hi all,
> >>>>>>>>>>
> >>>>>>>>>> I have completed a first implementation that works for the SQL
> >>>>>>>>>> query
> >>>>>>>>>> SELECT a, SUM(b) OVER (PARTITION BY c ORDER BY a RANGE BETWEEN 2
> >>>>>>>>>> PRECEDING) AS sumB FROM MyTable
> >>>>>>>>>>
> >>>>>>>>>> I have SUM, MAX, MIN, AVG, COUNT implemented, but I could test
> >>>>>>>>>> it just on simple queries such as the one above. Is there any
> >>>>>>>>>> specific case I should be looking at?
> >>>>>>>>>>
> >>>>>>>>>> Regards,
> >>>>>>>>>> Stefano
> >>>>>>>>>>
> >>>>>>>>>> -----Original Message-----
> >>>>>>>>>> From: jincheng sun [mailto:sunjincheng...@gmail.com]
> >>>>>>>>>> Sent: Tuesday, February 28, 2017 12:26 PM
> >>>>>>>>>> To: dev@flink.apache.org
> >>>>>>>>>> Subject: Re: [DISCUSS] Table API / SQL indicators for event and
> >>>>>>>>>> processing time
> >>>>>>>>>>
> >>>>>>>>>> Hi everyone, thanks for sharing your thoughts. I really like
> >>>>>>>>>> Timo's proposal, and I have a few thoughts I want to share.
> >>>>>>>>>>
> >>>>>>>>>> We want to keep the query the same for batch and streaming.
> >>>>>>>>>> IMO, "process time" is something special to DataStream, while
> >>>>>>>>>> it is not a well-defined term for batch queries. So we are
> >>>>>>>>>> fairly free to create something new for process time. I think
> >>>>>>>>>> it is a good idea to add proctime as a reserved keyword for
> >>>>>>>>>> SQL.
> >>>>>>>>>>
> >>>>>>>>>> Regarding "event time", it is well defined for batch queries.
> >>>>>>>>>> So IMO, we should keep the way of defining a streaming window
> >>>>>>>>>> exactly the same as for a batch window. Therefore, the row for
> >>>>>>>>>> event time is nothing special, but just a normal column. The
> >>>>>>>>>> major difference between batch and stream is that in
> >>>>>>>>>> DataStream the event-time column must be associated with a
> >>>>>>>>>> watermark function. I really like the way Timo proposed, that
> >>>>>>>>>> we can select any column as rowtime. But I think instead of
> >>>>>>>>>> just declaring that a column is a rowtime (actually I do not
> >>>>>>>>>> think we need this special rowtime keyword), it is better to
> >>>>>>>>>> register/associate the watermark function with this column
> >>>>>>>>>> when creating the table. For DataStream, we will accept a
> >>>>>>>>>> rowtime column as valid only if it has been associated with
> >>>>>>>>>> the watermark function. Prototype code to explain how it would
> >>>>>>>>>> look is shown below:
> >>>>>>>>>>
> >>>>>>>>>>        TableAPI:
> >>>>>>>>>>           toTable(tEnv, 'a, 'b, 'c)
> >>>>>>>>>>            .registeredWatermarks('a, waterMarkFunction1)
> >>>>>>>>>>
> >>>>>>>>>>           batchOrStreamTable
> >>>>>>>>>>            .window(Tumble over 5.milli on 'a as 'w)
> >>>>>>>>>>            .groupBy('w, 'b)
> >>>>>>>>>>            .select('b, 'a.count as cnt1, 'c.sum as cnt2)
> >>>>>>>>>>
> >>>>>>>>>>        SQL:
> >>>>>>>>>>          addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c)
> >>>>>>>>>>            .registeredWatermarks('a, waterMarkFunction1)
> >>>>>>>>>>
> >>>>>>>>>>          SELECT a, SUM(b) OVER (PARTITION BY c ORDER BY a RANGE
> >>>>>>>>>>          BETWEEN 2 PRECEDING) AS sumB FROM MyTable
> >>>>>>>>>>
> >>>>>>>>>> What do you think ?
> >>>>>>>>>>
> >>>>>>>>>> 2017-02-22 23:44 GMT+08:00 Timo Walther <twal...@apache.org>:
> >>>>>>>>>>
> >>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>
> >>>>>>>>>>> I have created an issue [1] to track the progress of this topic.
> >>>>>>>>>>> I have also written a short design document [2] on how we could
> >>>>>>>>>>> implement the indicators and which parts have to be touched. I
> >>>>>>>>>>> would suggest implementing a prototype, also to see what is
> >>>>>>>>>>> possible and can be integrated in both Flink and Calcite.
> >>>>>>>>>>> Feedback is welcome.
> >>>>>>>>>>>
> >>>>>>>>>>> Regards,
> >>>>>>>>>>> Timo
> >>>>>>>>>>>
> >>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-5884
> >>>>>>>>>>> [2] https://docs.google.com/document/d/1JRXm09x_wKst6z6UXdCGF9tgF1ueOAsFiQwahR72vbc/edit?usp=sharing
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On 21/02/17 at 15:06, Fabian Hueske wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi Xingcan,
> >>>>>>>>>>>>
> >>>>>>>>>>>> thanks for your thoughts.
> >>>>>>>>>>>> In principle you are right that the monotone attribute property
> >>>>>>>>>>>> would be sufficient; however, there are more aspects to consider
> >>>>>>>>>>>> than that.
> >>>>>>>>>>>> Flink is a parallel stream processing engine, which means that
> >>>>>>>>>>>> data is processed in separate processes and shuffled across
> >>>>>>>>>>>> them. Maintaining a strict order when merging parallel streams
> >>>>>>>>>>>> would be prohibitively expensive.
> >>>>>>>>>>>> Flink's watermark mechanism helps operators to deal with
> >>>>>>>>>>>> out-of-order data (due to out-of-order input or shuffles).
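
To make the watermark idea above concrete, here is a minimal sketch in plain Python. It does not use any Flink API: the function name `emit_in_order` and the `max_delay` bound are assumptions made for illustration only. It buffers events, lets a watermark trail the highest timestamp seen so far, and releases buffered events in timestamp order once the watermark has passed them.

```python
import heapq


def emit_in_order(events, max_delay):
    """Yield (timestamp, value) pairs in timestamp order.

    events:    iterable of (timestamp, value) in arrival order.
    max_delay: assumed bound on how far behind the highest seen
               timestamp an event may still arrive (hypothetical knob).
    """
    buffer = []  # min-heap ordered by timestamp, then arrival sequence
    watermark = float("-inf")
    for seq, (ts, value) in enumerate(events):
        if ts <= watermark:
            # Late event: the watermark already passed this timestamp.
            continue
        heapq.heappush(buffer, (ts, seq, value))
        # The watermark trails the highest timestamp seen so far.
        watermark = max(watermark, ts - max_delay)
        # Everything at or below the watermark can be released in order.
        while buffer and buffer[0][0] <= watermark:
            out_ts, _, out_value = heapq.heappop(buffer)
            yield out_ts, out_value
    # End of input: flush whatever is still buffered.
    while buffer:
        out_ts, _, out_value = heapq.heappop(buffer)
        yield out_ts, out_value
```

In this sketch an event that arrives after the watermark has passed its timestamp is simply dropped; a real system could treat late data differently.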
> >>>>>>>>>>>> I don't think we can separate the discussion about time
> >>>>>>>>>>>> attributes from watermarks if we want to use Flink as a
> >>>>>>>>>>>> processing engine and not reimplement large parts from scratch.
> >>>>>>>>>>>>
> >>>>>>>>>>>> When transforming a time attribute, we have to either align it
> >>>>>>>>>>>> with existing watermarks or generate new watermarks.
> >>>>>>>>>>>> If we want to allow all kinds of monotone transformations, we
> >>>>>>>>>>>> have to adapt the watermarks, which is not trivial.
> >>>>>>>>>>>> Instead, I think we should initially allow only a few monotone
> >>>>>>>>>>>> transformations that are aligned with the existing watermarks.
> >>>>>>>>>>>> We might later relax this condition if we see that users
> >>>>>>>>>>>> request this feature.
> >>>>>>>>>>>>
> >>>>>>>>>>>> You are right that we need to track which attribute can be used
> >>>>>>>>>>>> as a time attribute (i.e., is increasing and guarded by
> >>>>>>>>>>>> watermarks).
> >>>>>>>>>>>> For that we need to expose the time attribute when a Table is
> >>>>>>>>>>>> created (either when a DataStream is converted, like
> >>>>>>>>>>>> stream.toTable(tEnv, 'a, 'b, 't.rowtime), or in a
> >>>>>>>>>>>> StreamTableSource) and track how it is used in queries.
> >>>>>>>>>>>> I am not sure if the monotone property would be the right choice
> >>>>>>>>>>>> here, since data is only quasi-monotone and a monotone
> >>>>>>>>>>>> annotation might trigger some invalid optimizations which
> >>>>>>>>>>>> change the semantics of a query.
> >>>>>>>>>>>> Right now, Calcite does not offer a quasi-monotone property (at
> >>>>>>>>>>>> least I haven't found it).
> >>>>>>>>>>>>
> >>>>>>>>>>>> Best, Fabian
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> 2017-02-21 4:41 GMT+01:00 Xingcan Cui <xingc...@gmail.com>:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> As I said in another thread, the main difference between a
> >>>>>>>>>>>>> stream and a table is that a stream is an ordered list while a
> >>>>>>>>>>>>> table is an unordered set.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Leaving aside the out-of-order problem in practice, both event
> >>>>>>>>>>>>> time and processing time can simply be treated as a
> >>>>>>>>>>>>> monotonically increasing field, and that's why the given
> >>>>>>>>>>>>> query [1] would work. In other words, we must guarantee that
> >>>>>>>>>>>>> the "SELECT MAX(t22.rowtime) ..." subquery returns a single
> >>>>>>>>>>>>> value that can be retrieved from the cached dynamic table,
> >>>>>>>>>>>>> since it's dangerous to join two un-windowed streams.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Under this circumstance, I would consider adding a "monotonic
> >>>>>>>>>>>>> hint" (INC or DEC) to the field of a (generalized) table (maybe
> >>>>>>>>>>>>> using an annotation on the registerDataXX method) that can be
> >>>>>>>>>>>>> used to indicate whether a field is monotonically increasing or
> >>>>>>>>>>>>> decreasing.
> >>>>>>>>>>>>> Then, by treating rowtime as a common (monotonically
> >>>>>>>>>>>>> increasing) field, there are several benefits:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 1) This can unify table and stream by imposing a total ordering
> >>>>>>>>>>>>> relation on an unordered set.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2) These fields can be modified arbitrarily as long as they
> >>>>>>>>>>>>> keep the declared monotonic property, and the watermark problem
> >>>>>>>>>>>>> does not exist any more.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 3) The monotonic hint will be useful in the query optimization
> >>>>>>>>>>>>> process.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> What do you think?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Best,
> >>>>>>>>>>>>> Xingcan
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> [1]
> >>>>>>>>>>>>> SELECT t1.amount, t2.rate
> >>>>>>>>>>>>> FROM
> >>>>>>>>>>>>>         table1 AS t1,
> >>>>>>>>>>>>>         table2 AS t2
> >>>>>>>>>>>>> WHERE
> >>>>>>>>>>>>>         t1.currency = t2.currency AND
> >>>>>>>>>>>>>         t2.rowtime = (
> >>>>>>>>>>>>>           SELECT MAX(t22.rowtime)
> >>>>>>>>>>>>>           FROM table2 AS t22
> >>>>>>>>>>>>>           AND t22.rowtime <= t1.rowtime)
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Tue, Feb 21, 2017 at 2:52 AM, Fabian Hueske <fhue...@gmail.com>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi everybody,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> When Timo wrote to the Calcite mailing list, Julian Hyde
> >>>>>>>>>>>>>> replied, gave good advice, and explained why a system
> >>>>>>>>>>>>>> attribute for event-time would be a problem [1].
> >>>>>>>>>>>>>> I thought about this and agree with Julian.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Here is a document that describes the problem, the constraints
> >>>>>>>>>>>>>> in Flink, and a proposal for how to handle processing time and
> >>>>>>>>>>>>>> event time in Table API and SQL:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> -> https://docs.google.com/document/d/1MDGViWA_TCqpaVoWub7u_GY4PMFSbT8TuaNl-EpbTHQ
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Please have a look, comment and ask questions.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thank you,
> >>>>>>>>>>>>>> Fabian
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> [1] https://lists.apache.org/thread.html/6397caf0ca37f97f2cd27d96f7a12c6fa845d6fd0870214fdce18d96@%3Cdev.calcite.apache.org%3E
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 2017-02-16 1:18 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks everybody for the comments.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Actually, I think we do not have much choice when deciding
> >>>>>>>>>>>>>>> whether to use attributes or functions.
> >>>>>>>>>>>>>>> Consider the following join query:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> SELECT t1.amount, t2.rate
> >>>>>>>>>>>>>>> FROM
> >>>>>>>>>>>>>>>         table1 AS t1,
> >>>>>>>>>>>>>>>         table2 AS t2
> >>>>>>>>>>>>>>> WHERE
> >>>>>>>>>>>>>>>         t1.currency = t2.currency AND
> >>>>>>>>>>>>>>>         t2.rowtime = (
> >>>>>>>>>>>>>>>           SELECT MAX(t22.rowtime)
> >>>>>>>>>>>>>>>           FROM table2 AS t22
> >>>>>>>>>>>>>>>           AND t22.rowtime <= t1.rowtime)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> The query joins two streaming tables. Table 1 is a streaming
> >>>>>>>>>>>>>>> table with amounts in a certain currency. Table 2 is a
> >>>>>>>>>>>>>>> (slowly changing) streaming table of currency exchange rates.
> >>>>>>>>>>>>>>> We want to join the amounts stream with the exchange rate of
> >>>>>>>>>>>>>>> the corresponding currency that is valid (i.e., last received
> >>>>>>>>>>>>>>> value -> MAX(rowtime)) at the rowtime of the amounts row.
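
The intended result of this join can be sketched in plain Python, independent of Flink and Calcite. The function name `join_latest_rate` and the tuple layouts are hypothetical, and the sketch assumes that "last received value" is determined per currency.

```python
from bisect import bisect_right


def join_latest_rate(amounts, rates):
    """Pair each amount with the latest rate known at its rowtime.

    amounts: list of (rowtime, currency, amount)
    rates:   list of (rowtime, currency, rate)
    Returns a list of (amount, rate); amount rows without any earlier
    rate for their currency produce no output row.
    """
    # Per currency, keep rowtimes and rates as parallel sorted lists.
    history = {}
    for ts, currency, rate in sorted(rates):
        times, values = history.setdefault(currency, ([], []))
        times.append(ts)
        values.append(rate)

    result = []
    for ts, currency, amount in amounts:
        times, values = history.get(currency, ([], []))
        # Index just past the last rate with rowtime <= ts,
        # which corresponds to MAX(rowtime) in the subquery.
        idx = bisect_right(times, ts)
        if idx > 0:
            result.append((amount, values[idx - 1]))
    return result
```

The lookup needs the rowtime of both inputs at once, which is why the time indicator has to be addressable per table.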
> >>>>>>>>>>>>>>> In order to specify the query, we need to refer to the
> >>>>>>>>>>>>>>> rowtime of the different tables. Hence, we need a way to
> >>>>>>>>>>>>>>> relate the rowtime expression (or marker) to a table.
> >>>>>>>>>>>>>>> This is not possible with a parameterless scalar function.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I'd like to comment on the concerns regarding the
> >>>>>>>>>>>>>>> performance:
> >>>>>>>>>>>>>>> In fact, the columns could be completely virtual and only
> >>>>>>>>>>>>>>> exist during query parsing and validation.
> >>>>>>>>>>>>>>> During execution, we can directly access the rowtime
> >>>>>>>>>>>>>>> metadata of a Flink streaming record (which is present
> >>>>>>>>>>>>>>> anyway) or look up the current processing time from the
> >>>>>>>>>>>>>>> machine clock. So the processing overhead would actually be
> >>>>>>>>>>>>>>> the same as with a marker function.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Regarding the question of what should be allowed with a
> >>>>>>>>>>>>>>> system attribute:
> >>>>>>>>>>>>>>> IMO, it could be used like any other attribute. We need it at
> >>>>>>>>>>>>>>> least in GROUP BY, ORDER BY, and WHERE to define windows and
> >>>>>>>>>>>>>>> joins. We could also allow access to it in SELECT if we want
> >>>>>>>>>>>>>>> to give users access to rowtime and processing time. So
> >>>>>>>>>>>>>>> @Haohui, your query could be supported.
> >>>>>>>>>>>>>>> However, what would not be allowed is to modify the value of
> >>>>>>>>>>>>>>> the rows, e.g., by naming another column rowtime: "SELECT
> >>>>>>>>>>>>>>> sometimestamp AS rowtime" would not be allowed, because Flink
> >>>>>>>>>>>>>>> does not support modifying the event time of a row (for good
> >>>>>>>>>>>>>>> reasons), and processing time should not be modifiable
> >>>>>>>>>>>>>>> anyway.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> @Timo:
> >>>>>>>>>>>>>>> I think the approach of using the system columns only during
> >>>>>>>>>>>>>>> parsing and validation and converting them to expressions
> >>>>>>>>>>>>>>> afterwards makes a lot of sense.
> >>>>>>>>>>>>>>> The question is how this approach could be nicely integrated
> >>>>>>>>>>>>>>> with Calcite.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Best, Fabian
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 2017-02-15 16:50 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> My initial thought would be that it makes more sense to have
> >>>>>>>>>>>>>>>> procTime() and rowTime() only as functions which are in fact
> >>>>>>>>>>>>>>>> used as markers. Having the value (even from special system
> >>>>>>>>>>>>>>>> attributes) does not make sense in some scenarios, such as
> >>>>>>>>>>>>>>>> the ones for creating windows, e.g.,
> >>>>>>>>>>>>>>>> if you have SELECT Count(*) OVER (ORDER BY procTime()...).
> >>>>>>>>>>>>>>>> If you get the value of procTime, you cannot do anything
> >>>>>>>>>>>>>>>> with it, as you need the marker to know how to construct the
> >>>>>>>>>>>>>>>> window logic.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> However, your final idea of having "implement some
> >>>>>>>>>>>>>>>> rule/logic that translates the attributes to special
> >>>>>>>>>>>>>>>> RexNodes internally" is, I believe, good and gives a
> >>>>>>>>>>>>>>>> solution to both problems. On the one hand, in those
> >>>>>>>>>>>>>>>> scenarios where you need the value you can access the value,
> >>>>>>>>>>>>>>>> while in others you can see the special type of the RexNode
> >>>>>>>>>>>>>>>> and use it as a marker.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Regarding keeping this data in a table... I am not sure, as
> >>>>>>>>>>>>>>>> that would mean we need to augment the data with two fields
> >>>>>>>>>>>>>>>> whether they are needed or not... this is not necessarily
> >>>>>>>>>>>>>>>> very efficient.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Dr. Radu Tudoran
> >>>>>>>>>>>>>>>> Senior Research Engineer - Big Data Expert IT R&D Division
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH European Research
> >>>>>>>>>>>>>>>> Center Riesstrasse 25, 80992 München
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> E-mail: radu.tudo...@huawei.com
> >>>>>>>>>>>>>>>> Mobile: +49 15209084330
> >>>>>>>>>>>>>>>> Telephone: +49 891588344173
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH Hansaallee 205, 40549
> >>>>>>>>>>>>>>>> Düsseldorf, Germany, www.huawei.com Registered Office:
> >>>>>>>>>>>>>>>> Düsseldorf, Register Court Düsseldorf,
> >>>>>> HRB
> >>>>>>>>> 56063,
> >>>>>>>>>>>>>>>> Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN Sitz
> >>>>>>>>>>>>>>>> der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf,
> >>>>>> HRB
> >>>>>>>>>> 56063,
> >>>>>>>>>>>>>>>> Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN This
> >>>>>>>>>>>>>>>> e-mail and its attachments contain confidential
> >>>>>>> information
> >>>>>>>>>> from
> >>>>>>>>>>>>>>>> HUAWEI, which is intended only for the person or entity
> >>>>>> whose
> >>>>>>>>>> address
> >>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>> listed above. Any use of the information contained herein in
> >>>>>> any
> >>>>>>>> way
> >>>>>>>>>>>>>>>> (including, but not limited to, total or partial
> >>>>>>>>>>>>>>>> disclosure,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> reproduction,
> >>>>>>>>>>>>>>> or dissemination) by persons other than the intended
> >>>>>>> recipient(s)
> >>>>>>>>> is
> >>>>>>>>>>>>>>>> prohibited. If you receive this e-mail in error, please
> >>>>>> notify
> >>>>>>>> the
> >>>>>>>>>>>>>>> sender
> >>>>>>>>>>>>>>> by phone or email immediately and delete it!
> >>>>>>>>>>>>>>>> -----Original Message-----
> >>>>>>>>>>>>>>>> From: Timo Walther [mailto:twal...@apache.org]
> >>>>>>>>>>>>>>>> Sent: Wednesday, February 15, 2017 9:33 AM
> >>>>>>>>>>>>>>>> To: dev@flink.apache.org
> >>>>>>>>>>>>>>>> Subject: Re: [DISCUSS] Table API / SQL indicators for event
> >>>>>>>>>>>>>>>> and processing time
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> at first I also thought that built-in functions (rowtime()
> >>>>>>>>>>>>>>>> and proctime()) are the easiest solution. However, I think
> >>>>>>>>>>>>>>>> to be future-proof we should make them system attributes,
> >>>>>>>>>>>>>>>> especially to relate them to a corresponding table in case
> >>>>>>>>>>>>>>>> of multiple tables. Logically they are attributes of each
> >>>>>>>>>>>>>>>> row, which is already done in Table API.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I will ask on the Calcite ML if there is a good way of
> >>>>>>>>>>>>>>>> integrating system attributes. Right now, I would propose
> >>>>>>>>>>>>>>>> the following implementation:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> - we introduce a custom row type (extending RelDataType)
> >>>>>>>>>>>>>>>> - in a streaming environment every row has two attributes by
> >>>>>>>>>>>>>>>>   default (rowtime and proctime)
> >>>>>>>>>>>>>>>> - we do not allow creating a row type with those attributes
> >>>>>>>>>>>>>>>>   (this should already prevent `SELECT field AS rowtime
> >>>>>>>>>>>>>>>>   FROM ...`)
> >>>>>>>>>>>>>>>> - we need to ensure that these attributes are not part of
> >>>>>>>>>>>>>>>>   expansion like `SELECT * FROM ...`
> >>>>>>>>>>>>>>>> - implement some rule/logic that translates the attributes
> >>>>>>>>>>>>>>>>   to special RexNodes internally, such that the optimizer
> >>>>>>>>>>>>>>>>   does not modify these attributes
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> What do you think?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On 15/02/17 at 03:36, Xingcan Cui wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> thanks for this thread.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> @Fabian If I didn't miss the point, the main difference
> >>>>>>>>>>>>>>>>> between the two approaches is whether or not these time
> >>>>>>>>>>>>>>>>> attributes are treated as common table fields that are
> >>>>>>>>>>>>>>>>> directly available to users. Either way, these time
> >>>>>>>>>>>>>>>>> attributes should be attached to records (right?), and the
> >>>>>>>>>>>>>>>>> discussion is about whether to give them public qualifiers
> >>>>>>>>>>>>>>>>> like other common fields, or private qualifiers and related
> >>>>>>>>>>>>>>>>> get/set methods.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> The former (system attributes) approach will be more
> >>>>>>>>>>>>>>>>> compatible with existing SQL read-only operations (e.g.,
> >>>>>>>>>>>>>>>>> select, join), but we need to add restrictions on SQL
> >>>>>>>>>>>>>>>>> modification operations (like what?). I think there is no
> >>>>>>>>>>>>>>>>> need to forbid users from modifying these attributes via
> >>>>>>>>>>>>>>>>> Table APIs (like the map function). Just inform them about
> >>>>>>>>>>>>>>>>> these special attribute names, like the system built-in
> >>>>>>>>>>>>>>>>> aggregator names in iterations.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> As for the built-in function approach, I don't know if, for
> >>>>>>>>>>>>>>>>> now, there are functions applied on a single row (maybe the
> >>>>>>>>>>>>>>>>> value access functions like COMPOSITE.get(STRING)?). It
> >>>>>>>>>>>>>>>>> seems that most of the built-in functions work on a single
> >>>>>>>>>>>>>>>>> field or on columns, and thus it will be a mountain of work
> >>>>>>>>>>>>>>>>> if we want to add a new kind of function to SQL. Maybe all
> >>>>>>>>>>>>>>>>> existing operations would have to be modified to support
> >>>>>>>>>>>>>>>>> it.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> All in all, if there is existing support for single-row
> >>>>>>>>>>>>>>>>> functions, I prefer the built-in function approach.
> >>>>>>>>>>>>>>>>> Otherwise the system attributes approach should be better.
> >>>>>>>>>>>>>>>>> After all, there are not so many modification operations in
> >>>>>>>>>>>>>>>>> SQL, and maybe we can use aliases to support setting time
> >>>>>>>>>>>>>>>>> attributes (just a hypothesis, not sure if it's feasible).
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> @Haohui I think the given query is valid if we add an
> >>>>>>>>>>>>>>>>> aggregate function to (PROCTIME() - ROWTIME()) / 1000, and
> >>>>>>>>>>>>>>>>> it should be executed efficiently.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>> Xingcan
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Wed, Feb 15, 2017 at 6:17 AM, Haohui Mai <ricet...@gmail.com>
> >>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks for starting the discussion. I can see there are
> >>>>>>>>>>>>>>>>>> multiple trade-offs in these two approaches. One question
> >>>>>>>>>>>>>>>>>> I have is to what extent Flink wants to open its APIs to
> >>>>>>>>>>>>>>>>>> allow users to access both processing and event time.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Before we talk about joins, my understanding is that the
> >>>>>>>>>>>>>>>>>> two approaches you mentioned are essentially (1) treating
> >>>>>>>>>>>>>>>>>> the value of event / processing time as first-class fields
> >>>>>>>>>>>>>>>>>> for each row, and (2) limiting the scope of time
> >>>>>>>>>>>>>>>>>> indicators to only specifying windows. Take the following
> >>>>>>>>>>>>>>>>>> query as an example:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> SELECT (PROCTIME() - ROWTIME()) / 1000 AS latency
> >>>>>>>>>>>>>>>>>> FROM table
> >>>>>>>>>>>>>>>>>> GROUP BY FLOOR(PROCTIME() TO MINUTES)
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> There are several questions we can ask:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> (1) Is it a valid query?
> >>>>>>>>>>>>>>>>>> (2) How efficient will the query be?
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> For this query I can see arguments from both sides. I
> >>>>>>>>>>>>>>>>>> think at the end of the day it really comes down to what
> >>>>>>>>>>>>>>>>>> Flink wants to support. After working on FLINK-5624 I'm
> >>>>>>>>>>>>>>>>>> more inclined to support the second approach (i.e.,
> >>>>>>>>>>>>>>>>>> built-in functions). The main reason is that the APIs of
> >>>>>>>>>>>>>>>>>> Flink are designed to separate times from the real
> >>>>>>>>>>>>>>>>>> payloads. It probably makes sense for the Table / SQL APIs
> >>>>>>>>>>>>>>>>>> to have the same design.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> For joins I don't have a clear answer off the top of my
> >>>>>>>>>>>>>>>>>> head. Flink requires two streams to be put in the same
> >>>>>>>>>>>>>>>>>> window before doing the join. This is essentially a subset
> >>>>>>>>>>>>>>>>>> of what SQL can express. I don't know what would be the
> >>>>>>>>>>>>>>>>>> best approach here.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>> Haohui
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Tue, Feb 14, 2017 at 12:26 AM Fabian Hueske <fhue...@gmail.com>
> >>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> It would look as in the query I gave as an example
> >>>>>>>>>>>>>>>>>>> before:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> SELECT
> >>>>>>>>>>>>>>>>>>>          a,
> >>>>>>>>>>>>>>>>>>>          SUM(b) OVER (PARTITION BY c ORDER BY proctime
> >>>>>>>>>>>>>>>>>>>            ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumB
> >>>>>>>>>>>>>>>>>>> FROM myStream
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Here "proctime" would be a system attribute of the table
> >>>>>>>>>>>>>>>>>>> "myStream".
> >>>>>>>>>>>>>>>>>>> The table would also have another system attribute called
> >>>>>>>>>>>>>>>>>>> "rowtime" which would be used to indicate event time
> >>>>>>>>>>>>>>>>>>> semantics.
> >>>>>>>>>>>>>>>>>>> These attributes would always be present in tables which
> >>>>>>>>>>>>>>>>>>> are derived from streams.
> >>>>>>>>>>>>>>>>>>> Because we still require that streams have timestamps and
> >>>>>>>>>>>>>>>>>>> watermarks assigned (either by the StreamTableSource or
> >>>>>>>>>>>>>>>>>>> somewhere downstream in the DataStream program) when they
> >>>>>>>>>>>>>>>>>>> are converted into a table, there is no need to register
> >>>>>>>>>>>>>>>>>>> anything.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Does that answer your questions?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Best, Fabian
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 2017-02-14 2:04 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Hi Fabian,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Thanks for starting the discussion. Before I give my
> >>>>>>>>>>>>>>>>>>>> thoughts on this, can you please give some examples of
> >>>>>>>>>>>>>>>>>>>> how you would see the option of using "system
> >>>>>>>>>>>>>>>>>>>> attributes"?
> >>>>>>>>>>>>>>>>>>>> Do you use this when you register the stream as a table,
> >>>>>>>>>>>>>>>>>>>> do you use it when you call an SQL query, do you use it
> >>>>>>>>>>>>>>>>>>>> when you translate a table back to a stream / write it
> >>>>>>>>>>>>>>>>>>>> to a dynamic table?
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Dr. Radu Tudoran
> >>>>>>>>>>>>>>>>>>>> -----Original Message-----
> >>>>>>>>>>>>>>>>>>>> From: Fabian Hueske [mailto:fhue...@gmail.com]
> >>>>>>>>>>>>>>>>>>>> Sent: Tuesday, February 14, 2017 1:01 AM
> >>>>>>>>>>>>>>>>>>>> To: dev@flink.apache.org
> >>>>>>>>>>>>>>>>>>>> Subject: [DISCUSS] Table API / SQL indicators for event
> >>>>>>>>>>>>>>>>>>>> and processing time
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I'd like to start a discussion about how Table API / SQL
> >>>>>>>>>>>>>>>>>>>> queries indicate whether an operation is done in event
> >>>>>>>>>>>>>>>>>>>> or processing time.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 1) Why do we need to indicate the time mode?
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> We need to distinguish event time and processing time
> >>>>>>>>>>>>>>>>>>>> mode for operations in queries in order to have the
> >>>>>>>>>>>>>>>>>>>> semantics of a query fully defined.
> >>>>>>>>>>>>>>>>>>>> This cannot be done globally in the TableEnvironment
> >>>>>>>>>>>>>>>>>>>> because some queries explicitly request an expression,
> >>>>>>>>>>>>>>>>>>>> such as the ORDER BY clause of an OVER window with
> >>>>>>>>>>>>>>>>>>>> PRECEDING / FOLLOWING clauses.
> >>>>>>>>>>>>>>>>>>>> So we need a way to specify something like the following
> >>>>>>>>>>>>>>>>>>>> query:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> SELECT
> >>>>>>>>>>>>>>>>>>>>          a,
> >>>>>>>>>>>>>>>>>>>>          SUM(b) OVER (PARTITION BY c ORDER BY proctime
> >>>>>>>>>>>>>>>>>>>>            ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumB
> >>>>>>>>>>>>>>>>>>>> FROM myStream
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> where "proctime" indicates processing time.
> >>>>>>>>>>>>>>>>>>>> Equivalently, "rowtime" would indicate event time.
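
As a rough illustration of what this OVER window computes, here is a minimal plain-Python sketch (no Flink API). The function name `sum_over_last_rows` is hypothetical, and arrival order in the input list stands in for processing time.

```python
from collections import defaultdict, deque


def sum_over_last_rows(rows, preceding=2):
    """Emulate SUM(b) OVER (PARTITION BY c ORDER BY proctime
    ROWS BETWEEN <preceding> PRECEDING AND CURRENT ROW).

    rows: iterable of (a, b, c) tuples in arrival order.
    Returns a list of (a, sumB), one entry per input row.
    """
    # One bounded window per partition key c; a deque with maxlen
    # drops the oldest value automatically when a new one is appended.
    windows = defaultdict(lambda: deque(maxlen=preceding + 1))
    out = []
    for a, b, c in rows:
        window = windows[c]
        window.append(b)
        out.append((a, sum(window)))
    return out
```

With event time instead of processing time, the rows of each partition would first have to be ordered by their timestamps, which is where watermarks come in.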
> >>>>>>>>>>>>>>>>>>>> 2) Current state
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> The current master branch implements time support only
> >>>>>> for
> >>>>>>>>>>>>>>>>>>> grouping
> >>>>>>>>>>>>>> windows in the Table API.
> >>>>>>>>>>>>>>>>>>>> Internally, the Table API converts a 'rowtime symbol
> >>>>>>> (which
> >>>>>>>>>> looks
> >>>>>>>>>>>>>>>>>>> like
> >>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>> regular attribute) into a special expression which
> >>>>>>> indicates
> >>>>>>>>>>>>>>>>>>> event-time.
> >>>>>>>>>>>>>>>>>>> For example:
> >>>>>>>>>>>>>>>>>>>> table
> >>>>>>>>>>>>>>>>>>>>          .window(Tumble over 5.milli on 'rowtime as
> 'w)
> >>>>>>>>>>>>>>>>>>>>          .groupBy('a, 'w)
> >>>>>>>>>>>>>>>>>>>>          .select(...)
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> defines a tumbling event-time window.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Processing-time is indicated by omitting a time
> >>>>>> attribute
> >>>>>>>>>>>>>>>>>>>> (table.window(Tumble over 5.milli as 'w) ).
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 3) How can we do that in SQL?
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> In SQL we cannot add special expressions without
> >>>>>>>>>>>>>>>>>>>> touching the parser, which we don't want to do because
> >>>>>>>>>>>>>>>>>>>> we want to stick to the SQL standard.
> >>>>>>>>>>>>>>>>>>>> Therefore, I see only two options: adding system
> >>>>>>>>>>>>>>>>>>>> attributes or (parameterless) built-in functions. I
> >>>>>>>>>>>>>>>>>>>> list some pros and cons of the approaches below:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 1. System Attributes:
> >>>>>>>>>>>>>>>>>>>> + most natural way to access a property of a record.
> >>>>>>>>>>>>>>>>>>>> + works with joins, because time attributes can be
> >>>>>>>>>>>>>>>>>>>>   related to tables
> >>>>>>>>>>>>>>>>>>>> - We need to ensure the attributes are not writable and
> >>>>>>>>>>>>>>>>>>>>   always present in streaming tables (i.e., they should
> >>>>>>>>>>>>>>>>>>>>   be system defined attributes).
> >>>>>>>>>>>>>>>>>>>> - Need to adapt existing Table API expressions (will
> >>>>>>>>>>>>>>>>>>>>   not change the API but some parts of the internal
> >>>>>>>>>>>>>>>>>>>>   translation)
> >>>>>>>>>>>>>>>>>>>> - Event time value must be set when the stream is
> >>>>>>>>>>>>>>>>>>>>   converted, processing time is evaluated on the fly
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> 2. Built-in Functions
> >>>>>>>>>>>>>>>>>>>> + Users could try to modify time attributes, which is
> >>>>>>>>>>>>>>>>>>>>   not possible with functions
> >>>>>>>>>>>>>>>>>>>> - do not work with joins, because we need to address
> >>>>>>>>>>>>>>>>>>>>   different relations
> >>>>>>>>>>>>>>>>>>>> - not a natural way to access a property of a record
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I think the only viable choice is system attributes,
> >>>>>>>>>>>>>>>>>>>> because built-in functions cannot be used for joins.
> >>>>>>>>>>>>>>>>>>>> However, system attributes are the more complex
> >>>>>>>>>>>>>>>>>>>> solution because they need a better integration with
> >>>>>>>>>>>>>>>>>>>> Calcite's SQL validator (preventing user attributes
> >>>>>>>>>>>>>>>>>>>> which are named rowtime, for instance).
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Since there are currently several contributions on the
> >>>>>>>>>>>>>>>>>>>> way (such as SQL OVER windows, FLINK-5653 to
> >>>>>>>>>>>>>>>>>>>> FLINK-5658) that need time indicators, we need a
> >>>>>>>>>>>>>>>>>>>> solution soon to be able to make progress.
> >>>>>>>>>>>>>>>>>>>> There are two PRs, #3252 and #3271, which implement the
> >>>>>>>>>>>>>>>>>>>> built-in marker functions proctime() and rowtime() and
> >>>>>>>>>>>>>>>>>>>> which could serve as a temporary solution (since we do
> >>>>>>>>>>>>>>>>>>>> not work on joins yet).
> >>>>>>>>>>>>>>>>>>>> I would like to suggest using these functions as a
> >>>>>>>>>>>>>>>>>>>> starting point (once the PRs are merged) and later
> >>>>>>>>>>>>>>>>>>>> changing to the system attribute solution, which needs
> >>>>>>>>>>>>>>>>>>>> a bit more time to be implemented.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I talked with Timo today about this issue and he said
> >>>>>>>>>>>>>>>>>>>> he would like to investigate how we can implement this
> >>>>>>>>>>>>>>>>>>>> as system functions properly integrated with Calcite
> >>>>>>>>>>>>>>>>>>>> and the SQL Validator.
> >>>>>>>>>>>>>>>>>>>> What do others think?
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Best, Fabian
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
>
>
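
For readers following the quoted OVER example, here is a minimal sketch of
what "ROWS BETWEEN 2 PRECEDING AND CURRENT ROW" computes per partition. It is
plain Python, not Flink code; the function name over_rows_sum and the sample
rows are made up for illustration, and it assumes that ordering by proctime
simply means "in arrival order".

```python
from collections import defaultdict, deque


def over_rows_sum(rows, preceding=2):
    """Emulate SUM(b) OVER (PARTITION BY c ORDER BY proctime
    ROWS BETWEEN <preceding> PRECEDING AND CURRENT ROW).

    Assumption: rows arrive already ordered by processing time, so the
    iteration order stands in for ORDER BY proctime.
    """
    # One bounded buffer per partition key c; it holds the current row
    # plus at most `preceding` earlier rows of the same partition.
    windows = defaultdict(lambda: deque(maxlen=preceding + 1))
    out = []
    for row in rows:
        window = windows[row["c"]]
        window.append(row["b"])  # the oldest value drops out automatically
        out.append((row["a"], sum(window)))
    return out


# Hypothetical input stream: a = row id, b = value, c = partition key.
rows = [
    {"a": "r1", "b": 1, "c": "x"},
    {"a": "r2", "b": 2, "c": "x"},
    {"a": "r3", "b": 10, "c": "y"},
    {"a": "r4", "b": 3, "c": "x"},
    {"a": "r5", "b": 4, "c": "x"},
]
result = over_rows_sum(rows)
```

Each output pair corresponds to (a, sumB) in the quoted query. Note that the
sketch needs no explicit time value at all, which is why processing time can
be evaluated on the fly, whereas an event-time ordering would require a
timestamp to be attached to every row.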
