Hi everybody,

I'd like to propose and discuss some changes to the way the Table API /
SQL internally handles timestamps.

The Table API is implemented on top of the DataStream API. The DataStream
API hides timestamps from users in order to ensure that timestamps and
watermarks are aligned. Instead, users assign timestamps and watermarks
once (usually at the source or in a subsequent operator) and let the
system handle the timestamps from there on. Timestamps are stored in the
timestamp field of the StreamRecord, which holds the user record and the
timestamp. DataStream operators that depend on time (time windows, process
functions, ...) access the timestamp from the StreamRecord.
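
For reference, this is roughly how timestamps and watermarks are assigned
in the DataStream API today (a minimal sketch; MyEvent and its eventTime
field are made up for illustration):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;

    DataStream<MyEvent> events = ...; // hypothetical input stream

    DataStream<MyEvent> withTimestamps = events.assignTimestampsAndWatermarks(
        // allow events to arrive up to 5 seconds out of order
        new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(5)) {
            @Override
            public long extractTimestamp(MyEvent event) {
                // this value ends up in the StreamRecord's timestamp field
                return event.eventTime;
            }
        });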

In contrast to the DataStream API, the Table API and SQL are aware of the
semantics of a query, i.e., we can analyze how users access timestamps and
whether they are modified or not. Another difference is that the timestamp
must be part of the schema of a table in order to have correct query
semantics.

The current design to handle timestamps is as follows. The Table API
stores timestamps in the timestamp field of the StreamRecord. Therefore,
timestamps are detached from the remaining data, which is stored in Row
objects. Hence, the physical representation of a row differs from its
logical representation. We introduced a translation layer (RowSchema) to
convert the logical schema into the physical schema. This is necessary for
serialization and code generation when the logical plan is translated into
a physical execution plan. Processing-time timestamps are handled
similarly. They are not included in the physical schema and are looked up
when needed. This design also requires that we materialize timestamps when
they are accessed by expressions. Timestamp materialization is done as a
pre-optimization step.
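
To make the split concrete: a time-based function reads the timestamp from
the StreamRecord (via its context), while the Row only carries the data
fields of the logical schema. A minimal sketch with a ProcessFunction:

    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.Collector;

    public class TimestampAccess extends ProcessFunction<Row, Row> {
        @Override
        public void processElement(Row row, Context ctx, Collector<Row> out) {
            // the event-time timestamp lives in the StreamRecord, not in the Row
            Long timestamp = ctx.timestamp();
            // the Row holds only the remaining data fields
            out.collect(row);
        }
    }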

While thinking about the implementation of the event-time windowed
stream-stream join [1], I stumbled over the question of which timestamp of
the two input tables to forward. With the current design, we could only
have a single timestamp, so keeping both timestamps would not be possible.
The choice of timestamp would need to be specified by the query; otherwise,
the join would lack clear semantics. When executing the join, the join
operator would need to make sure that no late data is emitted. This would
only work if the operator were able to hold back watermarks [2].
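
To illustrate what holding back watermarks could mean: a custom operator
could delay the watermark by the join's time bound (a sketch under the
assumption that the bound is known, not the actual design of FLINK-7245):

    import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    import org.apache.flink.types.Row;

    public class HoldBackWatermarkOperator extends AbstractStreamOperator<Row>
            implements OneInputStreamOperator<Row, Row> {

        private final long bound; // hypothetical time bound of the join

        public HoldBackWatermarkOperator(long bound) {
            this.bound = bound;
        }

        @Override
        public void processElement(StreamRecord<Row> element) throws Exception {
            output.collect(element); // records pass through unchanged
        }

        @Override
        public void processWatermark(Watermark mark) throws Exception {
            // emit a watermark that lags behind the input watermark,
            // so rows that may still produce join results are not late
            output.emitWatermark(new Watermark(mark.getTimestamp() - bound));
        }
    }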

With this information in mind, I'd like to discuss the following proposal:

- We allow more than one event-time timestamp and store them directly in
the Row.
- The query operators ensure that the watermarks are always behind all
event-time timestamps. With additional analysis, we will be able to
restrict this to timestamps that are actually used as such.
- When a DataStream operator is time-based (e.g., DataStream time
windows), we inject an operator that copies the timestamp from the Row
into the StreamRecord (see the sketch after this list).
- We try to remove the distinction between logical and physical schema.
For event-time timestamps this works because we store them in the Row
object; for processing-time timestamps, we add a dummy byte field. When a
field of this type is accessed, the code generator injects the code to
fetch the timestamp.
- We might be able to get around the pre-optimization timestamp
materialization step.
- A join result would be able to keep both timestamps. The watermark
would be held back for both, so both could be used in subsequent
operations.
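
A rough sketch of the injected operator mentioned above (the class name is
made up, and we assume the event-time field is stored as a long at a known
position in the Row):

    import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    import org.apache.flink.types.Row;

    public class RowTimestampCopier extends AbstractStreamOperator<Row>
            implements OneInputStreamOperator<Row, Row> {

        private final int timestampField; // index of the event-time field

        public RowTimestampCopier(int timestampField) {
            this.timestampField = timestampField;
        }

        @Override
        public void processElement(StreamRecord<Row> element) throws Exception {
            Row row = element.getValue();
            long ts = (Long) row.getField(timestampField);
            // re-emit the record with the timestamp set on the StreamRecord
            output.collect(element.replace(row, ts));
        }
    }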

I admit, I haven't thought this completely through.
However, the benefits of this design from my point of view are:
- encoding of timestamps in Rows means that the logical schema is equal to
the physical schema
- no timestamp materialization
- support for multiple timestamps. Otherwise, we would need to expose
internal restrictions to the user that are hard to explain / communicate.
- no need to change any public interfaces at the moment.

The drawbacks as far as I see them are:
- additional payload due to unused timestamp field + possibly the
processing-time dummy field
- complete rework of the internal timestamp logic (again...)

Please let me know what you think,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-6233
[2] https://issues.apache.org/jira/browse/FLINK-7245
