Hi everybody,

I created FLINK-7337 [1] and will work on this.
Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-7337

2017-08-01 11:36 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

Hi,

I think the implementation of the join operator should not depend on the synchronization of the watermarks. If we need to buffer a stream because we have to wait for future records from the other stream to join them, then that's how it is. We cannot change the semantics of the query. It might be possible to apply backpressure to the source to delay one stream, but this is a very delicate thing to do: it can easily lead to deadlocks and have unpredictable side effects. So I would just consume the stream and put it into the state.

From my point of view, timestamps cannot be "irreconcilable" because the query semantics and input streams are given. A join will always need to buffer some records. It is true that the join operator might need to buffer a lot of records if the input streams and the join predicate are not aligned. IMO, this is fine as long as the query has enough resources. Once it runs out of resources, it simply fails and can be restarted with more resources or adapted.

Cheers, Fabian

2017-08-01 6:04 GMT+02:00 Xingcan Cui <xingc...@gmail.com>:

Hi Shaoxuan,

I really appreciate your prompt reply. What you explained makes sense to me.

There is only one point on which I have a different view: "we have to buffer all the delta data between watermarks of two inputs".

Consider the following SQL query, which joins two streams l and r:

    SELECT * FROM l, r
    WHERE l.name = r.name
      AND l.ts BETWEEN r.ts - INTERVAL '600' MINUTE
                   AND r.ts - INTERVAL '599' MINUTE;

This query is valid since it has both an equi-join key and a time-span restriction.
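A rough sketch of what this query asks of the join state, in Python rather than Flink's API: when does a buffered row stop being able to match? It assumes millisecond timestamps and that a watermark `w` promises no further rows with a timestamp `<= w`; all function names are hypothetical, and the `l.name = r.name` key check is left out.

```python
# Hypothetical sketch, not Flink code: timestamps and watermarks are epoch
# milliseconds, and a watermark w promises that no more rows with ts <= w arrive.
MINUTE = 60 * 1000

# l.ts BETWEEN r.ts - INTERVAL '600' MINUTE AND r.ts - INTERVAL '599' MINUTE
LOWER_OFFSET = 600 * MINUTE
UPPER_OFFSET = 599 * MINUTE


def matches(l_ts: int, r_ts: int) -> bool:
    """Time predicate of the example query (the l.name = r.name check is omitted)."""
    return r_ts - LOWER_OFFSET <= l_ts <= r_ts - UPPER_OFFSET


def l_row_expired(l_ts: int, r_watermark: int) -> bool:
    """An l row only matches r rows with r.ts <= l.ts + 600 min; once the r-side
    watermark reaches that bound, no future r row can match it."""
    return r_watermark >= l_ts + LOWER_OFFSET


def r_row_expired(r_ts: int, l_watermark: int) -> bool:
    """An r row only matches l rows with l.ts <= r.ts - 599 min."""
    return l_watermark >= r_ts - UPPER_OFFSET


def l_buffer_time(l_ts: int, r_watermark_on_arrival: int) -> int:
    """Event-time span for which an l row must stay in state after it arrives."""
    return max(0, l_ts + LOWER_OFFSET - r_watermark_on_arrival)


if __name__ == "__main__":
    # Synchronized streams: r is also at ~0 when the l row with ts=0 arrives.
    print(l_buffer_time(0, 0) // MINUTE)  # 600
    # l is replayed 10 hours behind r: r is already at ~600 min on arrival.
    print(l_buffer_time(0, 600 * MINUTE) // MINUTE)  # 0
```

Under these assumptions, the amount of state is decided by how far the r-side watermark is ahead when an l row arrives, which is why the same query can need 600 minutes of buffering or almost none.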
There are two different situations in which the query can be executed: (1) if the timestamps of l and r are synchronized, e.g., both contain newly generated events, we must buffer the l stream for 600 minutes; and (2) if there is a natural offset between the two streams, e.g., the r stream is newly generated while the l stream is read from an event queue whose events were generated 10 hours ago, it is unnecessary to buffer that much data.

That raises the question: what if the timestamps of the two streams are essentially "irreconcilable"?

Best,
Xingcan

On Mon, Jul 31, 2017 at 10:42 PM, Shaoxuan Wang <wshaox...@gmail.com> wrote:

Xingcan,

A watermark is an "estimate of completion". The user defines the watermark of each input based on the best estimate of when it has seen pretty much all of the data. It is usually computed from the event timestamps. When we do a windowed join, we have to make sure that the watermarks of both inputs have been received before we emit a window result at this watermark. If the two inputs differ a lot, say "one for today and the other one for yesterday" as you pointed out, the watermark of the windowed join operator is just yesterday. I guess this is what Fabian means by "In case of a join, the smallest future timestamp depends on two fields and not just on one." In the windowed-join case, we have to buffer all the delta data between the watermarks of the two inputs. It is the user's responsibility (if she/he wants to reduce the cost) to align the watermarks of the stream sources as much as possible.

Regards,
Shaoxuan

On Mon, Jul 31, 2017 at 10:09 PM, Xingcan Cui <xingc...@gmail.com> wrote:

Hi Fabian,

I have a question similar to Jark's. Theoretically, the row times of two streams could be quite different, e.g., one for today and the other one for yesterday.
How can we align them?

Best,
Xingcan

On Mon, Jul 31, 2017 at 9:04 PM, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Jark,

yes, the handling of watermarks is very tricky. It is not directly related to the proposal, which is only about the representation of timestamps, but it becomes important for event-time joins. We have a JIRA about an operator that is able to hold back watermarks [1].

Roughly, the idea is to track the smallest timestamp that will be emitted in the future and align the watermark to this timestamp. For this we need to know the semantics of the operator (which timestamps will be emitted in the future), but these are given for relational operators. The new operator could emit a watermark whenever it receives one.

In case of a join, the smallest future timestamp depends on two fields and not just on one.

Best,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-7245

2017-07-31 14:35 GMT+02:00 Jark Wu <j...@apache.org>:

Hi,

@Fabian, I read your proposal carefully again, and I'm a big +1 for doing it. The proposal addresses the problem of how to forward the rowtimes of both input tables in a two-stream join (windowed or non-windowed). The drawback of the additional payload is acceptable.

You mentioned that:

> The query operators ensure that the watermarks are always behind all event-time timestamps. With additional analysis we will be able to restrict this to timestamps that are actually used as such.
I'm more curious about how we can define the watermark strategies so that all timestamp columns are aligned with the watermarks, especially when the watermark has already been defined in the input DataStream.

Best,
Jark Wu

2017-07-27 23:13 GMT+08:00 Xingcan Cui <xingc...@gmail.com>:

Hi all,

Thanks for the answers, @Fabian.

@Jark, at first I also wanted to let users reassign the timestamp field arbitrarily. However, that means we would have to break the current "time system" and create a new one. The blocked watermarks would become meaningless, and a new WatermarkAssigner might have to be provided. A somewhat stricter mechanism would be to allow only the existing timestamp fields to be used. That sounds reasonable, but it would put an unnecessary barrier between stream and batch SQL, i.e., some SQL that works in batch could not be executed in the streaming environment. I just wonder whether we could automatically choose a field to be used in the subsequent calculations. Not sure if that makes sense.

@Shaoxuan @Radu, I totally agree that "proctime" is the main obstacle to consolidating stream and batch SQL. Although, from a general point of view, it can indicate time to some extent, its randomness means that it should never be used in time-sensitive applications. I have always believed that all the information used for query evaluation should be derived from the data itself.
Best,
Xingcan

On Thu, Jul 27, 2017 at 7:24 PM, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Shaoxuan,

thanks for your comments. I agree with your comment:

> The problem we used to have is that we have treated eventtime column as a special timestamp column.

IMO, an event-time timestamp column is a regular column that is aligned with the watermarks of the stream. In order to distinguish watermark-aligned columns from others, we need a special flag in the schema. When a timestamp column is modified and we cannot guarantee that it is still aligned with the watermarks, it must lose the special flag and be treated like any other column.

Regarding your comments:
1) I agree that we can use Long in addition to Timestamp for timestamp columns. Since timestamp columns need to be comparable to watermarks, which are Longs, I don't see how other types would make sense. For now, I would keep the restriction that timestamps can only be of type Timestamp. I think extending this to Long would be a follow-up issue to the changes I proposed here.
2) Relates to 1) and I agree. If we use a Long attribute as a timestamp, it should remain of type Long. For now, I would keep converting it to Timestamp and change that later.
3) Yes, timestamp columns must be aligned with the watermarks.
   That's their primary characteristic. How to define watermark strategies is orthogonal to this discussion, IMO.
4) From my point of view, proc-time is a purely virtual column that is not related to an actual (data) column. However, it must be part of the schema and treated like any other attribute for a good user experience and SQL compliance. In order to join two tables on processing time, it must be possible to include a processing-time column in the schema definition of the table. Processing-time queries can never compute the same results as batch queries, but their semantics should be aligned with those of event-time queries.

Best, Fabian

2017-07-27 9:47 GMT+02:00 Radu Tudoran <radu.tudo...@huawei.com>:

Hi all,

@Shaoxuan - thanks for the remarks. I have a question regarding your suggestion not to allow creating a proctime window on a regular column. I think this would be useful, though. First, you might need to carry an indicator of when the processing happened (for logging, provenance, traceability ...). Second, I do not think it contradicts the semantics of batch SQL, since in SQL you have the function "now()", which carries pretty much the same semantics as having a function that marks the proctime and then projecting it into a column.
If I am not mistaken, you can store the result of calling now() in a database column.

Dr. Radu Tudoran
Staff Research Engineer - Big Data Expert
IT R&D Division

HUAWEI TECHNOLOGIES Duesseldorf GmbH
German Research Center
Munich Office
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Qiuen Peng, Shengli Wang
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang
This e-mail and its attachments contain confidential information from HUAWEI, which is intended only for the person or entity whose address is listed above. Any use of the information contained herein in any way (including, but not limited to, total or partial disclosure, reproduction, or dissemination) by persons other than the intended recipient(s) is prohibited. If you receive this e-mail in error, please notify the sender by phone or email immediately and delete it!
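The rule Fabian describes, where an event-time column is a regular column plus a flag and the flag is dropped as soon as the column is modified, might be modelled like this. It is a hypothetical Python sketch, not Flink's Table API: `Column`, `project` and the `(new_name, source, modified)` encoding are made up for illustration, and a modified column keeping its type is a simplification. Following Shaoxuan's comment 2 (which Fabian treats as a follow-up), a BIGINT rowtime stays BIGINT.

```python
from dataclasses import dataclass, replace
from typing import List, Tuple


@dataclass(frozen=True)
class Column:
    name: str
    type: str                 # kept as declared, e.g. "TIMESTAMP" or "BIGINT"
    is_rowtime: bool = False  # hypothetical event-time indicator flag


def project(schema: List[Column], outputs: List[Tuple[str, str, bool]]) -> List[Column]:
    """Hypothetical projection. Each output is (new_name, source_column, modified).

    A plain reference keeps the source column's type and rowtime flag. A modified
    column (e.g. shipTime - INTERVAL '1' HOUR) keeps its type here for simplicity
    but loses the flag, because it can no longer be guaranteed to be aligned with
    the watermarks.
    """
    by_name = {c.name: c for c in schema}
    result = []
    for new_name, source, modified in outputs:
        col = by_name[source]
        result.append(replace(col, name=new_name,
                              is_rowtime=col.is_rowtime and not modified))
    return result


orders = [
    Column("orderTime", "TIMESTAMP", is_rowtime=True),
    Column("shipTime", "BIGINT", is_rowtime=True),
    Column("amount", "DOUBLE"),
]
projected = project(orders, [("orderTime", "orderTime", False),
                             ("shipTimeMinus1h", "shipTime", True),
                             ("amount", "amount", False)])
print([(c.name, c.type, c.is_rowtime) for c in projected])
# [('orderTime', 'TIMESTAMP', True), ('shipTimeMinus1h', 'BIGINT', False), ('amount', 'DOUBLE', False)]
```

Keeping the flag separate from the type is what lets a Long column serve as event time without being cast, while any expression the planner cannot reason about simply turns it back into a regular column.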
-----Original Message-----
From: Shaoxuan Wang [mailto:shaox...@apache.org]
Sent: Thursday, July 27, 2017 6:00 AM
To: Dev
Subject: Re: [DISCUSS] Table API / SQL internal timestamp handling

Hi Everyone,

I like this proposal. The problem we used to have is that we have treated eventtime column as a special timestamp column. An eventtime column is no different from all other regular columns, except for a certain flag (eventtime-indicator) indicating that this column can be used as event time to decide when a bounded query can emit the final result, by comparing it with the associated waterMark.

I have a few comments to add on top of this (they may already have been addressed in the conversation; since it's a long discussion, I may have missed something):

1. While we remove the timestamp column, we introduce an eventtime-indicator (we may already have this concept). It is only a flag that can be applied to any column (note that some types may not be usable as an eventtime column), indicating whether this column can be used as event time or not. This flag is useful for validation and codeGen.
2. A column that has been used as event time should not lose its own type. We should not cast every eventtime column to the timestamp type.
   For instance, if a column is of type long, it keeps the long type even if a window aggregate has used it as event time.
3. Eventtime only works well with an associated waterMark strategy. We may consider forcing the user to provide waterMark logic for his/her selected eventtime.
4. For proctime, I hope we will not introduce a proctime-indicator for regular columns. Ideally, we should not allow users to create a proctime window on a regular column, as this is against batch query semantics. Therefore, I suggest that we always introduce a proctime timestamp column for users to create proctime windows. And unlike eventtime, proctime does not need any associated waterMark strategy, as there is no out-of-order issue for proctime.

Regards,
Shaoxuan

On Wed, Jul 26, 2017 at 9:10 PM, Fabian Hueske <fhue...@gmail.com> wrote:

Thanks everybody for the replies so far.

Let me answer your questions and reply to your thoughts:

Radu:
---
First of all, although my proposal is motivated by a join operator, this discussion is about timestamp handling, not about joins in general.
- The semantics of outer joins is to emit nulls, and there is no way around that. This is not an issue for us; outer joins are already supported by the batch SQL / Table API. It is true that outer joins might produce null timestamps. Calcite will mark those fields as nullable, and we should check that timestamps used in windows or joins are not nullable.
- The query has to explicitly specify which timestamp attribute to use. Otherwise, its semantics are incomplete and it is invalid. A group window that follows a join will reference a timestamp attribute, and this one will be used. The other timestamp might be projected out. When a result with two timestamps is converted into a DataStream, the user has to decide. This could be done inside the Table-to-DataStream conversion: if the Table has more than one valid timestamp, the conversion will ask which timestamp to forward.
- A proctime join should forward all proctime attributes of the input tables. They will all be the same, but that does not matter because they are either virtual or represented as 1-byte dummy attributes. Also, unused ones will be projected out automatically anyway.
- An event-time join should forward all event-time attributes of the input tables.
  Creating a new event-time attribute from processing time makes event-time processing pointless and will give completely random results. Event time is not about "the time an event is created" but about a timestamp that is associated with an event. For example, an order event could have three timestamps: "orderTime", "shipTime", and "receiveTime". Each could be a valid event-time attribute.

Jark:
---
Thanks for the proposal. I think I understand what you want to achieve with it, but I think functions to instantiate time attributes are not necessary and would make things more complicated. The point of supporting multiple time attributes is to ensure that all of them are aligned with the watermarks. If we add a method ROW_TIME(timestamp), we don't know whether the timestamp is aligned with the watermarks. If it is not, the query won't be executed as expected. The LEFT JOIN issue can easily be addressed by checking for nullability during optimization when an operator tries to use the attribute.

The beauty of supporting multiple timestamps is that a user does not have to care at all about timestamps (or timestamp functions) and watermarks.
As long as the query uses a timestamp attribute that was originally declared as rowtime in a source table (and was not modified afterwards), this is fine. Think of a cascade of three windowed joins, R - S - T - U, where you want to join S - T first. In that case, you need to preserve the timestamps of both S and T in order to join the result with R and U. From a relational algebra point of view, there is no reason to limit how these attributes are accessed. Timestamps are just regular fields of a record. The only restriction in the context of stream processing is that the watermark must be aligned with the timestamps, i.e., follow all timestamps such that data is not late according to any of them. We can achieve and handle this internally without the user having to worry about it.

Xingcan:
---
I think your questions are mostly about implementation details and not so much related to the original proposal of supporting multiple timestamps.

My take on your questions is:
1. The rate at which watermarks are emitted is not important for the correctness of a query. However, it can affect performance, because each watermark is sent as a special record and is broadcast.
   My initial take would be to emit a new watermark whenever the operator updates its watermark, because usually the operator would have forwarded the old watermark.
2. I would say this is the responsibility of the operator because, first, it is not related to the semantics of the query and, second, it is an operator responsibility in the existing code as well.

Jark 2:
You are right, the query (or user) must decide on the event-time attribute to use. My main point is that it is much easier for the user (and for us internally) if we internally track multiple timestamps, because we do not have to prune, before the join, the timestamp that will not be used later. Moreover, both timestamps might be used later (see the join example, which could of course be reordered). All we have to do is ensure that all timestamps are aligned with the watermarks.

Radu 2:
IMO, time (or anything else that affects the semantics) should never be decided by the system. If we did that, a query would not be fully specified or, even worse, the way it is executed would be semantically incorrect and produce arbitrary results.

Time attributes should be specified in the source tables and then forwarded from there.
So far, I haven't seen an example where this would not be possible (within the semantics of relational queries). If we do that right, there won't be a need for explicit time management, except for the definition of the initial timestamps, which can be hidden in the table definition. As I said before, we (or the system) cannot decide on the timestamp because that would lead to arbitrary results. Asking the user to do it would mean explicit time management, which is also not desirable. I think my proposal gives users all options (timestamps) to choose from, and the system can do the rest.

Best, Fabian

2017-07-26 10:46 GMT+02:00 Radu Tudoran <radu.tudo...@huawei.com>:

Hi everyone,

I just want to add that I was referring to NULL values not specifically in the time fields but in the event itself. If you have the following situation

    Stream 1: .... | event1 | ....
    Stream 2: .... |        | ....

and you have a LEFT JOIN between stream 1 and stream 2 (no condition), then you still need to emit (event1, null), as this is the behavior of a left join.
This may be a very simple situation, but the point is that left and right joins can have situations where there are elements only in the main stream and none in the other stream, and in this case you still need to emit.

Regarding whether time should be decided by the system or not... I think the answer is: it depends. I think the example from Jark is very good and shows the need for some mechanism to select/manage the time (I like the proposal of having functions to insert the time in the output!). However, if a business analyst writes a query without explicit time management, we still need some default behavior in the system. As per my initial proposal, I think we need to decide on one timestamp field to carry: either a new one created at the moment of the join, or the timestamp from the main stream (...although I am not sure which one is the main stream in the case of a full join :) ).
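Radu's LEFT JOIN case, and Fabian's answer that the other side's timestamp then becomes nullable and must be rejected where a window or join needs a time attribute, can be sketched as below. This is a simplified, hypothetical Python model: no join keys, a symmetric time bound instead of a full predicate, no cleanup of right-side state, and the assumption that a watermark `w` promises no further rows with a timestamp `<= w`. The class and function names are made up for illustration.

```python
from typing import List, Optional, Tuple

Row = Tuple[str, int]  # (payload, event-time timestamp in ms)


class LeftOuterTimeJoin:
    """Hypothetical left outer join without keys: a left row joins every right
    row whose timestamp is within +/- bound ms of its own. Assumes a watermark
    w promises no more rows with ts <= w. Right-side state cleanup is omitted."""

    def __init__(self, bound: int) -> None:
        self.bound = bound
        self.left: List[Tuple[Row, bool]] = []  # (row, matched at least once?)
        self.right: List[Row] = []
        self.out: List[Tuple[Row, Optional[Row]]] = []

    def on_left(self, row: Row) -> None:
        matched = False
        for r in self.right:
            if abs(r[1] - row[1]) <= self.bound:
                self.out.append((row, r))
                matched = True
        self.left.append((row, matched))

    def on_right(self, row: Row) -> None:
        for i, (l, _) in enumerate(self.left):
            if abs(row[1] - l[1]) <= self.bound:
                self.out.append((l, row))
                self.left[i] = (l, True)
        self.right.append(row)

    def on_right_watermark(self, wm: int) -> None:
        # No right row with ts <= l.ts + bound can arrive any more, so an
        # unmatched left row is emitted padded with None: (event1, null).
        pending = []
        for l, matched in self.left:
            if wm >= l[1] + self.bound:
                if not matched:
                    self.out.append((l, None))
            else:
                pending.append((l, matched))
        self.left = pending


def check_time_attribute(name: str, nullable: bool) -> None:
    """Reject nullable timestamps where a window or join needs a time attribute."""
    if nullable:
        raise ValueError(f"{name} is nullable and cannot be used as a time attribute")


demo = LeftOuterTimeJoin(bound=1000)
demo.on_left(("event1", 0))
demo.on_right_watermark(1000)
print(demo.out)  # [(('event1', 0), None)]
```

Padding only happens once the right-side watermark guarantees that no partner can still arrive, which is why the result column holding the right rowtime has to be nullable and is rejected by a check like `check_time_attribute`.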
-----Original Message-----
From: Jark Wu [mailto:j...@apache.org]
Sent: Wednesday, July 26, 2017 8:29 AM
To: dev@flink.apache.org
Subject: Re: [DISCUSS] Table API / SQL internal timestamp handling

Hi Xingcan,

IMO, the event time of join results cannot be decided automatically by the system. Consider batch tables: if users want an event-time window aggregation after a join, they must specify the time field explicitly (T1.rowtime, T2.rowtime, or a value computed from them). So in the case of streaming tables, the system also can't decide the time field automatically for users.

Regarding the question you asked, I think we don't need to change the watermark, no matter whether we choose the left rowtime, the right rowtime, or a combination, because the watermark has already been aligned with the rowtime in the source. Maybe I'm wrong about this; please correct me if I'm missing something.

What do you think?
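Jark's argument that the system cannot pick the event-time field of a join result matches Fabian's suggestion that the Table-to-DataStream conversion ask which timestamp to forward when there is more than one. A hypothetical Python sketch of that check; `to_stream` and its parameters are illustrative and not Flink's API:

```python
from typing import Any, Dict, List, Optional, Sequence, Tuple


def to_stream(
    rows: Sequence[Dict[str, Any]],
    rowtime_fields: List[str],
    rowtime: Optional[str] = None,
) -> List[Tuple[int, Dict[str, Any]]]:
    """Hypothetical Table-to-DataStream conversion that attaches one event-time
    timestamp to each row.

    After a join, a table can carry several valid rowtime fields (e.g. the
    rowtimes of T1 and T2). The function does not guess: the caller must name
    the one to forward unless exactly one exists.
    """
    if rowtime is None:
        if len(rowtime_fields) != 1:
            raise ValueError(f"ambiguous event time, choose one of {rowtime_fields}")
        rowtime = rowtime_fields[0]
    elif rowtime not in rowtime_fields:
        raise ValueError(f"{rowtime} is not a rowtime attribute")
    return [(int(row[rowtime]), row) for row in rows]


joined = [{"t1_rowtime": 10, "t2_rowtime": 7, "v": "x"}]
print(to_stream(joined, ["t1_rowtime", "t2_rowtime"], rowtime="t2_rowtime"))
# [(7, {'t1_rowtime': 10, 't2_rowtime': 7, 'v': 'x'})]
```

Raising instead of silently picking a field keeps the choice with the query author, which is the point both Jark and Fabian make about arbitrary results.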
Regards,
Jark

2017-07-26 11:24 GMT+08:00 Xingcan Cui <xingc...@gmail.com>:

Hi all,

@Fabian, thanks for raising this.

@Radu and Jark, personally I think the timestamp field is critical for query processing and thus should be declared as (or assumed to be) NOT NULL. In addition, I think the event-time semantics of the join results should be decided automatically by the system, i.e., we do not hand this over to users, so as to avoid unpredictable assignments.

Generally speaking, consolidating different time fields is possible, since all of them should ideally be monotonically increasing. From my point of view, the problem lies in:
(1) What is the relationship between the old and the new watermarks? Should they be a one-to-one mapping, or may the new watermarks skip some timestamps?
(2) Who is in charge of emitting the blocked watermarks, the operator or the process function?

I'd like to hear from you.
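These two questions can be made concrete with a rough sketch of the hold-back operator discussed in this thread (FLINK-7245): track the smallest timestamp that may still be emitted, keep the output watermark behind it, take the minimum of the two input watermarks as Shaoxuan describes for windowed joins, and forward a watermark only when it advances, as Fabian suggests in his answer. Everything here is hypothetical Python, not Flink code; it assumes integer timestamps and that a watermark `w` promises no further rows with a timestamp `<= w`.

```python
import heapq
from typing import List, Optional


class WatermarkHoldback:
    """Hypothetical two-input operator that holds back its output watermark.

    Assumes integer timestamps and that a watermark w promises no more rows
    with ts <= w, so the watermark must stay strictly below any timestamp the
    operator may still emit.
    """

    def __init__(self) -> None:
        self.left_wm = float("-inf")
        self.right_wm = float("-inf")
        self.pending: List[int] = []  # min-heap of timestamps that may still be emitted
        self.emitted_wm = float("-inf")

    def buffer(self, *timestamps: int) -> None:
        # A buffered join row may later be emitted with any of its timestamp
        # fields, so every field holds the watermark back.
        for ts in timestamps:
            heapq.heappush(self.pending, ts)

    def release(self, *timestamps: int) -> None:
        # Called when a buffered row has been emitted or dropped from state.
        for ts in timestamps:
            self.pending.remove(ts)
        heapq.heapify(self.pending)

    def on_watermark(self, side: str, wm: int) -> Optional[int]:
        """Returns the watermark to forward, or None if it did not advance."""
        if side == "left":
            self.left_wm = max(self.left_wm, wm)
        else:
            self.right_wm = max(self.right_wm, wm)
        candidate = min(self.left_wm, self.right_wm)  # the slower input decides
        if self.pending:
            candidate = min(candidate, self.pending[0] - 1)
        if candidate > self.emitted_wm:
            self.emitted_wm = candidate
            return int(candidate)
        return None


op = WatermarkHoldback()
print(op.on_watermark("left", 100))   # None: no right watermark yet
op.buffer(50, 80)                     # a joined row carrying two timestamps
print(op.on_watermark("right", 120))  # 49: held back below the buffered ts 50
op.release(50, 80)
print(op.on_watermark("left", 110))   # 110
```

In this sketch some input watermarks produce no output, so the mapping between old and new watermarks is not one-to-one, and the operator rather than a user function decides when to emit.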
Best,
Xingcan

On Wed, Jul 26, 2017 at 10:40 AM, Jark Wu <j...@apache.org> wrote:

Hi,

Radu's concerns make sense to me, especially the null value timestamp and multi-proctime.

I also have something in mind. I would like to propose some time indicator built-in functions, e.g. ROW_TIME(Timestamp ts) will generate an event-time logical attribute and PROC_TIME() will generate a processing-time logical attribute. This is similar to TUMBLE_ROWTIME proposed in this PR https://github.com/apache/flink/pull/4199. These can be used in any query, but there still can't be more than one rowtime attribute or more than one proctime attribute in a table schema.

Both selected timestamp fields from a JOIN query will be materialized. If someone needs the event time further down the computation, they need to create a new time attribute using the ROW_TIME(...) function.
And this can also solve the null timestamp problem in LEFT JOIN, because we can use a user-defined function to combine the two rowtimes and make the result the event-time attribute, e.g. SELECT ROW_TIME(udf(T1.rowtime, T2.rowtime)) as rowtime FROM T1 JOIN T2 ...

What do you think?

2017-07-25 23:48 GMT+08:00 Radu Tudoran <radu.tudo...@huawei.com>:

Hi,

I think this is an interesting discussion and I would like to add some issues and give some feedback.

- For supporting the join we need to think not only of the time but also of null values. For example, if you have a LEFT (or RIGHT) JOIN between items of 2 input streams and the secondary input is not available, you should still emit Row.of(event1, null)... As far as I know, null values do not work if you need to serialize/deserialize them to send them.
So we should include this scenario in the discussions.
- If we have multiple timestamps in an (output) event, one question is how to select afterwards which is the primary time field on which to operate. When we describe a query we might be able to specify it (or we get this implicitly if we implement the carry-over of the 2 timestamps): Select T1.rowtime, T2.rowtime ... But if the output of a query is the input of a new processing pipeline, do we also generally support that the input has 2 time fields? ... How do we deal with the 2 input fields (maybe I am missing something) further in the DataStream pipeline that we build based on the output?
- For the case of proctime: do we need to carry 2 proctimes (the proctimes of the incoming events from each stream), 1 proctime (as we operate on proctime and the combination of the 2 inputs can be considered as a new event, the current proctime on the machine can be considered the (proc)time reference for the output event), or 3 proctimes (the 2 proctimes of the inputs plus the proctime when the new event was created)?
- Similar to the point above, for event time (which I understand as the time when the event was created... or do we understand it as a time carried within the event?): when we join 2 events and output an event that is the result of the join, isn't this a new event detached from the source/input events? ... I would tend to say it is a new event, and then, as for proctime, the event time of the new event is the current time when this output event was created.
If we accepted this hypothesis, then we would not need the 2 time input fields to be carried/managed implicitly. If someone needs them further down the computation pipeline, then they would be selected explicitly from the input stream in the query and projected into some fields to be carried (Select T1.rowtime as FormerTime1, T2.rowtime as FormerTime2, .... JOIN T1, T2...), but these fields would not have the timestamp logic.

..my 2 cents

Dr. Radu Tudoran
Staff Research Engineer - Big Data Expert IT R&D Division
HUAWEI TECHNOLOGIES Duesseldorf GmbH
German Research Center
Munich Office

-----Original Message-----
From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Tuesday, July 25, 2017 4:22 PM
To: dev@flink.apache.org
Subject: [DISCUSS] Table API / SQL internal timestamp handling

Hi everybody,

I'd like to propose and discuss some changes in the way the Table API / SQL internally handles timestamps.

The Table API is implemented on top of the DataStream API. The DataStream API hides timestamps from users in order to ensure that timestamps and watermarks are aligned. Instead, users assign timestamps and watermarks once (usually at the source or in a subsequent operator) and let the system handle the timestamps from there on. Timestamps are stored in the timestamp field of the StreamRecord, which is a holder for the user record and the timestamp.
DataStream operators that depend on time (time-windows, process function, ...) access the timestamp from the StreamRecord.

In contrast to the DataStream API, the Table API and SQL are aware of the semantics of a query, i.e., we can analyze how users access timestamps and whether they are modified or not. Another difference is that the timestamp must be part of the schema of a table in order to have correct query semantics.

The current design to handle timestamps is as follows. The Table API stores timestamps in the timestamp field of the StreamRecord. Therefore, timestamps are detached from the remaining data, which is stored in Row objects. Hence, the physical representation of a row is different from its logical representation. We introduced a translation layer (RowSchema) to convert the logical schema into the physical schema.
This is necessary for serialization or code generation when the logical plan is translated into a physical execution plan. Processing-time timestamps are handled similarly: they are not included in the physical schema and are looked up when needed. This design also requires that we materialize timestamps when they are accessed by expressions. Timestamp materialization is done as a pre-optimization step.

While thinking about the implementation of the event-time windowed stream-stream join [1], I stumbled over the question of which timestamp of the two input tables to forward. With the current design, we could only have a single timestamp, so keeping both timestamps would not be possible. The choice of the timestamp would need to be specified by the query; otherwise it would lack clear semantics. When executing the join, the join operator would need to make sure that no late data is emitted.
This would only work if the operator were able to hold back watermarks [2].

With this information in mind, I'd like to discuss the following proposal:

- We allow more than one event-time timestamp and store them directly in the Row.
- The query operators ensure that the watermarks are always behind all event-time timestamps. With additional analysis we will be able to restrict this to timestamps that are actually used as such.
- When a DataStream operator is time-based (e.g., DataStream time-windows), we inject an operator that copies the timestamp from the Row into the StreamRecord.
- We try to remove the distinction between logical and physical schema. For event-time timestamps this is possible because we store them in the Row object; for processing-time timestamps, we add a dummy byte field. When accessing a field of this type, the code generator injects the code to fetch the timestamps.
- We might be able to get around the pre-optimization time materialization step.
- A join result would be able to keep both timestamps. The watermark would be held back for both, so both could be used in subsequent operations.

I admit I haven't thought this completely through. However, the benefits of this design from my point of view are:
- encoding of timestamps in Rows means that the logical schema is equal to the physical schema
- no timestamp materialization
- support for multiple timestamps. Otherwise we would need to expose internal restrictions to the user, which are hard to explain / communicate.
- no need to change any public interfaces at the moment.

The drawbacks as far as I see them are:
- additional payload due to unused timestamp fields + possibly the processing-time dummy field
- complete rework of the internal timestamp logic (again...)
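The core mechanics of the proposal above (several event-time fields kept inside the Row, an injected step that copies one of them into the StreamRecord for time-based DataStream operators, and watermarks held behind every event-time value still to be emitted) can be sketched in a few lines. This is a plain-Python model under stated assumptions, not Table API code: `StreamRecord` here is a hypothetical two-field stand-in, Rows are tuples, and `join_rows`, `copy_rowtime_to_record`, and `held_back_watermark` are illustrative names. It also assumes the watermark convention described in Flink's documentation, i.e., a watermark t means no more records with timestamp <= t will follow.

```python
from dataclasses import dataclass
from typing import Any, Iterable, Optional, Tuple

# Hypothetical stand-ins, not Flink classes: a Row is a plain tuple and a
# StreamRecord pairs a Row with one optional timestamp slot.
Row = Tuple[Any, ...]


@dataclass(frozen=True)
class StreamRecord:
    value: Row
    timestamp: Optional[int] = None


def join_rows(left: Row, right: Row) -> Row:
    # The join result concatenates both inputs' fields, so each input's
    # event-time field is kept inside the Row itself.
    return left + right


def copy_rowtime_to_record(record: StreamRecord, rowtime_index: int) -> StreamRecord:
    # Injected in front of a time-based DataStream operator: copy the chosen
    # event-time field of the Row into the record's timestamp slot.
    return StreamRecord(record.value, record.value[rowtime_index])


def held_back_watermark(input_watermark: int, pending_rowtimes: Iterable[int]) -> int:
    # A watermark t promises no further records with timestamp <= t, so the
    # forwarded watermark must stay strictly below every event-time value
    # the operator may still emit.
    return min([input_watermark, *(ts - 1 for ts in pending_rowtimes)])


left = ("alice", 1_000)   # (name, l.rowtime)
right = ("alice", 1_400)  # (name, r.rowtime)
joined = join_rows(left, right)
record = copy_rowtime_to_record(StreamRecord(joined), rowtime_index=3)
print(joined, record.timestamp)                    # ('alice', 1000, 'alice', 1400) 1400
print(held_back_watermark(2_000, [1_000, 1_400]))  # 999
```

Because both rowtimes stay in the joined Row, a later operator can pick either one (index 1 or 3 here) without the join deciding for it; the cost is the extra payload listed among the drawbacks.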
Please let me know what you think,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-6233
[2] https://issues.apache.org/jira/browse/FLINK-7245