Hi Xingcan,

thanks for your thoughts.
In principle you are right that the monotone attribute property would be
sufficient. However, there are more aspects to consider than that.

Flink is a parallel stream processing engine, which means that data is
processed in separate processes and shuffled between them.
Maintaining a strict order when merging parallel streams would be
prohibitively expensive.
Flink's watermark mechanism helps operators to deal with out-of-order data
(due to out-of-order input or shuffles).
I don't think we can separate the discussion about time attributes from
watermarks if we want to use Flink as a processing engine and not
reimplement large parts from scratch.
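
To illustrate the point about merging parallel streams, here is a minimal
sketch in Python (not Flink code; the class and method names are made up for
this example). It assumes the usual watermark semantics, i.e., a watermark t
asserts that no more records with a timestamp <= t will arrive on that
channel, and it combines the watermarks of several input channels by taking
their minimum instead of enforcing a strict record order:

```python
class WatermarkMerger:
    """Hypothetical helper: combines watermarks of several parallel inputs."""

    def __init__(self, num_channels):
        # No channel has reported a watermark yet.
        self.channel_watermarks = [float("-inf")] * num_channels
        self.current = float("-inf")

    def on_watermark(self, channel, timestamp):
        """Record a watermark from one channel.

        Returns the new combined watermark if it advanced, otherwise None.
        """
        # A channel's watermark never moves backwards.
        self.channel_watermarks[channel] = max(
            self.channel_watermarks[channel], timestamp
        )
        # The combined watermark is bounded by the slowest channel.
        combined = min(self.channel_watermarks)
        if combined > self.current:
            self.current = combined
            return combined
        return None

    def is_late(self, record_timestamp):
        """A record at or below the combined watermark counts as late."""
        return record_timestamp <= self.current
```

The operator only has to remember one number per input channel, which is far
cheaper than sorting records across channels.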

When transforming a time attribute, we have to either align it with
existing watermarks or generate new watermarks.
If we want to allow all kinds of monotone transformations, we have to adapt
the watermarks, which is not trivial.
Instead, I think we should initially only allow very few monotone
transformations which are aligned with the existing watermarks. We might
later relax this condition if we see that users request this feature.
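
As a rough illustration of why transformed time attributes need adapted
watermarks, the following Python sketch (a hypothetical helper and made-up
sample data, not part of Flink) replays a stream of records and watermarks
and checks whether any record ends up at or below an earlier watermark after
its timestamp was transformed. It assumes that a watermark t means "no more
records with a timestamp <= t":

```python
def is_consistent(events, f=lambda t: t, g=lambda w: w):
    """Check a stream against its watermarks.

    events: list of ("record", ts) or ("watermark", ts) in arrival order.
    f is applied to record timestamps, g to watermarks.
    Returns False if a record arrives at or below an earlier watermark.
    """
    current_wm = float("-inf")
    for kind, ts in events:
        if kind == "watermark":
            current_wm = max(current_wm, g(ts))
        elif f(ts) <= current_wm:
            return False
    return True


# Made-up sample stream: out of order, but consistent with its watermarks.
STREAM = [
    ("record", 5), ("record", 3), ("watermark", 3),
    ("record", 4), ("record", 9), ("watermark", 8),
    ("record", 11),
]

shift_back = lambda t: t - 2       # strictly monotone
floor_to_5 = lambda t: t - t % 5   # monotone, but not strictly

# Transforming only the attribute breaks the watermark guarantee.
broken = is_consistent(STREAM, f=shift_back)
# Applying the same strictly monotone function to the watermarks repairs it.
aligned = is_consistent(STREAM, f=shift_back, g=shift_back)
# For a non-strict function, even that is not enough (ties at the watermark).
floored = is_consistent(STREAM, f=floor_to_5, g=floor_to_5)
```

In this sample, shifting only the attribute is inconsistent, shifting both
the attribute and the watermarks is consistent, and flooring both is
inconsistent again because several timestamps collapse onto the watermark
value.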

You are right that we need to track which attribute can be used as a time
attribute (i.e., is increasing and guarded by watermarks).
For that, we need to expose the time attribute when a Table is created
(either when a DataStream is converted, e.g., stream.toTable(tEnv, 'a, 'b,
't.rowtime), or in a StreamTableSource) and track how it is used in queries.
I am not sure if the monotone property would be the right choice here,
since the data is only quasi-monotone (records may still arrive out of order
within the bounds given by the watermarks) and a monotone annotation might
trigger some invalid optimizations which change the semantics of a query.
Right now, Calcite does not offer a quasi-monotone property (at least I
haven't found it).

Best, Fabian


2017-02-21 4:41 GMT+01:00 Xingcan Cui <xingc...@gmail.com>:

> Hi all,
>
> As I said in another thread, the main difference between stream and table
> is that a stream is an ordered list while a table is an unordered set.
>
> Without considering the out-of-order problem in practice, both event time
> and processing time can simply be treated as a monotonically increasing
> field, and that's why the given query [1] would work. In other words, we
> must guarantee that the "SELECT MAX(t22.rowtime) ..." subquery returns a
> single value that can be retrieved from the cached dynamic table, since
> it's dangerous to join two un-windowed streams.
>
> Under this circumstance, I just consider adding a "monotonic hint" (INC or
> DEC) to a field of a (generalized) table (maybe using an annotation on
> the registerDataXX method) that indicates whether the field is
> monotonically increasing or decreasing. Then, by treating rowtime as a
> common (monotonically increasing) field, there are several benefits:
>
> 1) This can unify tables and streams by imposing a total ordering
> relation on an unordered set.
>
> 2) These fields can be modified arbitrarily as long as they keep the
> declared monotonicity, and the watermark problem no longer exists.
>
> 3) The monotonic hint will be useful in the query optimization process.
>
> What do you think?
>
> Best,
> Xingcan
>
> [1]
> SELECT t1.amount, t2.rate
> FROM
>   table1 AS t1,
>   table2 AS t2
> WHERE
>   t1.currency = t2.currency AND
>   t2.rowtime = (
>     SELECT MAX(t22.rowtime)
>     FROM table2 AS t22
>     WHERE t22.currency = t1.currency
>     AND t22.rowtime <= t1.rowtime)
>
> On Tue, Feb 21, 2017 at 2:52 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> > Hi everybody,
> >
> > When Timo wrote to the Calcite mailing list, Julian Hyde replied and gave
> > good advice and explained why a system attribute for event-time would
> > be a problem [1].
> > I thought about this and agree with Julian.
> >
> > Here is a document to describe the problem, constraints in Flink and a
> > proposal how to handle processing time and event time in Table API and
> > SQL:
> >
> > ->
> > https://docs.google.com/document/d/1MDGViWA_TCqpaVoWub7u_GY4PMFSbT8TuaNl-EpbTHQ
> >
> > Please have a look, comment and ask questions.
> >
> > Thank you,
> > Fabian
> >
> > [1]
> > https://lists.apache.org/thread.html/6397caf0ca37f97f2cd27d96f7a12c6fa845d6fd0870214fdce18d96@%3Cdev.calcite.apache.org%3E
> >
> > 2017-02-16 1:18 GMT+01:00 Fabian Hueske <fhue...@gmail.com>:
> >
> > > Thanks everybody for the comments.
> > >
> > > Actually, I think we do not have much choice when deciding whether to
> > > use attributes or functions.
> > > Consider the following join query:
> > >
> > > SELECT t1.amount, t2.rate
> > > FROM
> > >   table1 AS t1,
> > >   table2 AS t2
> > > WHERE
> > >   t1.currency = t2.currency AND
> > >   t2.rowtime = (
> > >     SELECT MAX(t22.rowtime)
> > >     FROM table2 AS t22
> > >     WHERE t22.currency = t1.currency
> > >     AND t22.rowtime <= t1.rowtime)
> > >
> > > The query joins two streaming tables. Table 1 is a streaming table with
> > > amounts in a certain currency. Table 2 is a (slowly changing) streaming
> > > table of currency exchange rates.
> > > We want to join the amounts stream with the exchange rate of the
> > > corresponding currency that is valid (i.e., last received value ->
> > > MAX(rowtime)) at the rowtime of the amounts row.
> > > In order to specify the query, we need to refer to the rowtime of the
> > > different tables. Hence, we need a way to relate the rowtime expression
> > > (or marker) to a table.
> > > This is not possible with a parameterless scalar function.
> > >
> > > I'd like to comment on the concerns regarding the performance:
> > > In fact, the columns could be completely virtual and only exist during
> > > query parsing and validation.
> > > During execution, we can directly access the rowtime metadata of a
> > > Flink streaming record (which is present anyway) or look up the current
> > > processing time from the machine clock. So the processing overhead
> > > would actually be the same as with a marker function.
> > >
> > > Regarding the question on what should be allowed with a system
> > > attribute:
> > > IMO, it could be used like any other attribute. We need it at least in
> > > GROUP BY, ORDER BY, and WHERE to define windows and joins. We could
> > > also allow access to it in SELECT if we want to give users access to
> > > rowtime and processing time. So @Haohui, your query could be supported.
> > > However, what would not be allowed is to modify the value of the rows,
> > > e.g., by naming another column rowtime: "SELECT sometimestamp AS
> > > rowtime" would not be allowed, because Flink does not support modifying
> > > the event time of a row (for good reasons) and processing time should
> > > not be modifiable anyway.
> > >
> > > @Timo:
> > > I think the approach to only use the system columns during parsing and
> > > validation and converting them to expressions afterwards makes a lot of
> > > sense.
> > > The question is how this approach could be nicely integrated with
> > > Calcite.
> > >
> > > Best, Fabian
> > >
> > >
> > >
> > > 2017-02-15 16:50 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:
> > >
> > >> Hi,
> > >>
> > >> My initial thought would be that it makes more sense to have
> > >> procTime() and rowTime() only as functions which in fact are to be
> > >> used as markers. Having the value (even from special system
> > >> attributes) does not make sense in some scenarios such as the ones
> > >> for creating windows, e.g.,
> > >> if you have SELECT Count(*) OVER (ORDER BY procTime()...)
> > >> If you get the value of procTime you cannot do anything as you need
> > >> the marker to know how to construct the window logic.
> > >>
> > >> However, your final idea of having "implement some rule/logic that
> > >> translates the attributes to special RexNodes internally" I believe
> > >> is good and gives a solution to both problems. On the one hand, for
> > >> those scenarios where you need the value you can access the value,
> > >> while for others you can see the special type of the RexNode and use
> > >> it as a marker.
> > >>
> > >> Regarding keeping this data in a table... I am not sure, as you would
> > >> say we need to augment the data with two fields whether needed or
> > >> not... this is not necessarily very efficient.
> > >>
> > >>
> > >> Dr. Radu Tudoran
> > >> Senior Research Engineer - Big Data Expert
> > >> IT R&D Division
> > >>
> > >>
> > >> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > >> European Research Center
> > >> Riesstrasse 25, 80992 München
> > >>
> > >> E-mail: radu.tudo...@huawei.com
> > >> Mobile: +49 15209084330
> > >> Telephone: +49 891588344173
> > >>
> > >> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > >> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> > >> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> > >> Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
> > >> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> > >> Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
> > >> This e-mail and its attachments contain confidential information from
> > >> HUAWEI, which is intended only for the person or entity whose address
> is
> > >> listed above. Any use of the information contained herein in any way
> > >> (including, but not limited to, total or partial disclosure,
> > reproduction,
> > >> or dissemination) by persons other than the intended recipient(s) is
> > >> prohibited. If you receive this e-mail in error, please notify the
> > sender
> > >> by phone or email immediately and delete it!
> > >>
> > >> -----Original Message-----
> > >> From: Timo Walther [mailto:twal...@apache.org]
> > >> Sent: Wednesday, February 15, 2017 9:33 AM
> > >> To: dev@flink.apache.org
> > >> Subject: Re: [DISCUSS] Table API / SQL indicators for event and
> > >> processing time
> > >>
> > >> Hi all,
> > >>
> > >> at first I also thought that built-in functions (rowtime() and
> > >> proctime()) are the easiest solution. However, I think to be
> > >> future-proof we should make them system attributes, especially to
> > >> relate them to a corresponding table in case of multiple tables.
> > >> Logically they are attributes of each row, which is already done in
> > >> the Table API.
> > >>
> > >> I will ask on the Calcite ML if there is a good way of integrating
> > >> system attributes. Right now, I would propose the following
> > >> implementation:
> > >>
> > >> - we introduce a custom row type (extending RelDataType)
> > >> - in a streaming environment every row has two attributes by default
> > >> (rowtime and proctime)
> > >> - we do not allow creating a row type with those attributes (this
> > >> should already prevent `SELECT field AS rowtime FROM ...`)
> > >> - we need to ensure that these attributes are not part of expansion
> > >> like `SELECT * FROM ...`
> > >> - implement some rule/logic that translates the attributes to special
> > >> RexNodes internally, such that the optimizer does not modify these
> > >> attributes
> > >>
> > >> What do you think?
> > >>
> > >> Regards,
> > >> Timo
> > >>
> > >>
> > >>
> > >>
> > >> Am 15/02/17 um 03:36 schrieb Xingcan Cui:
> > >> > Hi all,
> > >> >
> > >> > thanks for this thread.
> > >> >
> > >> > @Fabian If I didn't miss the point, the main difference between the
> > >> > two approaches is whether or not these time attributes are treated
> > >> > as common table fields that are directly available to users. Either
> > >> > way, these time attributes should be attached to records (right?),
> > >> > and the discussion is about whether to give them public qualifiers
> > >> > like other common fields or private qualifiers and related get/set
> > >> > methods.
> > >> >
> > >> > The former (system attributes) approach will be more compatible with
> > >> > existing SQL read-only operations (e.g., select, join), but we need
> > >> > to add restrictions on SQL modification operations (like what?). I
> > >> > think there is no need to forbid users from modifying these
> > >> > attributes via Table APIs (like a map function). Just inform them
> > >> > about these special attribute names, like the system built-in
> > >> > aggregator names in iterations.
> > >> >
> > >> > As for the built-in function approach, I don't know if, for now,
> > >> > there are functions applied on a single row (maybe the value access
> > >> > functions like COMPOSITE.get(STRING)?). It seems that most of the
> > >> > built-in functions work on a single field or on columns, and thus it
> > >> > will be mountains of work if we want to add a new kind of function
> > >> > to SQL. Maybe all existing operations would have to be modified to
> > >> > support it.
> > >> >
> > >> > All in all, if there is existing support for single-row functions,
> > >> > I prefer the built-in function approach. Otherwise the system
> > >> > attributes approach should be better. After all, there are not so
> > >> > many modification operations in SQL and maybe we can use aliases to
> > >> > support setting time attributes (just a hypothesis, not sure if it's
> > >> > feasible).
> > >> >
> > >> > @Haohui I think the given query is valid if we add an aggregate
> > >> > function to (PROCTIME() - ROWTIME()) / 1000, and it should be
> > >> > executed efficiently.
> > >> >
> > >> > Best,
> > >> > Xingcan
> > >> >
> > >> > On Wed, Feb 15, 2017 at 6:17 AM, Haohui Mai <ricet...@gmail.com> wrote:
> > >> >
> > >> >> Hi,
> > >> >>
> > >> >> Thanks for starting the discussion. I can see there are multiple
> > >> >> trade-offs in these two approaches. One question I have is to
> > >> >> what extent Flink wants to open its APIs to allow users to access
> > >> >> both processing and event time.
> > >> >>
> > >> >> Before we talk about joins, my understanding of the two approaches
> > >> >> that you mentioned is essentially (1) treating the value of event /
> > >> >> processing time as first-class fields for each row, (2) limiting
> > >> >> the scope of time indicators to only specifying windows. Take the
> > >> >> following query as an example:
> > >> >>
> > >> >> SELECT (PROCTIME() - ROWTIME()) / 1000 AS latency FROM table
> > >> >> GROUP BY FLOOR(PROCTIME() TO MINUTES)
> > >> >>
> > >> >> There are several questions we can ask:
> > >> >>
> > >> >> (1) Is it a valid query?
> > >> >> (2) How efficient will the query be?
> > >> >>
> > >> >> For this query I can see arguments from both sides. I think at the
> > >> >> end of the day it really comes down to what Flink wants to support.
> > >> >> After working on FLINK-5624 I'm more inclined to support the second
> > >> >> approach (i.e., built-in functions). The main reason is that the
> > >> >> APIs of Flink are designed to separate times from the real
> > >> >> payloads. It probably makes sense for the Table / SQL APIs to have
> > >> >> the same design.
> > >> >>
> > >> >> For joins I don't have a clear answer off the top of my head. Flink
> > >> >> requires two streams to be put in the same window before doing the
> > >> >> joins. This is essentially a subset of what SQL can express. I
> > >> >> don't know what would be the best approach here.
> > >> >>
> > >> >> Regards,
> > >> >> Haohui
> > >> >>
> > >> >>
> > >> >> On Tue, Feb 14, 2017 at 12:26 AM Fabian Hueske <fhue...@gmail.com> wrote:
> > >> >>
> > >> >>> Hi,
> > >> >>>
> > >> >>> It would be as in the query I gave as an example before:
> > >> >>>
> > >> >>> SELECT
> > >> >>>    a,
> > >> >>>    SUM(b) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2
> > >> >>> PRECEDING AND CURRENT ROW) AS sumB
> > >> >>> FROM myStream
> > >> >>>
> > >> >>> Here "proctime" would be a system attribute of the table
> > >> >>> "myStream".
> > >> >>> The table would also have another system attribute called
> > >> >>> "rowtime" which would be used to indicate event time semantics.
> > >> >>> These attributes would always be present in tables which are
> > >> >>> derived from streams.
> > >> >>> Because we still require that streams have timestamps and
> > >> >>> watermarks assigned (either by the StreamTableSource or somewhere
> > >> >>> downstream in the DataStream program) when they are converted
> > >> >>> into a table, there is no need to register anything.
> > >> >>>
> > >> >>> Does that answer your questions?
> > >> >>>
> > >> >>> Best, Fabian
> > >> >>>
> > >> >>>
> > >> >>>
> > >> >>> 2017-02-14 2:04 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>:
> > >> >>>
> > >> >>>> Hi Fabian,
> > >> >>>>
> > >> >>>> Thanks for starting the discussion. Before I give my thoughts on
> > >> >>>> this, can you please give some examples of how you would see the
> > >> >>>> option of using "system attributes"?
> > >> >>>> Do you use this when you register the stream as a table, do you
> > >> >>>> use it when you call an SQL query, do you use it when you
> > >> >>>> translate back a table to a stream / write it to a dynamic table?
> > >> >>>>
> > >> >>>> Dr. Radu Tudoran
> > >> >>>> Senior Research Engineer - Big Data Expert IT R&D Division
> > >> >>>>
> > >> >>>>
> > >> >>>>
> > >> >>>> -----Original Message-----
> > >> >>>> From: Fabian Hueske [mailto:fhue...@gmail.com]
> > >> >>>> Sent: Tuesday, February 14, 2017 1:01 AM
> > >> >>>> To: dev@flink.apache.org
> > >> >>>> Subject: [DISCUSS] Table API / SQL indicators for event and
> > >> >>>> processing time
> > >> >>>> Hi,
> > >> >>>>
> > >> >>>> I'd like to start a discussion about how Table API / SQL queries
> > >> >>>> indicate whether an operation is done in event or processing time.
> > >> >>>>
> > >> >>>> 1) Why do we need to indicate the time mode?
> > >> >>>>
> > >> >>>> We need to distinguish event time and processing time mode for
> > >> >>>> operations in queries in order to have the semantics of a query
> > >> >>>> fully defined.
> > >> >>>> This cannot be globally done in the TableEnvironment because some
> > >> >>>> queries explicitly request an expression such as the ORDER BY
> > >> >>>> clause of an OVER window with PRECEDING / FOLLOWING clauses.
> > >> >>>> So we need a way to specify something like the following query:
> > >> >>>>
> > >> >>>> SELECT
> > >> >>>>    a,
> > >> >>>>    SUM(b) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2
> > >> >>>> PRECEDING AND CURRENT ROW) AS sumB
> > >> >>>> FROM myStream
> > >> >>>>
> > >> >>>> where "proctime" indicates processing time. Equivalently,
> > >> >>>> "rowtime" would indicate event time.
> > >> >>>>
> > >> >>>> 2) Current state
> > >> >>>>
> > >> >>>> The current master branch implements time support only for
> > >> >>>> grouping windows in the Table API.
> > >> >>>> Internally, the Table API converts a 'rowtime symbol (which looks
> > >> >>>> like a regular attribute) into a special expression which
> > >> >>>> indicates event-time.
> > >> >>>> For example:
> > >> >>>>
> > >> >>>> table
> > >> >>>>    .window(Tumble over 5.milli on 'rowtime as 'w)
> > >> >>>>    .groupBy('a, 'w)
> > >> >>>>    .select(...)
> > >> >>>>
> > >> >>>> defines a tumbling event-time window.
> > >> >>>>
> > >> >>>> Processing-time is indicated by omitting a time attribute
> > >> >>>> (table.window(Tumble over 5.milli as 'w) ).
> > >> >>>>
> > >> >>>> 3) How can we do that in SQL?
> > >> >>>>
> > >> >>>> In SQL we cannot add special expressions without touching the
> > >> >>>> parser, which we don't want to do because we want to stick to the
> > >> >>>> SQL standard.
> > >> >>>> Therefore, I see only two options: adding system attributes or
> > >> >>>> (parameterless) built-in functions. I list some pros and cons of
> > >> >>>> the approaches below:
> > >> >>>>
> > >> >>>> 1. System Attributes:
> > >> >>>> + most natural way to access a property of a record.
> > >> >>>> + works with joins, because time attributes can be related to
> > >> >>>> tables
> > >> >>>> - We need to ensure the attributes are not writable and always
> > >> >>>> present in streaming tables (i.e., they should be system defined
> > >> >>>> attributes).
> > >> >>>> - Need to adapt existing Table API expressions (will not change
> > >> >>>> the API but some parts of the internal translation)
> > >> >>>> - Event time value must be set when the stream is converted,
> > >> >>>> processing time is evaluated on the fly
> > >> >>>>
> > >> >>>> 2. Built-in Functions
> > >> >>>> + Functions prevent users from trying to modify time attributes
> > >> >>>> - do not work with joins, because we need to address different
> > >> >>>> relations
> > >> >>>> - not a natural way to access a property of a record
> > >> >>>>
> > >> >>>> I think the only viable choice is system attributes, because
> > >> >>>> built-in functions cannot be used for joins.
> > >> >>>> However, system attributes are the more complex solution because
> > >> >>>> they need a better integration with Calcite's SQL validator
> > >> >>>> (preventing user attributes which are named rowtime for instance).
> > >> >>>>
> > >> >>>> Since there are currently several contributions on the way (such
> > >> >>>> as SQL OVER windows FLINK-5653 to FLINK-5658) that need time
> > >> >>>> indicators, we need a solution soon to be able to make progress.
> > >> >>>> There are two PRs, #3252 and #3271, which implement the built-in
> > >> >>>> marker functions proctime() and rowtime() and which could serve as
> > >> >>>> a temporary solution (since we do not work on joins yet).
> > >> >>>> I would like to suggest using these functions as a starting point
> > >> >>>> (once the PRs are merged) and later changing to the system
> > >> >>>> attribute solution, which needs a bit more time to be implemented.
> > >> >>>>
> > >> >>>> I talked with Timo today about this issue and he said he would
> > >> >>>> like to investigate how we can implement this as system functions
> > >> >>>> properly integrated with Calcite and the SQL Validator.
> > >> >>>>
> > >> >>>> What do others think?
> > >> >>>>
> > >> >>>> Best, Fabian
> > >> >>>>
> > >>
> > >>
> > >
> >
>
