Hi everybody,

When Timo wrote to the Calcite mailing list, Julian Hyde replied and gave
good advice and explained why a system attribute for event-time would be a
problem [1].
I thought about this and agree with Julian.

Here is a document that describes the problem, the constraints in Flink, and a
proposal for how to handle processing time and event time in the Table API and
SQL:

->
https://docs.google.com/document/d/1MDGViWA_TCqpaVoWub7u_GY4PMFSbT8TuaNl-EpbTHQ

Please have a look, comment and ask questions.

Thank you,
Fabian

[1]
https://lists.apache.org/thread.html/6397caf0ca37f97f2cd27d96f7a12c6fa845d6fd0870214fdce18d96@%3Cdev.calcite.apache.org%3E

2017-02-16 1:18 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:

> Thanks everybody for the comments.
>
> Actually, I think we do not have much choice when deciding whether to use
> attributes or functions.
> Consider the following join query:
>
> SELECT t1.amount, t2.rate
> FROM
>   table1 AS t1,
>   table2 AS t2
> WHERE
>   t1.currency = t2.currency AND
>   t2.rowtime = (
>     SELECT MAX(t22.rowtime)
>     FROM table2 AS t22
>     WHERE t22.currency = t1.currency
>       AND t22.rowtime <= t1.rowtime)
>
> The query joins two streaming tables. Table 1 is a streaming table with
> amounts in a certain currency. Table 2 is a (slowly changing) streaming
> table of currency exchange rates.
> We want to join the amounts stream with the exchange rate of the
> corresponding currency that is valid (i.e., last received value ->
> MAX(rowtime)) at the rowtime of the amounts row.
> In order to specify the query, we need to refer to the rowtime of the
> different tables. Hence, we need a way to relate the rowtime expression (or
> marker) to a table.
> This is not possible with a parameterless scalar function.
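
To make the intended join semantics concrete, here is a minimal batch-style
sketch in plain Python (not Flink code). The function name and the tuple
layout are made up for illustration, and it assumes at most one rate per
currency and rowtime. The lookup compares the rowtime of an amounts row with
the rowtimes of the rates table, which is why each table's rowtime must be
addressable on its own:

```python
from bisect import bisect_right


def join_amounts_with_rates(amounts, rates):
    """Join each amount with the latest rate valid at the amount's rowtime.

    amounts: list of (rowtime, currency, amount)
    rates:   list of (rowtime, currency, rate)
    Returns a list of (amount, rate) pairs.
    """
    # Index the rates per currency, ordered by their own rowtime (t2.rowtime).
    by_currency = {}
    for rowtime, currency, rate in sorted(rates):
        times, values = by_currency.setdefault(currency, ([], []))
        times.append(rowtime)
        values.append(rate)

    result = []
    for rowtime, currency, amount in amounts:
        times, values = by_currency.get(currency, ([], []))
        # Count the rates with rate rowtime <= amount rowtime (t1.rowtime).
        i = bisect_right(times, rowtime)
        if i > 0:
            # The last of them has MAX(rowtime), i.e. the rate valid then.
            result.append((amount, values[i - 1]))
    return result
```

An amounts row without any earlier rate for its currency is dropped, which
mirrors the inner-join semantics of the query.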
>
> I'd like to comment on the concerns regarding the performance:
> In fact, the columns could be completely virtual and only exist during
> query parsing and validation.
> During execution, we can directly access the rowtime metadata of a Flink
> streaming record (which is present anyway) or look up the current
> processing time from the machine clock. So the processing overhead would
> actually be the same as with a marker function.
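
As a rough illustration of the "virtual column" idea, again in plain Python
and not Flink internals: StreamRecord and resolve_attribute are hypothetical
names, and the millisecond unit is an assumption. The time attributes are
never stored in the row; they are resolved only when an expression is
evaluated:

```python
import time
from dataclasses import dataclass


@dataclass
class StreamRecord:
    """Hypothetical stand-in for a streaming record with timestamp metadata."""
    payload: dict
    timestamp_ms: int  # event time carried as metadata, not in the payload


def resolve_attribute(name, record, clock=time.time):
    """Resolve a column reference at execution time."""
    if name == "rowtime":
        # Read the event time directly from the record metadata.
        return record.timestamp_ms
    if name == "proctime":
        # Look up the current processing time from the machine clock.
        return int(clock() * 1000)
    # Regular attributes are physically stored in the payload.
    return record.payload[name]
```

The payload never carries the two time fields, so rows are not augmented
whether or not a query uses them.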
>
> Regarding the question of what should be allowed with a system attribute:
> IMO, it could be used like any other attribute. We need it at least in GROUP
> BY, ORDER BY, and WHERE to define windows and joins. We could also allow
> accessing it in SELECT if we want to give users access to rowtime and
> processing time. So @Haohui, your query could be supported.
> However, what would not be allowed is to modify the time value of a row,
> e.g., by naming another column rowtime. That is, "SELECT sometimestamp AS
> rowtime" would not be allowed, because Flink does not support modifying the
> event time of a row (for good reasons) and processing time should not be
> modifiable anyway.
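
A minimal sketch of such a check, in plain Python and not tied to Calcite's
validator API: the function name, the (expression, alias) representation and
the case-insensitive comparison are all assumptions made for illustration.

```python
SYSTEM_TIME_ATTRIBUTES = frozenset({"rowtime", "proctime"})


def validate_select_aliases(select_items):
    """Reject projections that try to overwrite a system time attribute.

    select_items: list of (expression, alias) pairs; alias may be None.
    Returns the items unchanged if they are valid.
    """
    for expression, alias in select_items:
        # Reading rowtime / proctime is fine; only aliasing to them is not.
        if alias is not None and alias.lower() in SYSTEM_TIME_ATTRIBUTES:
            raise ValueError(
                f"'{expression} AS {alias}' is not allowed: "
                f"{alias} is a read-only system attribute"
            )
    return select_items
```

With this rule, "SELECT rowtime, amount AS amt" passes, while
"SELECT sometimestamp AS rowtime" is rejected.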
>
> @Timo:
> I think the approach of using the system columns only during parsing and
> validation and converting them to expressions afterwards makes a lot of
> sense.
> The question is how this approach could be nicely integrated with Calcite.
>
> Best, Fabian
>
>
>
> 2017-02-15 16:50 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:
>
>> Hi,
>>
>> My initial thought would be that it makes more sense to have procTime()
>> and rowTime() only as functions which are in fact to be used as markers.
>> Having the value (even from special system attributes) does not make sense
>> in some scenarios, such as the ones for creating windows, e.g.,
>> if you have SELECT Count(*) OVER (ORDER BY procTime()...).
>> If you get the value of procTime you cannot do anything with it, as you
>> need the marker to know how to construct the window logic.
>>
>> However, your final idea to "implement some rule/logic that
>> translates the attributes to special RexNodes internally" I believe is
>> good and gives a solution to both problems. On the one hand, for those
>> scenarios where you need the value you can access the value, while for
>> others you can see the special type of the RexNode and use it as a marker.
>>
>> Regarding keeping this data in a table... I am not sure, as that would mean
>> we need to augment the data with two fields whether they are needed or
>> not... this is not necessarily very efficient.
>>
>>
>> Dr. Radu Tudoran
>> Senior Research Engineer - Big Data Expert
>> IT R&D Division
>>
>>
>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>> European Research Center
>> Riesstrasse 25, 80992 München
>>
>> E-mail: radu.tudo...@huawei.com
>> Mobile: +49 15209084330
>> Telephone: +49 891588344173
>>
>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
>> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
>> Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
>> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
>> Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
>> This e-mail and its attachments contain confidential information from
>> HUAWEI, which is intended only for the person or entity whose address is
>> listed above. Any use of the information contained herein in any way
>> (including, but not limited to, total or partial disclosure, reproduction,
>> or dissemination) by persons other than the intended recipient(s) is
>> prohibited. If you receive this e-mail in error, please notify the sender
>> by phone or email immediately and delete it!
>>
>> -----Original Message-----
>> From: Timo Walther [mailto:twal...@apache.org]
>> Sent: Wednesday, February 15, 2017 9:33 AM
>> To: dev@flink.apache.org
>> Subject: Re: [DISCUSS] Table API / SQL indicators for event and
>> processing time
>>
>> Hi all,
>>
>> At first I also thought that built-in functions (rowtime() and
>> proctime()) are the easiest solution. However, I think to be future-proof
>> we should make them system attributes, especially to relate them to a
>> corresponding table in case of multiple tables. Logically they are
>> attributes of each row, which is already how it is done in the Table API.
>>
>> I will ask on the Calcite ML if there is a good way for integrating
>> system attributes. Right now, I would propose the following implementation:
>>
>> - we introduce a custom row type (extending RelDataType)
>> - in a streaming environment every row has two attributes by default
>> (rowtime and proctime)
>> - we do not allow creating a row type with those attributes (this should
>> already prevent `SELECT field AS rowtime FROM ...`)
>> - we need to ensure that these attributes are not part of expansion like
>> `SELECT * FROM ...`
>> - implement some rule/logic that translates the attributes to special
>> RexNodes internally, such that the optimizer does not modify these attributes
>>
>> What do you think?
>>
>> Regards,
>> Timo
>>
>>
>>
>>
>> On 15/02/17 at 03:36, Xingcan Cui wrote:
>> > Hi all,
>> >
>> > thanks for this thread.
>> >
>> > @Fabian If I didn't miss the point, the main difference between the
>> > two approaches is whether or not to treat these time attributes as
>> > common table fields that are directly available to users. Either way,
>> > these time attributes should be attached to records (right?), and the
>> > discussion is about whether to give them public qualifiers like other
>> > common fields, or private qualifiers and related get/set methods.
>> >
>> > The former (system attributes) approach will be more compatible with
>> > existing SQL read-only operations (e.g., select, join), but we need to
>> > add restrictions on SQL modification operations (like what?). I think
>> > there is no need to forbid users from modifying these attributes via
>> > Table APIs (like a map function). Just inform them about these special
>> > attribute names, like the system built-in aggregator names in iteration.
>> >
>> > As for the built-in function approach, I don't know if, for now, there
>> > are functions applied on a single row (maybe the value access
>> > functions like COMPOSITE.get(STRING)?). It seems that most of the
>> > built-in functions work on a single field or on columns, and thus it
>> > will be mountains of work if we want to add a new kind of function to
>> > SQL. Maybe all existing operations would have to be modified to support
>> > it.
>> >
>> > All in all, if there is existing support for single-row functions, I
>> > prefer the built-in function approach. Otherwise the system attributes
>> > approach should be better. After all, there are not that many
>> > modification operations in SQL and maybe we can use aliases to support
>> > setting time attributes (just a hypothesis, not sure if it's feasible).
>> >
>> > @Haohui I think the given query is valid if we add an aggregate
>> > function to (PROCTIME() - ROWTIME()) / 1000, and it should be executed
>> > efficiently.
>> >
>> > Best,
>> > Xingcan
>> >
>> > On Wed, Feb 15, 2017 at 6:17 AM, Haohui Mai <ricet...@gmail.com> wrote:
>> >
>> >> Hi,
>> >>
>> >> Thanks for starting the discussion. I can see there are multiple
>> >> trade-offs in these two approaches. One question I have is to what
>> >> extent Flink wants to open its APIs to allow users to access
>> >> both processing and event time.
>> >>
>> >> Before we talk about joins, my understanding is that the two approaches
>> >> you mentioned are essentially (1) treating the value of event /
>> >> processing time as first-class fields for each row, and (2) limiting
>> >> the scope of time indicators to only specifying windows. Take the
>> >> following query as an example:
>> >>
>> >> SELECT (PROCTIME() - ROWTIME()) / 1000 AS latency FROM table GROUP BY
>> >> FLOOR(PROCTIME() TO MINUTES)
>> >>
>> >> There are several questions we can ask:
>> >>
>> >> (1) Is it a valid query?
>> >> (2) How efficient will the query be?
>> >>
>> >> For this query I can see arguments from both sides. I think at the
>> >> end of the day it really comes down to what Flink wants to support.
>> >> After working on FLINK-5624 I'm more inclined to support the second
>> >> approach (i.e., built-in functions). The main reason is that the
>> >> APIs of Flink are designed to separate times from the real payloads.
>> >> It probably makes sense for the Table / SQL APIs to have the same
>> >> design.
>> >>
>> >> For joins I don't have a clear answer off the top of my head. Flink
>> >> requires two streams to be put in the same window before doing the
>> >> joins. This is essentially a subset of what SQL can express. I don't
>> >> know what would be the best approach here.
>> >>
>> >> Regards,
>> >> Haohui
>> >>
>> >>
>> >> On Tue, Feb 14, 2017 at 12:26 AM Fabian Hueske <fhue...@gmail.com> wrote:
>> >>
>> >>> Hi,
>> >>>
>> >>> It would work as in the query I gave as an example before:
>> >>>
>> >>> SELECT
>> >>>    a,
>> >>>    SUM(b) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2
>> >>>      PRECEDING AND CURRENT ROW) AS sumB
>> >>> FROM myStream
>> >>>
>> >>> Here "proctime" would be a system attribute of the table "myStream".
>> >>> The table would also have another system attribute called "rowtime"
>> >>> which would be used to indicate event time semantics.
>> >>> These attributes would always be present in tables which are derived
>> >>> from streams.
>> >>> Because we still require that streams have timestamps and watermarks
>> >>> assigned (either by the StreamTableSource or somewhere downstream in
>> >>> the DataStream program) when they are converted into a table, there
>> >>> is no need to register anything.
>> >>>
>> >>> Does that answer your questions?
>> >>>
>> >>> Best, Fabian
>> >>>
>> >>>
>> >>>
>> >>> 2017-02-14 2:04 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:
>> >>>
>> >>>> Hi Fabian,
>> >>>>
>> >>>> Thanks for starting the discussion. Before I give my thoughts on
>> >>>> this, can you please give some examples of how you would see the
>> >>>> option of using "system attributes"?
>> >>>> Do you use this when you register the stream as a table, do you use
>> >>>> it when you call an SQL query, do you use it when you translate
>> >>>> a table back to a stream / write it to a dynamic table?
>> >>>>
>> >>>> Dr. Radu Tudoran
>> >>>> Senior Research Engineer - Big Data Expert IT R&D Division
>> >>>>
>> >>>> -----Original Message-----
>> >>>> From: Fabian Hueske [mailto:fhue...@gmail.com]
>> >>>> Sent: Tuesday, February 14, 2017 1:01 AM
>> >>>> To: dev@flink.apache.org
>> >>>> Subject: [DISCUSS] Table API / SQL indicators for event and
>> >>>> processing time
>> >>>>
>> >>>> Hi,
>> >>>>
>> >>>> I'd like to start a discussion about how Table API / SQL queries
>> >>>> indicate whether an operation is done in event or processing time.
>> >>>>
>> >>>> 1) Why do we need to indicate the time mode?
>> >>>>
>> >>>> We need to distinguish event time and processing time mode for
>> >>>> operations in queries in order to have the semantics of a query fully
>> >>>> defined.
>> >>>> This cannot be done globally in the TableEnvironment because some
>> >>>> queries explicitly request an expression, such as the ORDER BY clause
>> >>>> of an OVER window with PRECEDING / FOLLOWING clauses.
>> >>>> So we need a way to specify something like the following query:
>> >>>>
>> >>>> SELECT
>> >>>>    a,
>> >>>>    SUM(b) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2
>> >>>>      PRECEDING AND CURRENT ROW) AS sumB
>> >>>> FROM myStream
>> >>>>
>> >>>> where "proctime" indicates processing time. Equivalently, "rowtime"
>> >>>> would indicate event time.
>> >>>>
>> >>>> 2) Current state
>> >>>>
>> >>>> The current master branch implements time support only for grouping
>> >>>> windows in the Table API.
>> >>>> Internally, the Table API converts a 'rowtime symbol (which looks
>> >>>> like a regular attribute) into a special expression which indicates
>> >>>> event-time.
>> >>>> For example:
>> >>>>
>> >>>> table
>> >>>>    .window(Tumble over 5.milli on 'rowtime as 'w)
>> >>>>    .groupBy('a, 'w)
>> >>>>    .select(...)
>> >>>>
>> >>>> defines a tumbling event-time window.
>> >>>>
>> >>>> Processing-time is indicated by omitting a time attribute
>> >>>> (table.window(Tumble over 5.milli as 'w) ).
>> >>>>
>> >>>> 3) How can we do that in SQL?
>> >>>>
>> >>>> In SQL we cannot add special expressions without touching the parser,
>> >>>> which we don't want to do because we want to stick to the SQL standard.
>> >>>> Therefore, I see only two options: adding system attributes or
>> >>>> (parameterless) built-in functions. I list some pros and cons of the
>> >>>> approaches below:
>> >>>>
>> >>>> 1. System Attributes:
>> >>>> + most natural way to access a property of a record
>> >>>> + work with joins, because time attributes can be related to tables
>> >>>> - we need to ensure the attributes are not writable and always present
>> >>>>   in streaming tables (i.e., they should be system-defined attributes)
>> >>>> - need to adapt existing Table API expressions (will not change the
>> >>>>   API but some parts of the internal translation)
>> >>>> - event time value must be set when the stream is converted,
>> >>>>   processing time is evaluated on the fly
>> >>>>
>> >>>> 2. Built-in Functions:
>> >>>> + users could try to modify time attributes, which is not possible
>> >>>>   with functions
>> >>>> - do not work with joins, because we need to address different
>> >>>>   relations
>> >>>> - not a natural way to access a property of a record
>> >>>> I think the only viable choice is system attributes, because
>> >>>> built-in functions cannot be used for joins.
>> >>>> However, system attributes are the more complex solution because they
>> >>>> need a better integration with Calcite's SQL validator (preventing
>> >>>> user attributes which are named rowtime, for instance).
>> >>>>
>> >>>> Since there are currently several contributions on the way (such as
>> >>>> SQL OVER windows, FLINK-5653 to FLINK-5658) that need time indicators,
>> >>>> we need a solution soon to be able to make progress.
>> >>>> There are two PRs, #3252 and #3271, which implement the built-in
>> >>>> marker functions proctime() and rowtime() and which could serve as a
>> >>>> temporary solution (since we do not work on joins yet).
>> >>>> I would like to suggest using these functions as a starting point
>> >>>> (once the PRs are merged) and later changing to the system attribute
>> >>>> solution, which needs a bit more time to be implemented.
>> >>>>
>> >>>> I talked with Timo today about this issue and he said he would like
>> >>>> to investigate how we can implement this as system functions properly
>> >>>> integrated with Calcite and the SQL Validator.
>> >>>>
>> >>>> What do others think?
>> >>>>
>> >>>> Best, Fabian
>> >>>>
>>
>>
>
