Hi everybody,

When Timo wrote to the Calcite mailing list, Julian Hyde replied and gave good advice and explained why a system attribute for event-time would be a problem [1]. I thought about this and agree with Julian.
Here is a document describing the problem, the constraints in Flink, and a proposal for how to handle processing time and event time in Table API and SQL:

-> https://docs.google.com/document/d/1MDGViWA_TCqpaVoWub7u_GY4PMFSbT8TuaNl-EpbTHQ

Please have a look, comment, and ask questions.

Thank you, Fabian

[1] https://lists.apache.org/thread.html/6397caf0ca37f97f2cd27d96f7a12c6fa845d6fd0870214fdce18d96@%3Cdev.calcite.apache.org%3E

2017-02-16 1:18 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:

> Thanks everybody for the comments.
>
> Actually, I think we do not have much choice when deciding whether to use attributes or functions. Consider the following join query:
>
>   SELECT t1.amount, t2.rate
>   FROM
>     table1 AS t1,
>     table2 AS t2
>   WHERE
>     t1.currency = t2.currency AND
>     t2.rowtime = (
>       SELECT MAX(t22.rowtime)
>       FROM table2 AS t22
>       WHERE t22.rowtime <= t1.rowtime)
>
> The query joins two streaming tables. Table 1 is a streaming table with amounts in a certain currency. Table 2 is a (slowly changing) streaming table of currency exchange rates. We want to join the amounts stream with the exchange rate of the corresponding currency that is valid (i.e., the last received value -> MAX(rowtime)) at the rowtime of the amounts row.
> In order to specify the query, we need to refer to the rowtime of the different tables. Hence, we need a way to relate the rowtime expression (or marker) to a table. This is not possible with a parameterless scalar function.
>
> I'd like to comment on the concerns regarding performance: In fact, the columns could be completely virtual and only exist during query parsing and validation. During execution, we can directly access the rowtime metadata of a Flink streaming record (which is present anyway) or look up the current processing time from the machine clock. So the processing overhead would actually be the same as with a marker function.
>
> Regarding the question of what should be allowed with a system attribute: IMO, it could be used like any other attribute. We need it at least in GROUP BY, ORDER BY, and WHERE to define windows and joins. We could also allow access to it in SELECT if we want to give users access to rowtime and processing time. So @Haohui, your query could be supported.
> However, modifying the value would not be allowed, i.e., naming another column rowtime ("SELECT sometimestamp AS rowtime") would be rejected, because Flink does not support modifying the event time of a row (for good reasons) and processing time should not be modifiable anyway.
>
> @Timo: I think the approach of only using the system columns during parsing and validation and converting them to expressions afterwards makes a lot of sense. The question is how this approach could be nicely integrated with Calcite.
>
> Best, Fabian
>
> 2017-02-15 16:50 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:
>
>> Hi,
>>
>> My initial thought would be that it makes more sense to have procTime() and rowTime() only as functions which are in fact used as markers. Having the value (even from special system attributes) does not make sense in some scenarios, such as the ones for creating windows. E.g., if you have SELECT Count(*) OVER (ORDER BY procTime()...), getting the value of procTime does not help you; you need the marker to know how to construct the window logic.
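>>
>> For illustration, I mean a query roughly like this (just a sketch, the table and column names are made up):
>>
>>   SELECT
>>     user_id,
>>     COUNT(*) OVER (
>>       PARTITION BY user_id
>>       ORDER BY procTime()
>>       ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) AS cnt
>>   FROM clicks
>>
>> The planner only needs procTime() here as a marker to pick the processing-time window strategy; the concrete timestamp value behind it is of no use to the user.
>>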
>> However, your final idea of having "implement some rule/logic that translates the attributes to special RexNodes internally" I believe is good and gives a solution to both problems. On the one hand, for those scenarios where you need the value you can access the value; on the other hand, you can see the special type of the RexNode and use it as a marker.
>>
>> Regarding keeping this data in a table... I am not sure, as you say we would need to augment the data with two fields whether they are needed or not... this is not necessarily very efficient.
>>
>> Dr. Radu Tudoran
>> Senior Research Engineer - Big Data Expert, IT R&D Division
>> HUAWEI TECHNOLOGIES Duesseldorf GmbH, European Research Center
>> Riesstrasse 25, 80992 München
>> E-mail: radu.tudo...@huawei.com
>>
>> -----Original Message-----
>> From: Timo Walther [mailto:twal...@apache.org]
>> Sent: Wednesday, February 15, 2017 9:33 AM
>> To: dev@flink.apache.org
>> Subject: Re: [DISCUSS] Table API / SQL indicators for event and processing time
>>
>> Hi all,
>>
>> at first I also thought that built-in functions (rowtime() and proctime()) are the easiest solution. However, I think to be future-proof we should make them system attributes, especially to relate them to a corresponding table in case of multiple tables. Logically they are attributes of each row, as is already done in the Table API.
>>
>> I will ask on the Calcite ML if there is a good way of integrating system attributes. Right now, I would propose the following implementation:
>>
>> - we introduce a custom row type (extending RelDataType)
>> - in a streaming environment every row has two attributes by default (rowtime and proctime)
>> - we do not allow creating a row type with those attributes (this should already prevent `SELECT field AS rowtime FROM ...`)
>> - we need to ensure that these attributes are not part of an expansion like `SELECT * FROM ...`
>> - we implement some rule/logic that translates the attributes to special RexNodes internally, such that the optimizer does not modify these attributes
>>
>> What do you think?
>>
>> Regards,
>> Timo
>>
>> On 15/02/17 at 03:36, Xingcan Cui wrote:
>> > Hi all,
>> >
>> > thanks for this thread.
>> >
>> > @Fabian If I didn't miss the point, the main difference between the two approaches is whether or not these time attributes are treated as common table fields that are directly available to users.
>> > Either way, these time attributes should be attached to records (right?), and the discussion lies in whether to give them public qualifiers like other common fields, or private qualifiers with related get/set methods.
>> >
>> > The former (system attributes) approach will be more compatible with existing SQL read-only operations (e.g., select, join), but we need to add restrictions on SQL modification operations (like what?). I think there is no need to forbid users from modifying these attributes via the Table API (like a map function). Just inform them about these special attribute names, like the built-in aggregator names in iterations.
>> >
>> > As for the built-in function approach, I don't know if, for now, there are functions applied to a single row (maybe the value access functions like COMPOSITE.get(STRING)?). It seems that most of the built-in functions work on a single field or on columns, and thus it would be mountains of work if we want to add a new kind of function to SQL. Maybe all existing operations would have to be modified to support it.
>> >
>> > All in all, if there is existing support for single-row functions, I prefer the built-in function approach. Otherwise, the system attributes approach should be better. After all, there are not that many modification operations in SQL, and maybe we can use aliases to support setting time attributes (just a hypothesis, not sure if it's feasible).
>> >
>> > @Haohui I think the given query is valid if we add an aggregate function to (PROCTIME() - ROWTIME()) / 1000, and it should be executed efficiently.
>> >
>> > Best,
>> > Xingcan
>> >
>> > On Wed, Feb 15, 2017 at 6:17 AM, Haohui Mai <ricet...@gmail.com> wrote:
>> >
>> >> Hi,
>> >>
>> >> Thanks for starting the discussion. I can see there are multiple trade-offs in these two approaches. One question I have is to which extent Flink wants to open its APIs to allow users to access both processing and event time.
>> >>
>> >> Before we talk about joins, my understanding of the two approaches that you mentioned is that they essentially (1) treat the value of event / processing time as a first-class field of each row, or (2) limit the scope of time indicators to only specifying windows. Take the following query as an example:
>> >>
>> >>   SELECT (PROCTIME() - ROWTIME()) / 1000 AS latency
>> >>   FROM table
>> >>   GROUP BY FLOOR(PROCTIME() TO MINUTES)
>> >>
>> >> There are several questions we can ask:
>> >>
>> >> (1) Is it a valid query?
>> >> (2) How efficient will the query be?
>> >>
>> >> For this query I can see arguments from both sides. I think at the end of the day it really comes down to what Flink wants to support. After working on FLINK-5624 I'm more inclined to support the second approach (i.e., built-in functions). The main reason is that the APIs of Flink are designed to separate time from the real payloads. It probably makes sense for the Table / SQL APIs to follow the same design.
>> >>
>> >> For joins I don't have a clear answer off the top of my head. Flink requires two streams to be put into the same window before joining them. This is essentially a subset of what SQL can express (see the sketch below). I don't know what would be the best approach here.
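>> >>
>> >> To make that concrete, SQL can also express time-bounded join predicates such as the following (just a sketch, the table and column names are made up), which do not simply reduce to putting both inputs into the same window:
>> >>
>> >>   SELECT o.id, s.shiptime
>> >>   FROM Orders AS o, Shipments AS s
>> >>   WHERE o.id = s.orderid
>> >>     AND s.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '4' HOUR
>> >>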
>> >> Regards,
>> >> Haohui
>> >>
>> >> On Tue, Feb 14, 2017 at 12:26 AM Fabian Hueske <fhue...@gmail.com> wrote:
>> >>
>> >>> Hi,
>> >>>
>> >>> It would work as in the query I gave as an example before:
>> >>>
>> >>>   SELECT
>> >>>     a,
>> >>>     SUM(b) OVER (PARTITION BY c ORDER BY proctime
>> >>>                  ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumB
>> >>>   FROM myStream
>> >>>
>> >>> Here "proctime" would be a system attribute of the table "myStream". The table would also have another system attribute called "rowtime" which would be used to indicate event time semantics. These attributes would always be present in tables which are derived from streams.
>> >>> Because we still require that streams have timestamps and watermarks assigned (either by the StreamTableSource or somewhere downstream in the DataStream program) when they are converted into a table, there is no need to register anything.
>> >>>
>> >>> Does that answer your questions?
>> >>>
>> >>> Best, Fabian
>> >>>
>> >>> 2017-02-14 2:04 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:
>> >>>
>> >>>> Hi Fabian,
>> >>>>
>> >>>> Thanks for starting the discussion. Before I give my thoughts on this, can you please give some examples of how you would see the option of using "system attributes"? Do you use this when you register the stream as a table, do you use it when you call an SQL query, or do you use it when you translate a table back to a stream / write it to a dynamic table?
>> >>>>
>> >>>> Dr. Radu Tudoran
>> >>>> Senior Research Engineer - Big Data Expert, IT R&D Division
>> >>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH, European Research Center
>> >>>> Riesstrasse 25, 80992 München
>> >>>> E-mail: radu.tudo...@huawei.com
>> >>>>
>> >>>> -----Original Message-----
>> >>>> From: Fabian Hueske [mailto:fhue...@gmail.com]
>> >>>> Sent: Tuesday, February 14, 2017 1:01 AM
>> >>>> To: dev@flink.apache.org
>> >>>> Subject: [DISCUSS] Table API / SQL indicators for event and processing time
>> >>>>
>> >>>> Hi,
>> >>>>
>> >>>> I'd like to start a discussion about how Table API / SQL queries indicate whether an operation is done in event time or processing time.
>> >>>>
>> >>>> 1) Why do we need to indicate the time mode?
>> >>>>
>> >>>> We need to distinguish event time and processing time mode for operations in queries in order to have the semantics of a query fully defined. This cannot be done globally in the TableEnvironment because some queries need an explicit time expression, such as the ORDER BY clause of an OVER window with PRECEDING / FOLLOWING clauses. So we need a way to specify something like the following query:
>> >>>>
>> >>>>   SELECT
>> >>>>     a,
>> >>>>     SUM(b) OVER (PARTITION BY c ORDER BY proctime
>> >>>>                  ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumB
>> >>>>   FROM myStream
>> >>>>
>> >>>> where "proctime" indicates processing time. Equivalently, "rowtime" would indicate event time.
>> >>>>
>> >>>> 2) Current state
>> >>>>
>> >>>> The current master branch implements time support only for grouping windows in the Table API. Internally, the Table API converts a 'rowtime symbol (which looks like a regular attribute) into a special expression which indicates event time. For example:
>> >>>>
>> >>>>   table
>> >>>>     .window(Tumble over 5.milli on 'rowtime as 'w)
>> >>>>     .groupBy('a, 'w)
>> >>>>     .select(...)
>> >>>>
>> >>>> defines a tumbling event-time window. Processing time is indicated by omitting the time attribute (table.window(Tumble over 5.milli as 'w)).
>> >>>>
>> >>>> 3) How can we do that in SQL?
>> >>>>
>> >>>> In SQL we cannot add special expressions without touching the parser, which we don't want to do because we want to stick to the SQL standard. Therefore, I see only two options: adding system attributes or (parameterless) built-in functions. I list some pros and cons of the approaches below:
>> >>>>
>> >>>> 1. System attributes:
>> >>>> + most natural way to access a property of a record
>> >>>> + work with joins, because time attributes can be related to tables
>> >>>> - we need to ensure the attributes are not writable and always present in streaming tables (i.e., they should be system-defined attributes)
>> >>>> - need to adapt existing Table API expressions (will not change the API, but some parts of the internal translation)
>> >>>> - the event time value must be set when the stream is converted; processing time is evaluated on the fly
>> >>>>
>> >>>> 2. Built-in functions:
>> >>>> + users cannot try to modify the time values, which would be possible with attributes
>> >>>> - do not work with joins, because we need to address different relations
>> >>>> - not a natural way to access a property of a record
>> >>>>
>> >>>> I think the only viable choice is system attributes, because built-in functions cannot be used for joins. However, system attributes are the more complex solution because they need better integration with Calcite's SQL validator (e.g., preventing user attributes named rowtime).
>> >>>>
>> >>>> Since there are currently several contributions on the way (such as the SQL OVER windows, FLINK-5653 to FLINK-5658) that need time indicators, we need a solution soon to be able to make progress. There are two PRs, #3252 and #3271, which implement the built-in marker functions proctime() and rowtime() and which could serve as a temporary solution (since we do not work on joins yet).
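>> >>>>
>> >>>> With these marker functions, the OVER window example from above would look roughly like this (just a sketch):
>> >>>>
>> >>>>   SELECT
>> >>>>     a,
>> >>>>     SUM(b) OVER (PARTITION BY c ORDER BY proctime()
>> >>>>                  ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumB
>> >>>>   FROM myStream
>> >>>>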
>> >>>> I would like to suggest using these functions as a starting point (once the PRs are merged) and later changing to the system attribute solution, which needs a bit more time to be implemented.
>> >>>>
>> >>>> I talked with Timo today about this issue and he said he would like to investigate how we can implement this as system attributes properly integrated with Calcite and the SQL validator.
>> >>>>
>> >>>> What do others think?
>> >>>>
>> >>>> Best, Fabian