Hi all,

Thanks for the answers, @Fabian.

@Jark, at first I also wanted to let users reassign the timestamp field arbitrarily. However, that means we would have to break the current "time system" and create a new one. The blocked watermarks would become meaningless, and a new WatermarkAssigner would probably have to be provided. A slightly stricter mechanism would be to only allow the use of existing timestamp fields. That sounds reasonable, but it would put an unnecessary barrier between stream and batch SQL, i.e., some SQL that works in batch could not be executed in the stream environment. I just wonder whether we could automatically choose a field to be used in the subsequent calculations. Not sure if that makes sense.

@Shaoxuan @Radu, I totally agree that "proctime" is the main obstacle to consolidating stream/batch SQL. Although, from a general point of view, it can indicate the time to some extent, its randomness means that it should never be used in time-sensitive applications. I have always believed that all the information used for query evaluation should be acquired from the data itself.

Best,
Xingcan

On Thu, Jul 27, 2017 at 7:24 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Shaoxuan,
>
> thanks for your comments. I agree with your comment:
>
> > The problem we used to have is that we have treated eventtime column as a special timestamp column.
>
> IMO, an event-time timestamp column is a regular column that is aligned with the watermarks of the stream.
> In order to distinguish watermark-aligned columns from others, we need a special flag in the schema.
> When a timestamp column is modified and we cannot guarantee that it is still aligned with the watermarks, it must lose the special flag and be treated like any other column.
>
> Regarding your comments:
> 1) I agree that we can use Long in addition to Timestamp as a timestamp column. Since timestamp columns need to be comparable to watermarks, which are Longs, I don't see that other types would make sense.
> For now, I would keep the restriction that timestamps can only be of Timestamp type. I think extending this to Long would be a follow-up issue to the changes I proposed here.
> 2) Relates to 1) and I agree. If we use a Long attribute as a timestamp, it should remain of type Long. For now I would keep converting it to Timestamp and change that later.
> 3) Yes, timestamp columns must be aligned to watermarks. That's their primary characteristic. How to define watermark strategies is orthogonal to this discussion, IMO.
> 4) From my point of view, proc-time is a purely virtual column and not related to an actual (data) column. However, it must be part of the schema and treated like any other attribute for a good user experience and SQL compliance. In order to be able to join two tables on processing time, it must be possible to include a processing time column in the schema definition of the table. Processing time queries can never compute the same results as batch queries, but their semantics should be aligned with event-time queries.
>
> Best, Fabian
>
> 2017-07-27 9:47 GMT+02:00 Radu Tudoran <radu.tudo...@huawei.com>:
>
> > Hi all,
> >
> > @Shaoxuan - thanks for the remarks. I have a question regarding your suggestion not to allow creating a proctime window on a regular column. I think this would be useful, though. First, you might need to carry the timestamp indicator of when the processing happened (for logging, provenance, traceability ...). Second, I do not think it contradicts the semantics of batch SQL, as in SQL you have the function "now()", which pretty much carries the same semantics as having a function to mark the proctime and then projecting this into a column. If I am not mistaken, you can store the result of calling now() in database columns.
> >
> > Dr. Radu Tudoran
> > Staff Research Engineer - Big Data Expert
> > IT R&D Division
> >
> > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > German Research Center
> > Munich Office
> > Riesstrasse 25, 80992 München
> >
> > E-mail: radu.tudo...@huawei.com
> > Mobile: +49 15209084330
> > Telephone: +49 891588344173
> >
> > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> > Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> > Managing Director: Bo PENG, Qiuen Peng, Shengli Wang
> > Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> > Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang
> > This e-mail and its attachments contain confidential information from HUAWEI, which is intended only for the person or entity whose address is listed above. Any use of the information contained herein in any way (including, but not limited to, total or partial disclosure, reproduction, or dissemination) by persons other than the intended recipient(s) is prohibited. If you receive this e-mail in error, please notify the sender by phone or email immediately and delete it!
> >
> > -----Original Message-----
> > From: Shaoxuan Wang [mailto:shaox...@apache.org]
> > Sent: Thursday, July 27, 2017 6:00 AM
> > To: Dev
> > Subject: Re: [DISCUSS] Table API / SQL internal timestamp handling
> >
> > Hi Everyone,
> > I like this proposal. The problem we used to have is that we have treated the eventtime column as a special timestamp column. An eventtime column is no different from any other regular column, except that it carries a flag (eventtime-indicator) implying that the column can be used as an eventtime to decide when a bounded query can emit its final result by comparing it with the associated waterMark.
> >
> > I have a few comments on top of this (they may have already been addressed in the conversation; since it's a long discussion, I may have missed something):
> >
> > 1. While we remove the timestamp column, we introduce an eventtime-indicator (we may already have this concept). It is only a flag that can be applied to any column (note that some types may not be usable as an eventtime column), indicating whether this column can be used as eventtime or not. This flag is useful for validation and codeGen.
> > 2. A column that has been used as an eventtime should not lose its own type. We should not cast all eventtime columns to the timestamp type. For instance, if a column is of type long, it will stay of type long even if a window aggregate has used it as an eventtime.
> > 3. Eventtime will only work well with an associated waterMark strategy. We may consider forcing the user to provide waterMark logic for his/her selected eventtime.
> > 4. For proctime, I hope we do not introduce a proctime-indicator for regular columns. Ideally we should not allow users to create a proctime window on a regular column, as this is against the batch query semantics. Therefore I suggest we always introduce a proctime timestamp column for users to create proctime windows. And unlike eventtime, proctime does not need any associated waterMark strategy, as there is no out-of-order issue for proctime.
> >
> > Regards,
> > Shaoxuan
> >
> > On Wed, Jul 26, 2017 at 9:10 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> >
> > > Thanks everybody for the replies so far.
> > >
> > > Let me answer your questions and reply to your thoughts:
> > >
> > > Radu:
> > > ---
> > > First of all, although my proposal is motivated by a join operator, this discussion is about timestamp handling, not about joins in general.
> > >
> > > - The semantics of outer joins is to emit null and there is no way around that. This is not an issue for us. Actually, outer joins are supported by the batch SQL / Table API. It is true that outer joins might result in null timestamps. Calcite will mark those fields as nullable and we should check that timestamps which are used in windows or joins are not nullable.
> > > - The query has to explicitly specify which timestamp attribute to use. Otherwise its semantics are not complete and it is invalid. A group-window that follows a join will reference a timestamp attribute and this will be used. The other timestamp might be projected out. When a result with two timestamps is converted into a DataStream, the user has to decide. This could be done inside of the Table to DataStream conversion. If the Table has more than one valid timestamp, the conversion will ask which timestamp to forward.
> > > - A proctime join should forward all proctime attributes of the input tables. All will be the same, but that does not matter because they are either virtual or represented as 1 byte dummy attributes. Also, unused ones will be automatically projected out anyway.
> > > - An event-time join should forward all event-time attributes of the input tables. Creating a new event-time attribute using processing time makes event-time processing pointless and will give completely random results.
> > > Event-time is not about the "time an event is created" but about a timestamp that is associated with an event. For example an order event could have three timestamps: "orderTime", "shipTime", and "receiveTime". Each could be a valid event-time attribute.
> > >
> > > Jark:
> > > ---
> > > Thanks for the proposal.
> > > I think I understand what you want to achieve with this, but I think functions to instantiate time attributes are not necessary and would make things more complicated. The point of supporting multiple time attributes is to ensure that all of them are aligned with the watermarks. If we add a method ROW_TIME(timestamp), we don't know whether the timestamp is aligned with the watermarks. If that is not the case, the query won't be executed as expected. The issue of LEFT JOIN can easily be addressed by checking for nullability during optimization when an operator tries to use it.
> > >
> > > The beauty of supporting multiple timestamps is that a user does not have to care at all about timestamps (or timestamp functions) and watermarks. As long as the query uses a timestamp attribute that was originally declared as rowtime in a source table (and was not modified afterwards), this is fine. Think of a cascade of three windowed joins: R - S - T - U, and you want to join S - T first. In that case, you need to preserve the timestamps of S and T in order to join R and U. From a relational algebra point of view, there is no reason to have a limitation on how these attributes are accessed. Timestamps are just regular fields of a record. The only restriction in the context of stream processing is that the watermark must be aligned with timestamps, i.e., follow all timestamps such that data is not late according to any of the timestamps. This we can achieve and handle internally without the user having to worry about it.
> > >
> > > Xingcan:
> > > ---
> > > I think your questions are mostly implementation details and not so much related to the original proposal of supporting multiple timestamps.
> > >
> > > My take on your questions is:
> > > 1. The rate at which watermarks are emitted is not important for the correctness of a query.
> > > However, it can affect the performance, because each watermark is sent as a special record and it is broadcast. My initial take would be to emit a new watermark whenever the operator updated its watermark because usually, the operator would have forwarded the old watermark.
> > > 2. I would say this is the responsibility of the operator because first it is not related to the semantics of the query and second it is an operator responsibility in the existing code as well.
> > >
> > > Jark 2:
> > > You are right, the query (or user) must decide on the event-time attribute to use. My main point is, it is much easier for the user (and for us internally) if we internally track multiple timestamps, because we do not have to prune, in the join, the timestamp that will not be used later. Moreover, both timestamps might be used later (see join example, which could be reordered of course). All we have to do is to ensure that all timestamps are aligned with the watermarks.
> > >
> > > Radu 2:
> > > IMO, time (or anything else that affects the semantics) should never be decided by the system. If we did that, a query would not be fully specified or, even worse, the way it is executed would be semantically incorrect and produce arbitrary results.
> > >
> > > Time attributes should be specified in the source tables and then forwarded from there. So far I haven't seen an example where this would not be possible (within the semantics of relational queries). If we do that right, there won't be a need for explicit time management except for the definition of the initial timestamps, which can be hidden in the table definition. As I said before, we (or the system) cannot decide on the timestamp because that would lead to arbitrary results. Asking the user to do that would mean explicit time management, which is also not desirable.
> > > I think my proposal gives users all options (timestamps) to choose from and the system can do the rest.
> > >
> > > Best, Fabian
> > >
> > > 2017-07-26 10:46 GMT+02:00 Radu Tudoran <radu.tudo...@huawei.com>:
> > >
> > > > Hi everyone,
> > > >
> > > > I just want to add that I was referring to NULL values not specifically for time fields but for the event itself. If you have the following situation
> > > >
> > > > Stream 1: .... | event1 | ....
> > > > Stream 2: .... |        | ....
> > > >
> > > > and you have a LEFT JOIN between stream 1 and stream 2 (no condition), then you still need to emit (event1, null), as this is the behavior of a left join. This is maybe a very simple situation, but the point is that left joins and right joins can have situations where you have elements only in the main stream and no element in the right stream. And for this case you still need to emit.
> > > >
> > > > Regarding whether time should be decided by the system or not: I think the answer is that it depends. I think the example from Jark is very good and shows the need for some mechanism to select/manage the time (I like the proposal of having functions to insert the time in the output!). However, if a business analyst writes a query without explicit time management, we still need to have some default behavior in the system. As per my initial proposal, I think we need to decide on one timestamp field to carry: either a new one created at the moment of the join, or the timestamp from the main stream (although I am not sure which one is the main stream in the case of a full join :) ).
> > > >
> > > > Dr. Radu Tudoran
> > > >
> > > > -----Original Message-----
> > > > From: Jark Wu [mailto:j...@apache.org]
> > > > Sent: Wednesday, July 26, 2017 8:29 AM
> > > > To: dev@flink.apache.org
> > > > Subject: Re: [DISCUSS] Table API / SQL internal timestamp handling
> > > >
> > > > Hi Xingcan,
> > > >
> > > > IMO, the event time of join results cannot be decided automatically by the system. Considering batch tables, if users want an event-time window aggregation after a join, they must specify the time field explicitly (T1.rowtime or T2.rowtime or a result computed from them).
> > > > So in the case of streaming tables, the system also can't automatically decide the time field for users.
> > > >
> > > > In regards to the question you asked, I think we don't need to change the watermark no matter whether we choose the left rowtime, the right rowtime, or a combination, because the watermark has been aligned with the rowtime in the source. Maybe I'm wrong about this; please correct me if I'm missing something.
> > > >
> > > > What do you think?
> > > >
> > > > Regards,
> > > > Jark
> > > >
> > > > 2017-07-26 11:24 GMT+08:00 Xingcan Cui <xingc...@gmail.com>:
> > > >
> > > > > Hi all,
> > > > >
> > > > > @Fabian, thanks for raising this.
> > > > >
> > > > > @Radu and Jark, personally I think the timestamp field is critical for query processing and thus should be declared as (or supposed to be) NOT NULL. In addition, I think the event-time semantics of the join results should be automatically decided by the system, i.e., we do not hand it over to users, so as to avoid unpredictable assignments.
> > > > >
> > > > > Generally speaking, consolidating different time fields is possible since all of them should ideally be monotonically increasing. From my point of view, the problem lies in (1) what the relationship between the old and new watermarks is: shall it be a one-to-one mapping, or could the new watermarks skip some timestamps? And (2) who is in charge of emitting the blocked watermarks, the operator or the process function?
> > > > >
> > > > > I'd like to hear from you.
> > > > >
> > > > > Best,
> > > > > Xingcan
> > > > >
> > > > > On Wed, Jul 26, 2017 at 10:40 AM, Jark Wu <j...@apache.org> wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > Radu's concerns make sense to me, especially the null value timestamp and multi-proctime.
> > > > > >
> > > > > > I also have something in mind. I would like to propose some built-in time indicator functions, e.g. ROW_TIME(Timestamp ts) will generate an event-time logical attribute, and PROC_TIME() will generate a processing-time logical attribute. It is similar to TUMBLE_ROWTIME proposed in this PR: https://github.com/apache/flink/pull/4199. These can be used in any query, but there still can't be more than one rowtime attribute or more than one proctime attribute in a table schema.
> > > > > >
> > > > > > Both selected timestamp fields from a JOIN query will be materialized. If someone needs further computation based on the event time, they need to create a new time attribute using the ROW_TIME(...) function. This can also solve the null timestamp problem in LEFT JOIN, because we can use a user-defined function to combine the two rowtimes and make the result the event-time attribute, e.g. SELECT ROW_TIME(udf(T1.rowtime, T2.rowtime)) as rowtime FROM T1 JOIN T2 ...
> > > > > >
> > > > > > What do you think?
> > > > > >
> > > > > > 2017-07-25 23:48 GMT+08:00 Radu Tudoran <radu.tudo...@huawei.com>:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > I think this is an interesting discussion and I would like to add some issues and give some feedback.
> > > > > > >
> > > > > > > - For supporting the join we need to think not only of the time but also of the null values.
> > > > > > > For example, if you have a LEFT (or RIGHT) JOIN between items of 2 input streams and the secondary input is not available, you should still emit Row.of(event1, null). As far as I know, if you need to serialize/deserialize null values to send them, they do not work. So we should include this scenario in the discussion.
> > > > > > > - If we have multiple timestamps in an (output) event, one question is how to select afterwards which is the primary time field on which to operate. When we describe a query we might be able to specify (or we get this implicitly if we implement carrying on the 2 timestamps) Select T1.rowtime, T2.rowtime ... but if the output of a query is the input of a new processing pipeline, do we also generally support that the input has 2 time fields? How do we deal with the 2 input fields (maybe I am missing something) further down in the DataStream pipeline that we build based on the output?
> > > > > > > - For the case of proctime: do we need to carry 2 proctimes (the proctimes of the incoming events from each stream), or 1 proctime (as we operate on proctime and the combination of the 2 inputs can be considered a new event, the current proctime on the machine can be considered the (proc)time reference for the output event), or 3 proctimes (the 2 proctimes of the inputs plus the proctime when the new event was created)?
> > > > > > > - Similar to the point above, for event time (which I understand as the time when the event was created... or do we understand it as a time carried within the event?): when we join 2 events and output an event that is the result of the join, isn't this a new event detached from the source/input events? I would tend to say it is a new event, and then, as for proctime, the event time of the new event is the current time when this output event was created. If we accept this hypothesis, then we would not need the 2 input time fields to be carried/managed implicitly. If someone needs them further down the computation pipeline, then in the query they would be selected explicitly from the input stream and projected into some fields to be carried (Select T1.rowtime as FormerTime1, T2.rowtime as FormerTime2, .... JOIN T1, T2...), but they would not have the timestamp logic.
> > > > > > >
> > > > > > > ..my 2 cents
> > > > > > >
> > > > > > > Dr. Radu Tudoran
> > > > > > >
> > > > > > > -----Original Message-----
> > > > > > > From: Fabian Hueske [mailto:fhue...@gmail.com]
> > > > > > > Sent: Tuesday, July 25, 2017 4:22 PM
> > > > > > > To: dev@flink.apache.org
> > > > > > > Subject: [DISCUSS] Table API / SQL internal timestamp handling
> > > > > > >
> > > > > > > Hi everybody,
> > > > > > >
> > > > > > > I'd like to propose and discuss some changes in the way the Table API / SQL internally handles timestamps.
> > > > > > >
> > > > > > > The Table API is implemented on top of the DataStream API. The DataStream API hides timestamps from users in order to ensure that timestamps and watermarks are aligned. Instead, users assign timestamps and watermarks once (usually at the source or in a subsequent operator) and let the system handle the timestamps from there on. Timestamps are stored in the timestamp field of the StreamRecord, which is a holder for the user record and the timestamp. DataStream operators that depend on time (time-windows, process function, ...) access the timestamp from the StreamRecord.
> > > > > > >
> > > > > > > In contrast to the DataStream API, the Table API and SQL are aware of the semantics of a query. I.e., we can analyze how users access timestamps and whether they are modified or not. Another difference is that the timestamp must be part of the schema of a table in order to have correct query semantics.
> > > > > > >
> > > > > > > The current design to handle timestamps is as follows. The Table API stores timestamps in the timestamp field of the StreamRecord. Therefore, timestamps are detached from the remaining data, which is stored in Row objects. Hence, the physical representation of a row is different from its logical representation. We introduced a translation layer (RowSchema) to convert the logical schema into the physical schema. This is necessary for serialization or code generation when the logical plan is translated into a physical execution plan.
> > > > > > > Processing-time timestamps are handled similarly. They are not included in the physical schema and are looked up when needed. This design also requires that we materialize timestamps when they are accessed by expressions. Timestamp materialization is done as a pre-optimization step.
> > > > > > >
> > > > > > > While thinking about the implementation of the event-time windowed stream-stream join [1], I stumbled over the question of which timestamp of the two input tables to forward. With the current design, we could only have a single timestamp, so keeping both timestamps would not be possible. The choice of the timestamp would need to be specified by the query; otherwise it would lack clear semantics. When executing the join, the join operator would need to make sure that no late data is emitted. This would only work if the operator was able to hold back watermarks [2].
> > > > > > >
> > > > > > > With this information in mind, I'd like to discuss the following proposal:
> > > > > > >
> > > > > > > - We allow more than one event-time timestamp and store them directly in the Row.
> > > > > > > - The query operators ensure that the watermarks are always behind all event-time timestamps. With additional analysis we will be able to restrict this to timestamps that are actually used as such.
> > > > > > > - When a DataStream operator is time-based (e.g., a DataStream time-window), we inject an operator that copies the timestamp from the Row into the StreamRecord.
> > > > > > > - We try to remove the distinction between logical and physical schema. For event-time timestamps this is because we store them in the Row object; for processing-time timestamps, we add a dummy byte field. When accessing a field of this type, the code generator injects the code to fetch the timestamps.
> > > > > > > - We might be able to get around the pre-optimization time materialization step.
> > > > > > > - A join result would be able to keep both timestamps. The watermark would be held back for both so both could be used in subsequent operations.
> > > > > > >
> > > > > > > I admit, I haven't thought this completely through.
> > > > > > > However, the benefits of this design from my point of view are:
> > > > > > > - encoding of timestamps in Rows means that the logical schema is equal to the physical schema
> > > > > > > - no timestamp materialization
> > > > > > > - support for multiple timestamps. Otherwise we would need to expose internal restrictions to the user which are hard to explain / communicate.
> > > > > > > - no need to change any public interfaces at the moment.
> > > > > > >
> > > > > > > The drawbacks as far as I see them are:
> > > > > > > - additional payload due to unused timestamp field + possibly the processing-time dummy field
> > > > > > > - complete rework of the internal timestamp logic (again...)
> > > > > > >
> > > > > > > Please let me know what you think,
> > > > > > > Fabian
> > > > > > >
> > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-6233
> > > > > > > [2] https://issues.apache.org/jira/browse/FLINK-7245
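
For readers who want the core idea of the original proposal in concrete form (a join result keeps both event-time timestamps, and the watermark is held back so that neither of them is ever late), here is a minimal Python sketch. It is not Flink code. The names `JoinedRow`, `WindowedJoinSketch` and `max_skew_ms`, and the rule "output watermark = min(input watermarks) - max_skew_ms", are assumptions made for illustration, for a join predicate of the form |left.rowtime - right.rowtime| <= max_skew_ms. State eviction and the handling of late input records are deliberately left out.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class JoinedRow:
    """Join result that keeps BOTH event-time attributes as regular fields."""
    left_rowtime: int   # epoch millis taken from the left input row
    right_rowtime: int  # epoch millis taken from the right input row


@dataclass
class WindowedJoinSketch:
    """Hypothetical operator for |left.rowtime - right.rowtime| <= max_skew_ms."""
    max_skew_ms: int
    left_buffer: List[int] = field(default_factory=list)
    right_buffer: List[int] = field(default_factory=list)
    input_wm: Dict[str, int] = field(
        default_factory=lambda: {"left": 0, "right": 0}
    )

    def output_watermark(self) -> int:
        # Assumed hold-back rule: a future result may pair a new record with a
        # buffered one that is up to max_skew_ms older, so the output watermark
        # trails the combined input watermark by that amount.
        return min(self.input_wm.values()) - self.max_skew_ms

    def on_watermark(self, side: str, wm: int) -> int:
        # Watermarks never move backwards on an input.
        self.input_wm[side] = max(self.input_wm[side], wm)
        return self.output_watermark()

    def on_record(self, side: str, rowtime: int) -> List[JoinedRow]:
        # Assumes the record is not late, i.e. rowtime > input_wm[side].
        if side == "left":
            own, other = self.left_buffer, self.right_buffer
        else:
            own, other = self.right_buffer, self.left_buffer
        own.append(rowtime)
        matches = [t for t in other if abs(t - rowtime) <= self.max_skew_ms]
        if side == "left":
            return [JoinedRow(rowtime, t) for t in matches]
        return [JoinedRow(t, rowtime) for t in matches]


join = WindowedJoinSketch(max_skew_ms=100)
join.on_record("left", 1_000)
join.on_watermark("left", 1_000)
out_wm = join.on_watermark("right", 950)  # min(1000, 950) - 100
results = join.on_record("right", 960)    # within 100 ms of the left record
```

Because every emitted row has both timestamps ahead of the held-back watermark, a downstream operator could in principle pick either `left_rowtime` or `right_rowtime` as its time attribute; which hold-back rule a real operator would use is an open design question in the thread.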