Hi all,

Thanks for the answers, @Fabian.

@Jark, at first I also wanted the users to reassign the timestamp field
arbitrarily. However, that means we have to break the current "time system"
and create a new one. The blocked watermarks become meaningless and maybe a
new WatermarkAssigner should be provided. A little more strict mechanism
would be only allowing to use the existing timestamp fields. It sounds
reasonable, but will bring an unnecessary barrier to stream/batch SQL, i.e.
some SQL works for the batch can not be executed in the stream environment.
I just wonder if we could automatically choose a field, which will be used
in the following calculations. Not sure if it makes sense.

@Shaoxuan @Radu, I totally agree that the "proctime" is the main block for
consolidating stream/batch SQL. Though from a general point of view, it can
indicate the time to some extent, the randomness property determines that
it should never be used in time-sensitive applications. I always believe in
that all the information used for query evaluation should be acquired from
data itself.

Best,
Xingcan

On Thu, Jul 27, 2017 at 7:24 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Shaoxuan,
>
> thanks for your comments. I agree with your comment:
>
> > The problem we used to have is that we have treated eventtime column as a
> special timestamp column.
>
> IMO, an event-time timestamp column is a regular column that is aligned
> with the watermarks of the stream.
> In order to distinguish watermark aligned columns from others, we need a
> special flag in the schema.
> When a timestamp column is modified and we cannot guarantee that is it
> still aligned with the watermarks, it must lose the special flag and be
> treated like any other column.
>
> Regarding your comments:
> 1) I agree, that we can use Long in addition to Timestamp as a timestamp
> columns. Since timestamp columns need to be comparable to watermarks which
> are Longs, I don't see that other types would make sense. For now, I would
> keep the restriction that timestamps can only be of Timestamp type. I
> think, extending this to Long would be a follow-up issue to the changes I
> proposed here.
> 2) Relates to 1) and I agree. if we use a Long attribute as timestamp it
> should remain of type Long. For now I would keep converting it to Timestamp
> and change that later.
> 3) Yes, timestamp columns must be aligned to watermarks. That's their
> primary characteristic. How to define watermark strategies is orthogonal to
> this discussion, IMO.
> 4) From my point of view, proc-time is a purely virtual column and not
> related to an actual (data) column. However, it must be part of the schema
> and treated like any other attribute for a good user experience and SQL
> compliance. In order to be able to join two tables on processing time, it
> must be possible to include a processing time column in the schema
> definition of the table. Processing time queries can never compute the same
> results as batch queries but their semantics should be aligned with
> event-time queries.
>
> Best, Fabian
>
> 2017-07-27 9:47 GMT+02:00 Radu Tudoran <radu.tudo...@huawei.com>:
>
> > Hi all,
> >
> > @Shaoxuan - thanks for the  remarks. I have a question regarding your
> > suggestion not to consider to create proctime window in a regular
> column. I
> > think this would be useful though. First you might need to carry the
> > timestamp indicator of when the processing happened (for log purposes,
> > provenance, traceability ...). Secondly - I do not think it is
> > contradicting with the semantics in batch SQL as in SQL you have the
> > function "now()" ...which pretty much carry the same semantics as having
> a
> > function to mark the proctime and then projecting this into a column. If
> I
> > am not mistaken you can introduce in database columns the result of
> calling
> > now().
> >
> >
> > Dr. Radu Tudoran
> > Staff Research Engineer - Big Data Expert
> > IT R&D Division
> >
> >
> > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > German Research Center
> > Munich Office
> > Riesstrasse 25, 80992 München
> >
> > E-mail: radu.tudo...@huawei.com
> > Mobile: +49 15209084330
> > Telephone: +49 891588344173
> >
> > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> > Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> > Managing Director: Bo PENG, Qiuen Peng, Shengli Wang
> > Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> > Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang
> > This e-mail and its attachments contain confidential information from
> > HUAWEI, which is intended only for the person or entity whose address is
> > listed above. Any use of the information contained herein in any way
> > (including, but not limited to, total or partial disclosure,
> reproduction,
> > or dissemination) by persons other than the intended recipient(s) is
> > prohibited. If you receive this e-mail in error, please notify the sender
> > by phone or email immediately and delete it!
> >
> >
> > -----Original Message-----
> > From: Shaoxuan Wang [mailto:shaox...@apache.org]
> > Sent: Thursday, July 27, 2017 6:00 AM
> > To: Dev
> > Subject: Re: [DISCUSS] Table API / SQL internal timestamp handling
> >
> >  Hi Everyone,
> > I like this proposal. The problem we used to have is that we have treated
> > eventtime column as a special timestamp column. An eventtime column is
> > nothing special than all other regular columns, but with a certain flag
> > (eventtime-indicator) inferring that this column can be used as an
> eventime
> > to decide when a bounded query can emit the final result by comparing
> with
> > a concern associated waterMark.
> >
> > I have a few comments adding on top of this (they may have already been
> > addressed in the conversation — since It’s a long discussion, I may miss
> > something):
> >
> >    1. While we remove timestamp column, we introduce eventtime-indicator
> >    (we may already have this concept), it is only a flag can be applied
> for
> >    any column (note that some types may not be able to be used as
> eventtime
> >    column), indicating if this column can be used as eventtime or not.
> This
> >    flag is useful for validation and codeGen.
> >    2. A column that has been used as an eventtime, should not lose its
> own
> >    type. We should not cast all eventime column to the timestamp type.
> For
> >    instance, if a column is a long type, it will keep as long type even
> if
> > a
> >    window aggregate has used it as a eventtime.
> >    3. Eventtime will only work well with some associated waterMark
> >    strategy. We may consider forcing user to provide a waterMark logic on
> >    his/her selected eventtime.
> >    4. For proctime, I hope we should not introduce proctime-indicator for
> >    regular column. Ideally we should not allow user to create proctime
> > window
> >    on regular column, as this is against the batch query semantics.
> > Therefore
> >    I suggest we should always introduce a proctime timestamp column for
> > users
> >    to create proctime window. And unlike eventtime, proctime does not
> need
> > any
> >    associated waterMark strategy, as there is no such out of order issue
> > for
> >    the proctime.
> >
> > Regards,
> > Shaoxuan
> >
> > On Wed, Jul 26, 2017 at 9:10 PM, Fabian Hueske <fhue...@gmail.com>
> wrote:
> >
> > > Thanks everybody for the replies so far.
> > >
> > > Let me answer your questions and reply to your thoughts:
> > >
> > > Radu:
> > > ---
> > > First of all, although my proposal is movivated by a join operator,
> > > this discussion is about timestamp handling, not about joins in
> general.
> > >
> > > - The semantics of outer joins is to emit null and there is no way
> > > around that. This is not an issue for us. Actually, outer joins are
> > > supported by the batch SQL / Table API. It is true that outer joins
> > > might result in null timestamps. Calcite will mark those fields as
> > > nullable and we should check that timestamps which are used in windows
> > or joins are not nullable.
> > > - The query has to explicitly specify which timestamp attribute to use.
> > > Otherwise its semantics are not complete and it is invalid. A
> > > group-window that follows a join will reference a timestamp attribute
> > > and this will be used. The other timestamp might be projected out.
> > > When a result with two timestamps is converted into a DataStream, the
> > > user has to decide. This could be done inside of the Table to
> > > DataStream conversion. If the Table has more than one valid timestamp,
> > > the conversion will ask which timestamp to forward.
> > > - A proctime join should forward all proctime attributes of the input
> > > tables. All will be the same, but that does not matter because they
> > > are either virtual or represented as 1 byte dummy attributes. Also,
> > > unused ones will be automatically projected out anyway.
> > > - An event-time join should forward all event-time attributes of the
> > > input tables. Creating a new event-time attribute using processing
> > > time makes event-time processing pointless and will give completely
> > random results.
> > > Event-time is not about the "time an event is created" but about a
> > > timestamp that is associated with an event. For example an order event
> > > could have three timestamps: "orderTime", "shipTime", and
> "receiveTime".
> > > Each could be a valid event-time attribute.
> > >
> > > Jark:
> > > ---
> > > Thanks for the proposal. I think I understand what you want to achieve
> > > with this, but I think functions to instantiate time attributes are
> > > not necessary and would make things more complicated. The point of
> > > supporting multiple time attributes is to ensure that all of them are
> > > aligned with the watermarks. If we add a method ROW_TIME(timestamp)
> > > and we don't know if the timestamp is aligned with the watermarks. If
> > > that is not the case, the query won't be executed as expected. The
> > > issue of LEFT JOIN can easily be addressed by checking for
> > > nullablility during optimization when an operator tries to use it.
> > >
> > > The beauty of supporting multiple timestamps is that a user does not
> > > have to care at all about timestamps (or timestamp functions) and
> > > watermarks. As long as the query uses a timestamp attribute that was
> > > originally declared as rowtime in a source table (and was not modified
> > > afterwards), this is fine. Think of a cascade of three windowed joins:
> > > R - S - T - U, and you want to join S - T first. In that case, you
> > > need to preserve the timestamps of S and T in order to join R and U.
> > > From a relational algebra point of view, there is no reason to have a
> > > limitation on how these attributes are accessed. Timestamps are just
> > > regular fields of a record. The only restriction in the context of
> > > stream processing is that the watermark must be aligned with
> > > timestamps, i.e., follow all timestamps such that data is not late
> > > according to any of the timestamps. This we can achieve and handle
> > internally without the user having to worry about it.
> > >
> > > Xingcan:
> > > ---
> > > I think your questions are mostly implementation details and not so
> > > much related to the original proposal of supporting multiple
> timestamps.
> > >
> > > My take on your questions is:
> > > 1. The rate at which watermarks are emitted is not important for the
> > > correctness of a query. However, it can affect the performance,
> > > because each watermark is sent as a special record and it is
> > > broadcasted. My initial take would be to emit a new watermark whenever
> > > the operator updated its watermark because usually, the operator would
> > > have forwarded the old watermark.
> > > 2. I would say this is the responsibility of the operator because
> > > first it is not related to the semantics of the query and second it is
> > > an operator responsibility in the existing code as well.
> > >
> > > Jark 2:
> > > You are right, the query (or user) must decide on the event-time
> > > attribute to use. My main point is, it is much easier for the user
> > > (and for us
> > > internally) if we internally track multiple timestamps. Because we do
> > > not have to prune the timestamp that will not be later used into the
> > join.
> > > Moreover, both timestamps might be used later (see join example, which
> > > could be reordered of course). All we have to do is to ensure that all
> > > timestamps are aligned with the watermarks.
> > >
> > > Radu 2:
> > > IMO, time (or anything else that affects the semantics) should never
> > > be decided by the system. When we would do that, a query is not fully
> > > specified or, even worse, the way it is executed is semantically
> > > incorrect and produces arbitrary results.
> > >
> > > Time attributes should be specified in the source tables and then
> > > forwarded from there. So far I haven't seen an example where this
> > > would not be possible (within the semantics or relational queries). If
> > > we do that right, there won't be a need for explicit time management
> > > except for the definition of the initial timestamps which can be
> > > hidden in the table definition. As I said before, we (or the system)
> > > cannot decide on the timestamp because that would lead to arbitrary
> > > results. Asking the user to do that would mean explicit time
> > > management which is also not desirable. I think my proposal gives
> > > users all options (timestamps) to chose from and the system can do the
> > rest.
> > >
> > > Best, Fabian
> > >
> > > 2017-07-26 10:46 GMT+02:00 Radu Tudoran <radu.tudo...@huawei.com>:
> > >
> > > > Hi everyone,
> > > >
> > > > I just want to add that I was referring to NULL values not
> > > > specifically
> > > to
> > > > timefields but to the event itself. If you have the follow situation
> > > >
> > > > Stream 1:     .... |    event1   | ....
> > > > Stream 2:     .... |             | ....
> > > >
> > > > And you have a LEFT JOIN between stream 1 and stream 2 (no
> > > > condition)...then you still need to emit (event1,null) ... as this
> > > > is the behavior of left join. This is maybe a very simple situation,
> > > > but the
> > > point
> > > > is that left joins and right joins can have situation when you have
> > > > elements only in the main stream and no element in the right stream.
> > > > And for this case you still need to emit.
> > > >
> > > >
> > > > Regarding whether time should be decided by system or not...i think
> > > > the answer is it depends. I think the example from Jack is very good
> > > > and
> > > shows
> > > > the need for some mechanisms to select/manage the time (I like the
> > > proposal
> > > > of having functions to insert the time in the output!). However, if
> > > > a business analyst would write a query without explicit time
> > > > management we still need to have some default behavior in the
> > > > system. As per my initial proposal, I think  we need to decide on
> > > > one timestamp field to carry (either a new one at the moment of the
> > > > join) or the timestamp from the
> > > main
> > > > stream  (...although I am not sure which one is the main stream in
> > > > the
> > > case
> > > > of a full join:) )
> > > >
> > > >
> > > > Dr. Radu Tudoran
> > > > Staff Research Engineer - Big Data Expert IT R&D Division
> > > >
> > > >
> > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > > > German Research Center
> > > > Munich Office
> > > > Riesstrasse 25, 80992 München
> > > >
> > > > E-mail: radu.tudo...@huawei.com
> > > > Mobile: +49 15209084330
> > > > Telephone: +49 891588344173
> > > >
> > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > > > Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> > > > Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> > > > Managing Director: Bo PENG, Qiuen Peng, Shengli Wang
> > > > Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> > > > Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang
> > > > This e-mail and its attachments contain confidential information from
> > > > HUAWEI, which is intended only for the person or entity whose address
> > is
> > > > listed above. Any use of the information contained herein in any way
> > > > (including, but not limited to, total or partial disclosure,
> > > reproduction,
> > > > or dissemination) by persons other than the intended recipient(s) is
> > > > prohibited. If you receive this e-mail in error, please notify the
> > sender
> > > > by phone or email immediately and delete it!
> > > >
> > > > -----Original Message-----
> > > > From: Jark Wu [mailto:j...@apache.org]
> > > > Sent: Wednesday, July 26, 2017 8:29 AM
> > > > To: dev@flink.apache.org
> > > > Subject: Re: [DISCUSS] Table API / SQL internal timestamp handling
> > > >
> > > > Hi Xingcan,
> > > >
> > > > IMO, I don't think event-time of join results could be automatically
> > > > decided by system. Considering batch tables, if users want a event
> time
> > > > window aggregation after join, user must specify the time field
> > > explicitly
> > > > (T1.rowtime or T2.rowtime or the computed result of them). So in the
> > case
> > > > of streaming tables, the system also can't automatically decide the
> > time
> > > > field for users.
> > > >
> > > > In regards to the question you asked, I think we don't need to change
> > the
> > > > watermark no matter we choose the left rowtime or right rowtime or
> the
> > > > combination. Because the watermark has been aligned with the rowtime
> in
> > > the
> > > > source. Maybe I'm wrong about this, please correct me if I'm missing
> > > > something.
> > > >
> > > > What do you think?
> > > >
> > > > Regards,
> > > > Jark
> > > >
> > > > 2017-07-26 11:24 GMT+08:00 Xingcan Cui <xingc...@gmail.com>:
> > > >
> > > > > Hi all,
> > > > >
> > > > > @Fabian, thanks for raising this.
> > > > >
> > > > > @Radu and Jark, personally I think the timestamp field is critical
> > for
> > > > > query processing and thus should be declared as (or supposed to be)
> > > > > NOT NULL. In addition, I think the event-time semantic of the join
> > > > > results should be automatically decided by the system, i.e., we do
> > not
> > > > > hand it over to users so to avoid some unpredictable assignment.
> > > > >
> > > > > Generally speaking, consolidating different time fields is possible
> > > > > since all of them should ideally be monotonically increasing. From
> my
> > > > > point of view, the problem lies in
> > > > > (1) what's the relationship between the old and new watermarks.
> Shall
> > > > > they be one-to-one mapping or the new watermarks could skip some
> > > > > timestamps? And (2) who is in charge of emitting the blocked
> > > > > watermarks, the operator or the process function?
> > > > >
> > > > > I'd like to hear from you.
> > > > >
> > > > > Best,
> > > > > Xingcan
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Jul 26, 2017 at 10:40 AM, Jark Wu <j...@apache.org> wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > Radu's concerns make sense to me, especially the null value
> > > > > > timestamp and multi-proctime.
> > > > > >
> > > > > > I have also something in my mind. I would like to propose some
> time
> > > > > > indicator built-in functions, e.g. ROW_TIME(Timestamp ts) will
> > > > > > generate a event time logical attribute, PROC_TIME() will
> generate
> > a
> > > > > > processing time logical attribute. It is similar to
> TUMBLE_ROWTIME
> > > > > > proposed in this PR https://github.com/apache/flink/pull/4199.
> > These
> > > > > > can be used in any queries, but there still can't be more than
> one
> > > > > > rowtime attribute or more than one proctime attribute in a table
> > > > schema.
> > > > > >
> > > > > > The both selected timestamp fields from a JOIN query will be
> > > > > materialized.
> > > > > > If someone needs further down the computation based on the event
> > > > > > time,
> > > > > they
> > > > > > need to create a new time attribute using the ROW_TIME(...)
> > > > > > function. And this can also solve the null timestamp problem in
> > LEFT
> > > > > > JOIN, because we
> > > > > can
> > > > > > use a user defined function to combine the two rowtimes and make
> > the
> > > > > result
> > > > > > as the event time attribute, e.g. SELECT ROW_TIME(udf(T1.rowtime,
> > > > > > T2.rowtime)) as rowtime FROM T1 JOIN T2 ...
> > > > > >
> > > > > >
> > > > > > What do you think?
> > > > > >
> > > > > >
> > > > > > 2017-07-25 23:48 GMT+08:00 Radu Tudoran <radu.tudo...@huawei.com
> >:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > I think this is an interesting discussion and I would like to
> add
> > > > > > > some issues and give some feedback.
> > > > > > >
> > > > > > > - For supporting the join we do not only need to think of the
> > time
> > > > > > > but also on the null values. For example if you have a LEFT (or
> > > > > > > RIGHT) JOIN between items of 2 input streams, and the secondary
> > > > > > > input is not
> > > > > > available
> > > > > > > you should still emit Row.of(event1, null)...as far as I know
> if
> > > > > > > you
> > > > > need
> > > > > > > to serialize/deserialize null values to send them they do not
> > > > > > > work. So
> > > > > we
> > > > > > > should include this scenario in the discussions -If we will
> have
> > > > > > > multiple timestamp in an (output) event, one question
> > > > > is
> > > > > > > how to select afterwards which is the primary time field on
> which
> > > > > > > to operate. When we describe a query we might be able to
> specify
> > > > > > > (or we
> > > > > get
> > > > > > > this implicitly if we implement the carryon of the 2
> timestamps)
> > > > > Select
> > > > > > > T1.rowtime, T2.rowtime ...but if the output of a query is the
> > > > > > > input of
> > > > > a
> > > > > > > new processing pipeline, then, do we support generally also
> that
> > > > > > > the
> > > > > > input
> > > > > > > has 2 time fields? ...how do we deal with the 2 input fields
> > > > > > > (maybe I
> > > > > am
> > > > > > > missing something) further in the datastream pipeline that we
> > > > > > > build
> > > > > based
> > > > > > > on the output?
> > > > > > > - For the case of proctime - do we need to carry 2 proctimes
> (the
> > > > > > > proctimes of the incoming events from each stream), or 1
> proctime
> > > > > > > (as
> > > > > we
> > > > > > > operate on proctime and the combination of the 2 inputs can be
> > > > > considered
> > > > > > > as a new event, the current proctime on the machine can be
> > > > > > > considered
> > > > > the
> > > > > > > (proc)time reference for output event) or 3 proctimes (the 2
> > > > > > > proctimes
> > > > > of
> > > > > > > the input plus the proctime when the new event was created)?
> > > > > > > -Similar with the point above, for even time (which I am
> > > > > > > understanding
> > > > > as
> > > > > > > the time when the event was created...or do we understand them
> as
> > > > > > > a
> > > > > time
> > > > > > > carry within the event?) - when we join 2 events and output an
> > > > > > > event
> > > > > that
> > > > > > > is the result of the join - isn't this a new event detach from
> > the
> > > > > > > source\input events? ... I would tend to say it is a new event
> > and
> > > > > > > then
> > > > > > as
> > > > > > > for proctime the event time of the new event is the current
> time
> > > > > > > when
> > > > > > this
> > > > > > > output event was created. If we would accept this hypothesis
> then
> > > > > > > we
> > > > > > would
> > > > > > > not need the 2 time input fields to be carried/managed
> > implicitly.
> > > > > > > If someone needs further down the computation pipeline, then in
> > > > > > > the query
> > > > > > they
> > > > > > > would be selected explicitly from the input stream and
> projected
> > > > > > > in
> > > > > some
> > > > > > > fields to be carried (Select T1.rowtime as FormerTime1,
> > T2.rowtime
> > > > > > > as FormerTime2, .... JOIN T1, T2...)...but they would not have
> > the
> > > > > timestamp
> > > > > > > logic
> > > > > > >
> > > > > > > ..my 2 cents
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > Dr. Radu Tudoran
> > > > > > > Staff Research Engineer - Big Data Expert IT R&D Division
> > > > > > >
> > > > > > >
> > > > > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > > > > > > German Research Center
> > > > > > > Munich Office
> > > > > > > Riesstrasse 25, 80992 München
> > > > > > >
> > > > > > > E-mail: radu.tudo...@huawei.com
> > > > > > > Mobile: +49 15209084330
> > > > > > > Telephone: +49 891588344173
> > > > > > >
> > > > > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > > > > > > Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> > > > > > > Registered Office: Düsseldorf, Register Court Düsseldorf, HRB
> > > 56063,
> > > > > > > Managing Director: Bo PENG, Qiuen Peng, Shengli Wang
> > > > > > > Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB
> > > 56063,
> > > > > > > Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang
> > > > > > > This e-mail and its attachments contain confidential
> information
> > > from
> > > > > > > HUAWEI, which is intended only for the person or entity whose
> > > address
> > > > > is
> > > > > > > listed above. Any use of the information contained herein in
> any
> > > way
> > > > > > > (including, but not limited to, total or partial disclosure,
> > > > > > reproduction,
> > > > > > > or dissemination) by persons other than the intended
> recipient(s)
> > > is
> > > > > > > prohibited. If you receive this e-mail in error, please notify
> > the
> > > > > sender
> > > > > > > by phone or email immediately and delete it!
> > > > > > >
> > > > > > > -----Original Message-----
> > > > > > > From: Fabian Hueske [mailto:fhue...@gmail.com]
> > > > > > > Sent: Tuesday, July 25, 2017 4:22 PM
> > > > > > > To: dev@flink.apache.org
> > > > > > > Subject: [DISCUSS] Table API / SQL internal timestamp handling
> > > > > > >
> > > > > > > Hi everybody,
> > > > > > >
> > > > > > > I'd like to propose and discuss some changes in the way how the
> > > Table
> > > > > API
> > > > > > > / SQL internally handles timestamps.
> > > > > > >
> > > > > > > The Table API is implemented on top of the DataStream API. The
> > > > > DataStream
> > > > > > > API hides timestamps from users in order to ensure that
> > timestamps
> > > > and
> > > > > > > watermarks are aligned. Instead users assign timestamps and
> > > > watermarks
> > > > > > once
> > > > > > > (usually at the source or in a subsequent operator) and let the
> > > > system
> > > > > > > handle the timestamps from there on. Timestamps are stored in
> the
> > > > > > timestamp
> > > > > > > field of the StreamRecord which is a holder for the user record
> > and
> > > > the
> > > > > > > timestamp. DataStream operators that depend on time
> > (time-windows,
> > > > > > process
> > > > > > > function, ...) access the timestamp from the StreamRecord.
> > > > > > >
> > > > > > > In contrast to the DataSteam API, the Table API and SQL are
> aware
> > > of
> > > > > the
> > > > > > > semantics of a query. I.e., we can analyze how users access
> > > > timestamps
> > > > > > and
> > > > > > > whether they are modified or not. Another difference is that
> the
> > > > > > timestamp
> > > > > > > must be part of the schema of a table in order to have correct
> > > query
> > > > > > > semantics.
> > > > > > >
> > > > > > > The current design to handle timestamps is as follows. The
> Table
> > > API
> > > > > > > stores timestamps in the timestamp field of the StreamRecord.
> > > > > Therefore,
> > > > > > > timestamps are detached from the remaining data which is stored
> > in
> > > > Row
> > > > > > > objects. Hence, the physical representation of a row is
> different
> > > > from
> > > > > > its
> > > > > > > logical representation. We introduced a translation layer
> > > (RowSchema)
> > > > > to
> > > > > > > convert logical schema into physical schema. This is necessery
> > for
> > > > > > > serialization or code generation when the logical plan is
> > > translated
> > > > > > into a
> > > > > > > physical execution plan. Processing-time timestamps are
> similarly
> > > > > > handled.
> > > > > > > They are not included in the physical schema and looked up when
> > > > needed.
> > > > > > > This design also requires that we need to materialize
> timestamps
> > > when
> > > > > > they
> > > > > > > are accessed by expressions. Timestamp materialization is done
> > as a
> > > > > > > pre-optimization step.
> > > > > > >
> > > > > > > While thinking about the implementation of the event-time
> > windowed
> > > > > > > stream-stream join [1] I stumbled over the question which
> > timestamp
> > > > of
> > > > > > both
> > > > > > > input tables to forward. With the current design, we could only
> > > have
> > > > a
> > > > > > > single timestamp, so keeping both timestamps would not be
> > possible.
> > > > The
> > > > > > > choice of the timestamp would need to be specified by the query
> > > > > otherwise
> > > > > > > it would lack clear semantics. When executing the join, the
> join
> > > > > operator
> > > > > > > would need to make sure that no late data is emitted. This
> would
> > > only
> > > > > > work
> > > > > > > the operator was able to hold back watermarks [2].
> > > > > > >
> > > > > > > With this information in mind, I'd like to discuss the
> following
> > > > > > proposal:
> > > > > > >
> > > > > > > - We allow more than one event-time timestamp and store them
> > > directly
> > > > > in
> > > > > > > the Row
> > > > > > > - The query operators ensure that the watermarks are always
> > behind
> > > > all
> > > > > > > event-time timestamps. With additional analysis we will be able
> > to
> > > > > > restrict
> > > > > > > this to timestamps that are actually used as such.
> > > > > > > - When a DataStream operator is time-based (e.g., a DataStream
> > > > > > > time-windows), we inject an operator that copies the timestamp
> > from
> > > > the
> > > > > > Row
> > > > > > > into the StreamRecord.
> > > > > > > - We try to remove the distinction between logical and physical
> > > > schema.
> > > > > > > For event-time timestamps this is because we store them in the
> > Row
> > > > > > object,
> > > > > > > for processing-time timestamps, we add a dummy byte field. When
> > > > > > accessing a
> > > > > > > field of this type, the code generator injects the code to
> fetch
> > > the
> > > > > > > timestamps.
> > > > > > > - We might be able to get around the pre-optimization time
> > > > > > materialization
> > > > > > > step.
> > > > > > > - A join result would be able to keep both timestamps. The
> > > watermark
> > > > > > would
> > > > > > > be hold back for both so both could be used in subsequent
> > > operations.
> > > > > > >
> > > > > > > I admit, I haven't thought this completely through.
> > > > > > > However, the benefits of this design from my point of view are:
> > > > > > > - encoding of timestamps in Rows means that the logical schema
> is
> > > > equal
> > > > > > to
> > > > > > > the physical schema
> > > > > > > - no timestamp materialization
> > > > > > > - support for multiple timestamps. Otherwise we would need to
> > > expose
> > > > > > > internal restrictions to the user which are hard to explain /
> > > > > > communicate.
> > > > > > > - no need to change any public interfaces at the moment.
> > > > > > >
> > > > > > > The drawbacks as far as I see them are:
> > > > > > > - additional payload due to unused timestamp field + possibly
> the
> > > > > > > processing-time dummy field
> > > > > > > - complete rework of the internal timestamp logic (again...)
> > > > > > >
> > > > > > > Please let me know what you think,
> > > > > > > Fabian
> > > > > > >
> > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-6233
> > > > > > > [2] https://issues.apache.org/jira/browse/FLINK-7245
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to