Hi,

I think this is an interesting discussion and I would like to add some issues 
and give some feedback.

- For supporting the join we do not only need to think of the time but also on 
the null values. For example if you have a LEFT (or RIGHT) JOIN between items 
of 2 input streams, and the secondary input is not available you should still 
emit Row.of(event1, null)...as far as I know if you need to 
serialize/deserialize null values to send them they do not work. So we should 
include this scenario in the discussions
-If we will have multiple timestamp in an (output) event, one question is how 
to select afterwards which is the primary time field on which to operate. When 
we describe a query we might be able to specify (or we get this implicitly if 
we implement the carryon of the 2 timestamps)  Select T1.rowtime, T2.rowtime 
...but if the output of a query is the input of a new processing pipeline, 
then, do we support generally also that the input has 2 time fields? ...how do 
we deal with the 2 input fields (maybe I am missing something) further in the 
datastream pipeline that we build based on the output?
- For the case of proctime - do we need to carry 2 proctimes (the proctimes of 
the incoming events from each stream), or 1 proctime (as we operate on proctime 
and the combination of the 2 inputs can be considered as a new event, the 
current proctime on the machine can be considered the (proc)time reference for 
output event) or 3 proctimes (the 2 proctimes of the input plus the proctime 
when the new event was created)?
-Similar with the point above, for even time (which I am understanding as the 
time when the event was created...or do we understand them as a time carry 
within the event?) - when we join 2 events and output an event that is the 
result of the join - isn't this a new event detach from the source\input 
events? ... I would tend to say it is a new event and then as for proctime the 
event time of the new event is the current time when this output event was 
created. If we would accept this hypothesis then we would not need the 2 time 
input fields to be carried/managed implicitly.  If someone needs further down 
the computation pipeline, then in the query they would be selected explicitly 
from the input stream and projected in some fields to be carried (Select 
T1.rowtime as FormerTime1, T2.rowtime as FormerTime2, .... JOIN T1, 
T2...)...but they would not have the timestamp logic

..my 2 cents




Dr. Radu Tudoran
Staff Research Engineer - Big Data Expert
IT R&D Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
German Research Center
Munich Office
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Qiuen Peng, Shengli Wang
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang 
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!

-----Original Message-----
From: Fabian Hueske [mailto:fhue...@gmail.com] 
Sent: Tuesday, July 25, 2017 4:22 PM
To: dev@flink.apache.org
Subject: [DISCUSS] Table API / SQL internal timestamp handling

Hi everybody,

I'd like to propose and discuss some changes in the way how the Table API / SQL 
internally handles timestamps.

The Table API is implemented on top of the DataStream API. The DataStream API 
hides timestamps from users in order to ensure that timestamps and watermarks 
are aligned. Instead users assign timestamps and watermarks once (usually at 
the source or in a subsequent operator) and let the system handle the 
timestamps from there on. Timestamps are stored in the timestamp field of the 
StreamRecord which is a holder for the user record and the timestamp. 
DataStream operators that depend on time (time-windows, process function, ...) 
access the timestamp from the StreamRecord.

In contrast to the DataSteam API, the Table API and SQL are aware of the 
semantics of a query. I.e., we can analyze how users access timestamps and 
whether they are modified or not. Another difference is that the timestamp must 
be part of the schema of a table in order to have correct query semantics.

The current design to handle timestamps is as follows. The Table API stores 
timestamps in the timestamp field of the StreamRecord. Therefore, timestamps 
are detached from the remaining data which is stored in Row objects. Hence, the 
physical representation of a row is different from its logical representation. 
We introduced a translation layer (RowSchema) to convert logical schema into 
physical schema. This is necessery for serialization or code generation when 
the logical plan is translated into a physical execution plan. Processing-time 
timestamps are similarly handled.
They are not included in the physical schema and looked up when needed.
This design also requires that we need to materialize timestamps when they are 
accessed by expressions. Timestamp materialization is done as a 
pre-optimization step.

While thinking about the implementation of the event-time windowed 
stream-stream join [1] I stumbled over the question which timestamp of both 
input tables to forward. With the current design, we could only have a single 
timestamp, so keeping both timestamps would not be possible. The choice of the 
timestamp would need to be specified by the query otherwise it would lack clear 
semantics. When executing the join, the join operator would need to make sure 
that no late data is emitted. This would only work the operator was able to 
hold back watermarks [2].

With this information in mind, I'd like to discuss the following proposal:

- We allow more than one event-time timestamp and store them directly in the Row
- The query operators ensure that the watermarks are always behind all 
event-time timestamps. With additional analysis we will be able to restrict 
this to timestamps that are actually used as such.
- When a DataStream operator is time-based (e.g., a DataStream time-windows), 
we inject an operator that copies the timestamp from the Row into the 
StreamRecord.
- We try to remove the distinction between logical and physical schema. For 
event-time timestamps this is because we store them in the Row object, for 
processing-time timestamps, we add a dummy byte field. When accessing a field 
of this type, the code generator injects the code to fetch the timestamps.
- We might be able to get around the pre-optimization time materialization step.
- A join result would be able to keep both timestamps. The watermark would be 
hold back for both so both could be used in subsequent operations.

I admit, I haven't thought this completely through.
However, the benefits of this design from my point of view are:
- encoding of timestamps in Rows means that the logical schema is equal to the 
physical schema
- no timestamp materialization
- support for multiple timestamps. Otherwise we would need to expose internal 
restrictions to the user which are hard to explain / communicate.
- no need to change any public interfaces at the moment.

The drawbacks as far as I see them are:
- additional payload due to unused timestamp field + possibly the 
processing-time dummy field
- complete rework of the internal timestamp logic (again...)

Please let me know what you think,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-6233
[2] https://issues.apache.org/jira/browse/FLINK-7245

Reply via email to