dawidwys commented on a change in pull request #14152:
URL: https://github.com/apache/flink/pull/14152#discussion_r528911851
##########
File path: docs/dev/table/streaming/temporal_tables.md
##########
@@ -20,103 +20,255 @@ software distributed under the License is distributed on an
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
--->
+-->
-Temporal Tables represent a concept of a (parameterized) view on a changing table that returns the content of a table at a specific point in time.
+A temporal table is a table that evolves over time, also known as a Flink [dynamic table](dynamic_tables.html). Rows in a temporal table are associated with one or more temporal periods, and all Flink tables are temporal (dynamic) tables.
-The changing table can either be a changing history table which tracks the changes (e.g. database changelogs) or a changing dimension table which materializes the changes (e.g. database tables).
+A temporal table contains one or more versioned table snapshots. It can be a changing history table that tracks the changes (e.g. a database changelog, which contains all snapshots) or a changing dimension table that materializes the changes (e.g. a database table, which contains only the latest snapshot).
-For the changing history table, Flink can keep track of the changes and allows for accessing the content of the table at a certain point in time within a query. In Flink, this kind of table is represented by a *Temporal Table Function*.
+**Version**: A temporal table can be split into a set of versioned table snapshots. The version of a table snapshot represents the valid lifetime of its rows; the start time and end time of the valid period can be assigned by users.
+Depending on whether a temporal table can track its history versions, it is either a `versioned table` or a `regular table`.
-For the changing dimension table, Flink allows for accessing the content of the table at processing time within a query. In Flink, this kind of table is represented by a *Temporal Table*.
+**Versioned table**: If the rows in a temporal table can track their history changes and their history versions can be accessed, we call this kind of temporal table a versioned table. A table that comes from a database changelog can be defined as a versioned table.
+
+**Regular table**: If the rows in a temporal table can only track and access their latest version, we call this kind of temporal table a regular table. A table that comes from a database or HBase can be defined as a regular table.
* This will be replaced by the TOC
{:toc}
Motivation
----------
-### Correlate with a changing history table
+### Correlate with a versioned table
+Consider a scenario in which an order stream is correlated with a product dimension table: the table `orders` comes from Kafka and contains real-time orders, while the table `product_changelog` comes from the changelog of the database table `products`, in which the product price changes over time.
-Let's assume that we have the following table `RatesHistory`.
+{% highlight sql %}
+SELECT * FROM product_changelog;
+
+(changelog kind)  update_time  product_id  product_name  price
+================  ===========  ==========  ============  =====
++(INSERT)         00:01:00     p_001       scooter       11.11
++(INSERT)         00:02:00     p_002       basketball    23.11
+-(UPDATE_BEFORE)  12:00:00     p_001       scooter       11.11
++(UPDATE_AFTER)   12:00:00     p_001       scooter       12.99
+-(UPDATE_BEFORE)  12:00:00     p_002       basketball    23.11
++(UPDATE_AFTER)   12:00:00     p_002       basketball    19.99
+-(DELETE)         18:00:00     p_001       scooter       12.99
+{% endhighlight %}
+The table `product_changelog` represents an ever-growing changelog of the database table `products`. For example, the initial price of the product `scooter` is `11.11` at `00:01:00`, the price increases to `12.99` at `12:00:00`, and the product is deleted from the table `products` at `18:00:00`.
+
+Suppose we would like to output the version of the `product_changelog` table at time `10:00:00`; the following table shows the result.
{% highlight sql %}
-SELECT * FROM RatesHistory;
+update_time  product_id  product_name  price
+===========  ==========  ============  =====
+00:01:00     p_001       scooter       11.11
+00:02:00     p_002       basketball    23.11
+{% endhighlight %}
-rowtime currency rate
-======= ======== ======
-09:00 US Dollar 102
-09:00 Euro 114
-09:00 Yen 1
-10:45 Euro 116
-11:15 Euro 119
-11:49 Pounds 108
+Suppose we would like to output the version of the `product_changelog` table at time `13:00:00`; the following table shows the result.
+{% highlight sql %}
+update_time  product_id  product_name  price
+===========  ==========  ============  =====
+12:00:00     p_001       scooter       12.99
+12:00:00     p_002       basketball    19.99
{% endhighlight %}
-`RatesHistory` represents an ever growing append-only table of currency exchange rates with respect to `Yen` (which has a rate of `1`).
-For example, the exchange rate for the period from `09:00` to `10:45` of `Euro` to `Yen` was `114`. From `10:45` to `11:15` it was `116`.
+In the above example, a specific version of the table is tracked by `update_time` and `product_id`: `product_id` would be the primary key of the `product_changelog` table and `update_time` would be the event time.
+
+In Flink, this is represented by a [*versioned table*](#defining-versioned-table).
+
+### Correlate with a regular table
-Given that we would like to output all current rates at the time `10:58`, we would need the following SQL query to compute a result table:
+On the other hand, some use cases require joining a regular table, which is an external database table.
+Let's assume that `LatestRates` is a table (e.g. stored in HBase) that is materialized with the latest rates. `LatestRates` always represents the latest content of the HBase table `rates`.
+
+When we query it at time `10:15:00`, the content of the `LatestRates` table is:
{% highlight sql %}
-SELECT *
-FROM RatesHistory AS r
-WHERE r.rowtime = (
- SELECT MAX(rowtime)
- FROM RatesHistory AS r2
- WHERE r2.currency = r.currency
- AND r2.rowtime <= TIME '10:58');
+10:15:00 > SELECT * FROM LatestRates;
+
+currency   rate
+=========  ====
+US Dollar  102
+Euro       114
+Yen        1
+{% endhighlight %}
+
+When we query it again at time `11:00:00`, the content of the `LatestRates` table is:
+{% highlight sql %}
+11:00:00 > SELECT * FROM LatestRates;
+
+currency   rate
+=========  ====
+US Dollar  102
+Euro       116
+Yen        1
{% endhighlight %}
-The correlated subquery determines the maximum time for the corresponding currency that is lower or equal than the desired time. The outer query lists the rates that have a maximum timestamp.
+In Flink, this is represented by a [*regular table*](#defining-regular-table).
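For illustration, a processing-time temporal join (lookup join) against `LatestRates` might look like the sketch below. It rests on assumptions not in the diff: a hypothetical `Orders` table with columns `order_id`, `currency`, `amount` and a processing-time attribute `proc_time` (e.g. declared as `proc_time AS PROCTIME()`), and a `LatestRates` table registered through a connector that supports lookups and exposes flat `currency` and `rate` columns.

```sql
-- Hypothetical tables (assumptions, not defined in this PR):
--   Orders(order_id, currency, amount, proc_time AS PROCTIME())
--   LatestRates(currency, rate), backed by a lookup-capable connector
SELECT
  o.order_id,
  o.currency,
  o.amount,
  r.rate,
  o.amount * r.rate AS converted_amount
FROM Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proc_time AS r
  ON o.currency = r.currency;
```

Each order is joined with whatever `LatestRates` holds when the order is processed: with the contents shown above, a Euro order processed at `10:15:00` would see `114`, and one processed at `11:00:00` would see `116`.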
+
+Temporal Table
+--------------
+<span class="label label-danger">Attention</span> This is only supported in the Blink planner.
-The following table shows the result of such a computation. In our example, the update to `Euro` at `10:45` is taken into account, however, the update to `Euro` at `11:15` and the new entry of `Pounds` are not considered in the table's version at time `10:58`.
+Flink uses a primary key constraint and event time to define both versioned tables and versioned views.
-{% highlight text %}
-rowtime currency rate
-======= ======== ======
-09:00 US Dollar 102
-09:00 Yen 1
-10:45 Euro 116
+### Defining Versioned Table
+In Flink, a table is a versioned table only if it contains a primary key constraint and an event time attribute.
+{% highlight sql %}
+-- Define a versioned table
+CREATE TABLE product_changelog (
+ product_id STRING,
+ product_name STRING,
+ product_price DECIMAL(10, 4),
+ update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
+ PRIMARY KEY(product_id) NOT ENFORCED, -- (1) defines the primary key constraint
+ WATERMARK FOR update_time AS update_time -- (2) defines the event time by watermark
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'products',
+ 'scan.startup.mode' = 'earliest-offset',
+ 'properties.bootstrap.servers' = 'localhost:9092',
+ 'value.format' = 'debezium-json'
+);
{% endhighlight %}
-The concept of *Temporal Tables* aims to simplify such queries, speed up their execution, and reduce Flink's state usage. A *Temporal Table* is a parameterized view on an append-only table that interprets the rows of the append-only table as the changelog of a table and provides the version of that table at a specific point in time. Interpreting the append-only table as a changelog requires the specification of a primary key attribute and a timestamp attribute. The primary key determines which rows are overwritten and the timestamp determines the time during which a row is valid.
+Line `(1)` defines the primary key constraint for the table `product_changelog`, and line `(2)` defines `update_time` as the event time of the table `product_changelog`;
+thus the table `product_changelog` is a versioned table.
-In the above example `currency` would be a primary key for `RatesHistory` table and `rowtime` would be the timestamp attribute.
+**Note**: The syntax `METADATA FROM 'value.source.timestamp' VIRTUAL` extracts the execution time of the database operation for every changelog entry.
+It is strongly recommended to define the database operation execution time as the event time, rather than the ingestion time or a time field in the record;
+otherwise, the versions extracted from the changelog may not match the versions in the database.
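As a sketch of how such a versioned table is typically consumed, an event-time temporal join can look up, for each order, the product version that was valid at the order's event time. Only `product_changelog` comes from the DDL above; the `orders` table and its columns `order_id`, `product_id`, `order_time` (with a watermark on `order_time`) are hypothetical.

```sql
-- Hypothetical left side (assumption, not defined in this PR):
--   orders(order_id STRING, product_id STRING, order_time TIMESTAMP(3),
--          WATERMARK FOR order_time AS order_time)
-- product_changelog is the versioned table defined above.
SELECT
  o.order_id,
  o.order_time,
  p.product_name,
  p.product_price
FROM orders AS o
  LEFT JOIN product_changelog FOR SYSTEM_TIME AS OF o.order_time AS p
  ON o.product_id = p.product_id;
```

With the changelog shown earlier, an order for `p_001` at `10:00:00` would be matched with the price `11.11` and one at `13:00:00` with `12.99`; because of the `LEFT JOIN`, an order placed after the deletion at `18:00:00` would still be emitted, with `NULL` product columns.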
+
+### Defining Versioned View
-In Flink, this is represented by a [*Temporal Table Function*](#temporal-table-function).
+Flink also supports defining a versioned view, provided that the view contains a unique key constraint and an event time attribute.
+
+Let’s assume that we have the following table `RatesHistory`:
+{% highlight sql %}
+-- Define an append-only table
+CREATE TABLE RatesHistory (
+ currency_time TIMESTAMP(3),
+ currency STRING,
+ rate DECIMAL(38, 10),
+ WATERMARK FOR currency_time AS currency_time -- defines the event time
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'rates',
+ 'scan.startup.mode' = 'earliest-offset',
+ 'properties.bootstrap.servers' = 'localhost:9092',
+ 'format' = 'json' -- this is an append-only source
+);
+{% endhighlight %}
-### Correlate with a changing dimension table
+The table `RatesHistory` represents an ever-growing append-only table of currency exchange rates with respect to
+Yen (which has a rate of 1). For example, the exchange rate of Euro to Yen was 114 for the period from 09:00 to 10:45,
+and 116 from 10:45 to 11:15.
+
+{% highlight sql %}
+SELECT * FROM RatesHistory;
-On the other hand, some use cases require to join a changing dimension table which is an external database table.
+currency_time  currency   rate
+=============  =========  ====
+09:00:00       US Dollar  102
+09:00:00       Euro       114
+09:00:00       Yen        1
+10:45:00       Euro       116
+11:15:00       Euro       119
+11:49:00       Pounds     108
+{% endhighlight %}
-Let's assume that `LatestRates` is a table (e.g. stored in HBase) which is materialized with the latest rate. The `LatestRates` is the materialized history `RatesHistory`. Then the content of `LatestRates` table at time `10:58` will be:
+To obtain a versioned table from `RatesHistory`, Flink supports defining a versioned view
+by a [deduplication query]({{ site.baseurl }}/dev/table/sql/queries.html#deduplication), which produces an ordered changelog
+stream with an inferred primary key (`currency`) and event time (`currency_time`).
-{% highlight text %}
-10:58> SELECT * FROM LatestRates;
-currency rate
-======== ======
-US Dollar 102
-Yen 1
-Euro 116
+{% highlight sql %}
+-- Define a versioned view
+CREATE VIEW versioned_rates AS
+SELECT currency, rate, currency_time -- (1) `currency_time` keeps the event time
+ FROM (
+ SELECT *,
+ ROW_NUMBER() OVER (PARTITION BY currency -- (2) the inferred unique key `currency` can be a primary key
+ ORDER BY currency_time DESC) AS rowNum
+ FROM rates )
Review comment:
Make sure the examples are "runnable". We should use the `RatesHistory` table here.
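A sketch of the view rewritten along these lines, reading from the `RatesHistory` table defined earlier in the diff instead of the undefined `rates`. The closing `WHERE rowNum = 1` filter is cut off in the quoted hunk and is assumed here, since keeping only the latest row per `currency` is what the deduplication query relies on. The join at the end uses a hypothetical `orders` table with `order_id`, `currency`, `amount` and an event-time attribute `order_time`.

```sql
-- Versioned view over the append-only RatesHistory table:
-- keep only the latest row per currency, ordered by event time.
CREATE VIEW versioned_rates AS
SELECT currency, rate, currency_time -- (1) `currency_time` keeps the event time
  FROM (
      SELECT *,
        ROW_NUMBER() OVER (PARTITION BY currency -- (2) inferred unique key `currency`
          ORDER BY currency_time DESC) AS rowNum
      FROM RatesHistory )
WHERE rowNum = 1; -- assumed filter: keep the last row per currency

-- Hypothetical usage: look up the rate that was valid at each order's event time.
SELECT o.order_id, o.order_time, o.amount * r.rate AS converted_amount
FROM orders AS o
  JOIN versioned_rates FOR SYSTEM_TIME AS OF o.order_time AS r
  ON o.currency = r.currency;
```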
----------------------------------------------------------------
This is an automated message from the Apache Git Service.