alpinegizmo commented on a change in pull request #14292:
URL: https://github.com/apache/flink/pull/14292#discussion_r538223361
##########
File path: docs/dev/table/streaming/temporal_tables.md
##########
@@ -22,30 +22,29 @@ specific language governing permissions and limitations
under the License.
-->
-A Temporal table is a table that evolves over time - otherwise known in Flink
as a [dynamic table]({% link dev/table/streaming/dynamic_tables.md %}). Rows in
a temporal table are associated with one or more temporal periods and all Flink
tables are temporal(dynamic).
+Flink SQL operates over dynamic tables that evolve, which may either be
append-only or updating.
+Versioned tables represent a special type of updating table that remembers the
past values for each key.
-A temporal table contains one or more versioned table snapshots, it can be a
changing history table which tracks the changes(e.g. database changelog,
contains all snapshots) or a changing dimensioned table which materializes the
changes(e.g. database table, contains the latest snapshot).
+<span class="label label-danger">Attention</span> The Legacy planner does not
support versioned tables.
-**Version**: A temporal table can split into a set of versioned table
snapshots, the version in table snapshots represents the valid life circle of
rows, the start time and the end time of the valid period can be assigned by
users.
-Temporal table can split to `versioned table` and `regular table` according to
the table can tracks its history version or not.
+* This will be replaced by the TOC
+{:toc}
-**Versioned table**: If the rows a in temporal table can track its history
changes and visit its history versions, we call this a versioned table. Tables
that comes from a database changelog can be defined as a versioned table.
+## Concept
-**Regular table**: If the row in temporal table can only track and visit its
latest version,we call this kind of temporal table as regular table. Tables
that comes from a database or HBase can be defined as a regular table.
+Dynamic tables define relations over time.
+Often, particularly when working with metadata, a key's old value does not
become irrelevant when it changes.
-* This will be replaced by the TOC
-{:toc}
+Flink SQL can define versioned tables over any dynamic table with a `PRIMARY
KEY` constraint and time attribute.
-Motivation
-----------
+A primary key constraint in Flink means that a column or set of columns of a
table or view are unique and non-null.
+The primary key semantic on a upserting table means the materialized changes
for a particular key (`INSERT`/`UPDATE`/`DELETE`) represent the changes to a
single row over time. The time attribute on a upserting table defines when each
change occurred.
-### Correlate with a versioned table
-Given a scenario the order stream correlates the dimension table product, the
table `orders` comes from kafka which contains the real time orders, the table
`product_changelog` comes from the changelog of the database table `products`,
- the product price in table `products` is changing over time.
+Taken together, Flink can track the changes to a row over time and maintain
the period for which each value was valid for that key.
-{% highlight sql %}
-SELECT * FROM product_changelog;
+Suppose a table that tracks the price of different products in a store.
Review comment:
```suggestion
Suppose a table tracks the prices of different products in a store.
```
##########
File path: docs/dev/table/streaming/temporal_tables.md
##########
@@ -57,134 +56,68 @@ SELECT * FROM product_changelog;
-(DELETE) 18:00:00 p_001 scooter 12.99
{% endhighlight %}
-The table `product_changelog` represents an ever growing changelog of database
table `products`, for example, the initial price of product `scooter` is
`11.11` at `00:01:00`, and the price increases to `12.99` at `12:00:00`,
- the product item is deleted from the table `products` at `18:00:00`.
+Given this set of changes, we track how the price of a scooter changes over
time.
+It is initially $11.11 at `00:01:00` when added to the catalog.
+The price then rises to $12.99 at `12:00:00` before being deleted from the
catalog at `18:00:00`.
-Given that we would like to output the version of `product_changelog` table of
the time `10:00:00`, the following table shows the result.
-{% highlight sql %}
-update_time product_id product_name price
-=========== ========== ============ =====
-00:01:00 p_001 scooter 11.11
-00:02:00 p_002 basketball 23.11
-{% endhighlight %}
+If we queried the table for various products' prices at different times, we
would retrieve different results. At `10:00:00` the table would show one set of
prices.
-Given that we would like to output the version of `product_changelog` table of
the time `13:00:00`, the following table shows the result.
{% highlight sql %}
update_time product_id product_name price
=========== ========== ============ =====
12:00:00 p_001 scooter 12.99
12:00:00 p_002 basketball 19.99
Review comment:
```suggestion
00:01:00 p_001 scooter 11.11
00:02:00 p_002 basketball 23.11
```
##########
File path: docs/dev/table/streaming/temporal_tables.md
##########
@@ -57,134 +56,68 @@ SELECT * FROM product_changelog;
-(DELETE) 18:00:00 p_001 scooter 12.99
{% endhighlight %}
-The table `product_changelog` represents an ever growing changelog of database
table `products`, for example, the initial price of product `scooter` is
`11.11` at `00:01:00`, and the price increases to `12.99` at `12:00:00`,
- the product item is deleted from the table `products` at `18:00:00`.
+Given this set of changes, we track how the price of a scooter changes over
time.
+It is initially $11.11 at `00:01:00` when added to the catalog.
+The price then rises to $12.99 at `12:00:00` before being deleted from the
catalog at `18:00:00`.
-Given that we would like to output the version of `product_changelog` table of
the time `10:00:00`, the following table shows the result.
-{% highlight sql %}
-update_time product_id product_name price
-=========== ========== ============ =====
-00:01:00 p_001 scooter 11.11
-00:02:00 p_002 basketball 23.11
-{% endhighlight %}
+If we queried the table for various products' prices at different times, we
would retrieve different results. At `10:00:00` the table would show one set of
prices.
-Given that we would like to output the version of `product_changelog` table of
the time `13:00:00`, the following table shows the result.
{% highlight sql %}
update_time product_id product_name price
=========== ========== ============ =====
12:00:00 p_001 scooter 12.99
12:00:00 p_002 basketball 19.99
{% endhighlight %}
-In above example, the specific version of the table is tracked by
`update_time` and `product_id`, the `product_id` would be a primary key for
`product_changelog` table and `update_time` would be the event time.
-
-In Flink, this is represented by a [*versioned
table*](#defining-versioned-table).
-
-### Correlate with a regular table
-On the other hand, some use cases require to join a regular table which is an
external database table.
+While at `13:00:00,` we found find another.
-Let's assume that `LatestRates` is a table (e.g. stored in HBase) which is
materialized with the latest rates. The `LatestRates` always represents the
latest content of hbase table `rates`.
-
-Then the content of `LatestRates` table when we query at time `10:15:00` is:
{% highlight sql %}
-10:15:00 > SELECT * FROM LatestRates;
-
-currency rate
-========= ====
-US Dollar 102
-Euro 114
-Yen 1
-{% endhighlight %}
-
-Then the content of `LatestRates` table when we query at time `11:00:00` is:
-{% highlight sql %}
-11:00:00 > SELECT * FROM LatestRates;
-
-currency rate
-========= ====
-US Dollar 102
-Euro 116
-Yen 1
+update_time product_id product_name price
+=========== ========== ============ =====
+12:00:00 p_001 scooter 12.99
+12:00:00 p_002 basketball 19.99
{% endhighlight %}
-In Flink, this is represented by a [*regular Table*](#defining-regular-table).
-Temporal Table
---------------
-<span class="label label-danger">Attention</span> This is only supported in
Blink planner.
+## Versioned Table Sources
-Flink uses primary key constraint and event time to define both versioned
table and versioned view.
+Versioned tables are defined implicity for any table whose underlying source
or format directly define changelogs. Examples include the [upsert Kafka]({%
link dev/table/connectors/upsert-kafka.md %}) source as well as database
changelog formats such as [debezium]({% link
dev/table/connectors/formats/debezium.md %}) and [canal]({% link
dev/table/connectors/formats/canal.md %}). As discussed above, the only
additional requirement is the `CREATE` table statement must contain a primary
key and event-time attribute.
-### Defining Versioned Table
-The table is a versioned table in Flink only is the table contains primary key
constraint and event time.
{% highlight sql %}
--- Define a versioned table
-CREATE TABLE product_changelog (
- product_id STRING,
- product_name STRING,
- product_price DECIMAL(10, 4),
- update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
- PRIMARY KEY(product_id) NOT ENFORCED, -- (1) defines the primary key
constraint
- WATERMARK FOR update_time AS update_time -- (2) defines the event time by
watermark
-) WITH (
- 'connector' = 'kafka',
- 'topic' = 'products',
- 'scan.startup.mode' = 'earliest-offset',
- 'properties.bootstrap.servers' = 'localhost:9092',
- 'value.format' = 'debezium-json'
-);
+CREATE TABLE products (
+ product_id STRING,
+ product_name STRING,
+ price DECIMAL(32, 2),
+ update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp'
VIRTUAL,
+ PRIMARY KEY (product_id) NOT ENFORCED
+ WATERMARK FOR update_time AS update_time
+) WITH (...);
{% endhighlight %}
-Line `(1)` defines the primary key constraint for table `product_changelog`,
Line `(2)` defines the `update_time` as event time for table
`product_changelog`,
-thus table `product_changelog` is a versioned table.
+## Versioned Table Views
-**Note**: The grammar `METADATA FROM 'value.source.timestamp' VIRTUAL` means
extract the database
-operation execution time for every changelog, it's strongly recommended
defines the database operation execution time
-as event time rather than ingestion-time or time in the record, otherwise the
version extract from the changelog may
-mismatch with the version in database.
-
-### Defining Versioned View
+Flink also supports defining versioned views if the underlying query contains
a unique key constraint and event-time attribute. Image an append-only table of
currency rates.
Review comment:
```suggestion
Flink also supports defining versioned views if the underlying query
contains a unique key constraint and event-time attribute. Imagine an
append-only table of currency rates.
```
##########
File path: docs/dev/table/streaming/temporal_tables.md
##########
@@ -57,134 +56,68 @@ SELECT * FROM product_changelog;
-(DELETE) 18:00:00 p_001 scooter 12.99
{% endhighlight %}
-The table `product_changelog` represents an ever growing changelog of database
table `products`, for example, the initial price of product `scooter` is
`11.11` at `00:01:00`, and the price increases to `12.99` at `12:00:00`,
- the product item is deleted from the table `products` at `18:00:00`.
+Given this set of changes, we track how the price of a scooter changes over
time.
+It is initially $11.11 at `00:01:00` when added to the catalog.
+The price then rises to $12.99 at `12:00:00` before being deleted from the
catalog at `18:00:00`.
-Given that we would like to output the version of `product_changelog` table of
the time `10:00:00`, the following table shows the result.
-{% highlight sql %}
-update_time product_id product_name price
-=========== ========== ============ =====
-00:01:00 p_001 scooter 11.11
-00:02:00 p_002 basketball 23.11
-{% endhighlight %}
+If we queried the table for various products' prices at different times, we
would retrieve different results. At `10:00:00` the table would show one set of
prices.
-Given that we would like to output the version of `product_changelog` table of
the time `13:00:00`, the following table shows the result.
{% highlight sql %}
update_time product_id product_name price
=========== ========== ============ =====
12:00:00 p_001 scooter 12.99
12:00:00 p_002 basketball 19.99
{% endhighlight %}
-In above example, the specific version of the table is tracked by
`update_time` and `product_id`, the `product_id` would be a primary key for
`product_changelog` table and `update_time` would be the event time.
-
-In Flink, this is represented by a [*versioned
table*](#defining-versioned-table).
-
-### Correlate with a regular table
-On the other hand, some use cases require to join a regular table which is an
external database table.
+While at `13:00:00,` we found find another.
-Let's assume that `LatestRates` is a table (e.g. stored in HBase) which is
materialized with the latest rates. The `LatestRates` always represents the
latest content of hbase table `rates`.
-
-Then the content of `LatestRates` table when we query at time `10:15:00` is:
{% highlight sql %}
-10:15:00 > SELECT * FROM LatestRates;
-
-currency rate
-========= ====
-US Dollar 102
-Euro 114
-Yen 1
-{% endhighlight %}
-
-Then the content of `LatestRates` table when we query at time `11:00:00` is:
-{% highlight sql %}
-11:00:00 > SELECT * FROM LatestRates;
-
-currency rate
-========= ====
-US Dollar 102
-Euro 116
-Yen 1
+update_time product_id product_name price
+=========== ========== ============ =====
+12:00:00 p_001 scooter 12.99
+12:00:00 p_002 basketball 19.99
{% endhighlight %}
-In Flink, this is represented by a [*regular Table*](#defining-regular-table).
-Temporal Table
---------------
-<span class="label label-danger">Attention</span> This is only supported in
Blink planner.
+## Versioned Table Sources
-Flink uses primary key constraint and event time to define both versioned
table and versioned view.
+Versioned tables are defined implicity for any table whose underlying source
or format directly define changelogs. Examples include the [upsert Kafka]({%
link dev/table/connectors/upsert-kafka.md %}) source as well as database
changelog formats such as [debezium]({% link
dev/table/connectors/formats/debezium.md %}) and [canal]({% link
dev/table/connectors/formats/canal.md %}). As discussed above, the only
additional requirement is the `CREATE` table statement must contain a primary
key and event-time attribute.
Review comment:
```suggestion
Versioned tables are defined implicitly for any tables whose underlying
sources or formats directly define changelogs. Examples include the [upsert
Kafka]({% link dev/table/connectors/upsert-kafka.md %}) source as well as
database changelog formats such as [debezium]({% link
dev/table/connectors/formats/debezium.md %}) and [canal]({% link
dev/table/connectors/formats/canal.md %}). As discussed above, the only
additional requirement is the `CREATE` table statement must contain a primary
key and an event-time attribute.
```
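
For illustration, a complete statement that meets both requirements might look like the sketch below. The column list comes from the new `products` example in this hunk, and the `WITH` options are copied from the removed `product_changelog` example; they are assumptions about the intended setup, not part of the suggestion. Note that the primary key constraint and the watermark clause are separated by a comma.

```sql
-- Sketch only: connector options are taken from the removed example in this
-- hunk and may not match the final documentation.
CREATE TABLE products (
  product_id    STRING,
  product_name  STRING,
  price         DECIMAL(32, 2),
  -- event time extracted from the changelog metadata
  update_time   TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
  PRIMARY KEY (product_id) NOT ENFORCED,      -- primary key constraint
  WATERMARK FOR update_time AS update_time    -- event-time attribute
) WITH (
  'connector' = 'kafka',
  'topic' = 'products',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'localhost:9092',
  'value.format' = 'debezium-json'
);
```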
##########
File path: docs/dev/table/streaming/temporal_tables.md
##########
@@ -57,134 +56,68 @@ SELECT * FROM product_changelog;
-(DELETE) 18:00:00 p_001 scooter 12.99
{% endhighlight %}
-The table `product_changelog` represents an ever growing changelog of database
table `products`, for example, the initial price of product `scooter` is
`11.11` at `00:01:00`, and the price increases to `12.99` at `12:00:00`,
- the product item is deleted from the table `products` at `18:00:00`.
+Given this set of changes, we track how the price of a scooter changes over
time.
+It is initially $11.11 at `00:01:00` when added to the catalog.
+The price then rises to $12.99 at `12:00:00` before being deleted from the
catalog at `18:00:00`.
-Given that we would like to output the version of `product_changelog` table of
the time `10:00:00`, the following table shows the result.
-{% highlight sql %}
-update_time product_id product_name price
-=========== ========== ============ =====
-00:01:00 p_001 scooter 11.11
-00:02:00 p_002 basketball 23.11
-{% endhighlight %}
+If we queried the table for various products' prices at different times, we
would retrieve different results. At `10:00:00` the table would show one set of
prices.
-Given that we would like to output the version of `product_changelog` table of
the time `13:00:00`, the following table shows the result.
{% highlight sql %}
update_time product_id product_name price
=========== ========== ============ =====
12:00:00 p_001 scooter 12.99
12:00:00 p_002 basketball 19.99
{% endhighlight %}
-In above example, the specific version of the table is tracked by
`update_time` and `product_id`, the `product_id` would be a primary key for
`product_changelog` table and `update_time` would be the event time.
-
-In Flink, this is represented by a [*versioned
table*](#defining-versioned-table).
-
-### Correlate with a regular table
-On the other hand, some use cases require to join a regular table which is an
external database table.
+While at `13:00:00,` we found find another.
-Let's assume that `LatestRates` is a table (e.g. stored in HBase) which is
materialized with the latest rates. The `LatestRates` always represents the
latest content of hbase table `rates`.
-
-Then the content of `LatestRates` table when we query at time `10:15:00` is:
{% highlight sql %}
-10:15:00 > SELECT * FROM LatestRates;
-
-currency rate
-========= ====
-US Dollar 102
-Euro 114
-Yen 1
-{% endhighlight %}
-
-Then the content of `LatestRates` table when we query at time `11:00:00` is:
-{% highlight sql %}
-11:00:00 > SELECT * FROM LatestRates;
-
-currency rate
-========= ====
-US Dollar 102
-Euro 116
-Yen 1
+update_time product_id product_name price
+=========== ========== ============ =====
+12:00:00 p_001 scooter 12.99
+12:00:00 p_002 basketball 19.99
{% endhighlight %}
-In Flink, this is represented by a [*regular Table*](#defining-regular-table).
-Temporal Table
---------------
-<span class="label label-danger">Attention</span> This is only supported in
Blink planner.
+## Versioned Table Sources
-Flink uses primary key constraint and event time to define both versioned
table and versioned view.
+Versioned tables are defined implicity for any table whose underlying source
or format directly define changelogs. Examples include the [upsert Kafka]({%
link dev/table/connectors/upsert-kafka.md %}) source as well as database
changelog formats such as [debezium]({% link
dev/table/connectors/formats/debezium.md %}) and [canal]({% link
dev/table/connectors/formats/canal.md %}). As discussed above, the only
additional requirement is the `CREATE` table statement must contain a primary
key and event-time attribute.
-### Defining Versioned Table
-The table is a versioned table in Flink only is the table contains primary key
constraint and event time.
{% highlight sql %}
--- Define a versioned table
-CREATE TABLE product_changelog (
- product_id STRING,
- product_name STRING,
- product_price DECIMAL(10, 4),
- update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
- PRIMARY KEY(product_id) NOT ENFORCED, -- (1) defines the primary key
constraint
- WATERMARK FOR update_time AS update_time -- (2) defines the event time by
watermark
-) WITH (
- 'connector' = 'kafka',
- 'topic' = 'products',
- 'scan.startup.mode' = 'earliest-offset',
- 'properties.bootstrap.servers' = 'localhost:9092',
- 'value.format' = 'debezium-json'
-);
+CREATE TABLE products (
+ product_id STRING,
+ product_name STRING,
+ price DECIMAL(32, 2),
+ update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp'
VIRTUAL,
+ PRIMARY KEY (product_id) NOT ENFORCED
+ WATERMARK FOR update_time AS update_time
+) WITH (...);
{% endhighlight %}
-Line `(1)` defines the primary key constraint for table `product_changelog`,
Line `(2)` defines the `update_time` as event time for table
`product_changelog`,
-thus table `product_changelog` is a versioned table.
+## Versioned Table Views
-**Note**: The grammar `METADATA FROM 'value.source.timestamp' VIRTUAL` means
extract the database
-operation execution time for every changelog, it's strongly recommended
defines the database operation execution time
-as event time rather than ingestion-time or time in the record, otherwise the
version extract from the changelog may
-mismatch with the version in database.
-
-### Defining Versioned View
+Flink also supports defining versioned views if the underlying query contains
a unique key constraint and event-time attribute. Image an append-only table of
currency rates.
-Flink also supports defining versioned view only if the view contains unique
key constraint and event time.
-
-Let’s assume that we have the following table `RatesHistory`:
{% highlight sql %}
--- Define an append-only table
-CREATE TABLE RatesHistory (
- currency_time TIMESTAMP(3),
- currency STRING,
- rate DECIMAL(38, 10),
- WATERMARK FOR currency_time AS currency_time -- defines the event time
+CREATE TABLE currency_rates (
+ currency STRING,
+ rate DECIMAL(32, 10)
+ update_time TIMESTAMP(3),
+ WATERMARK FOR update_time AS update_time
) WITH (
- 'connector' = 'kafka',
- 'topic' = 'rates',
- 'scan.startup.mode' = 'earliest-offset',
- 'properties.bootstrap.servers' = 'localhost:9092',
- 'format' = 'json' -- this is an append only
source
-)
+ 'connector' = 'kafka',
+ 'topic' = 'rates',
+ 'properties.bootstrap.servers' = 'localhost:9092',
+ 'format' = 'json'
+);
{% endhighlight %}
-Table `RatesHistory` represents an ever growing append-only table of currency
exchange rates with respect to
-Yen (which has a rate of 1). For example, the exchange rate for the period
from 09:00 to 10:45 of Euro to Yen was 114.
-From 10:45 to 11:15 it was 116.
-
-{% highlight sql %}
-SELECT * FROM RatesHistory;
-
-currency_time currency rate
-============= ========= ====
-09:00:00 US Dollar 102
-09:00:00 Euro 114
-09:00:00 Yen 1
-10:45:00 Euro 116
-11:15:00 Euro 119
-11:49:00 Pounds 108
-{% endhighlight %}
+The table `currency_rates` contains a row for each currency - with respect to
USD - and receives a new row each time the rate changes.
Review comment:
```suggestion
The table `currency_rates` contains a row for each currency — with
respect to USD — and receives a new row each time the rate changes.
```
##########
File path: docs/dev/table/streaming/temporal_tables.md
##########
@@ -194,10 +127,10 @@ SELECT currency, rate, currency_time -- (1)
`currency_time` keeps the
SELECT *,
ROW_NUMBER() OVER (PARTITION BY currency -- (2) the inferred unique key
`currency` can be a primary key
ORDER BY currency_time DESC) AS rowNum
Review comment:
As mentioned above, `currency_time` should become `update_time`
throughout this view definition.
```suggestion
ORDER BY update_time DESC) AS rowNum
```
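
With the rename applied throughout, the view might read roughly as follows. This is a sketch assembled from the hunk context; the `CREATE VIEW versioned_rates AS` line is assumed from the view name mentioned in the comment further down in the same file.

```sql
-- Sketch only: the deduplication query from the hunk, with
-- `currency_time` renamed to `update_time` and reading from `currency_rates`.
CREATE VIEW versioned_rates AS
SELECT currency, rate, update_time              -- (1) `update_time` keeps the event time
  FROM (
      SELECT *,
      ROW_NUMBER() OVER (PARTITION BY currency  -- (2) the inferred unique key `currency` can be a primary key
         ORDER BY update_time DESC) AS rowNum
      FROM currency_rates)
WHERE rowNum = 1;
```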
##########
File path: docs/dev/table/streaming/temporal_tables.md
##########
@@ -57,134 +56,68 @@ SELECT * FROM product_changelog;
-(DELETE) 18:00:00 p_001 scooter 12.99
{% endhighlight %}
-The table `product_changelog` represents an ever growing changelog of database
table `products`, for example, the initial price of product `scooter` is
`11.11` at `00:01:00`, and the price increases to `12.99` at `12:00:00`,
- the product item is deleted from the table `products` at `18:00:00`.
+Given this set of changes, we track how the price of a scooter changes over
time.
+It is initially $11.11 at `00:01:00` when added to the catalog.
+The price then rises to $12.99 at `12:00:00` before being deleted from the
catalog at `18:00:00`.
-Given that we would like to output the version of `product_changelog` table of
the time `10:00:00`, the following table shows the result.
-{% highlight sql %}
-update_time product_id product_name price
-=========== ========== ============ =====
-00:01:00 p_001 scooter 11.11
-00:02:00 p_002 basketball 23.11
-{% endhighlight %}
+If we queried the table for various products' prices at different times, we
would retrieve different results. At `10:00:00` the table would show one set of
prices.
-Given that we would like to output the version of `product_changelog` table of
the time `13:00:00`, the following table shows the result.
{% highlight sql %}
update_time product_id product_name price
=========== ========== ============ =====
12:00:00 p_001 scooter 12.99
12:00:00 p_002 basketball 19.99
{% endhighlight %}
-In above example, the specific version of the table is tracked by
`update_time` and `product_id`, the `product_id` would be a primary key for
`product_changelog` table and `update_time` would be the event time.
-
-In Flink, this is represented by a [*versioned
table*](#defining-versioned-table).
-
-### Correlate with a regular table
-On the other hand, some use cases require to join a regular table which is an
external database table.
+While at `13:00:00,` we found find another.
Review comment:
```suggestion
While at `13:00:00`, we would find another set of prices.
```
##########
File path: docs/dev/table/streaming/temporal_tables.md
##########
@@ -194,10 +127,10 @@ SELECT currency, rate, currency_time -- (1)
`currency_time` keeps the
SELECT *,
ROW_NUMBER() OVER (PARTITION BY currency -- (2) the inferred unique key
`currency` can be a primary key
ORDER BY currency_time DESC) AS rowNum
- FROM RatesHistory)
+ FROM currency_rates)
WHERE rowNum = 1;
--- the view `versioned_rates` will produce changelog as the following.
+-- the view `versioned_rates` will produce a changelog as the following.
(changelog kind) currency_time currency rate
Review comment:
```suggestion
(changelog kind) update_time currency rate
```
##########
File path: docs/dev/table/streaming/temporal_tables.md
##########
@@ -57,134 +56,68 @@ SELECT * FROM product_changelog;
-(DELETE) 18:00:00 p_001 scooter 12.99
{% endhighlight %}
-The table `product_changelog` represents an ever growing changelog of database
table `products`, for example, the initial price of product `scooter` is
`11.11` at `00:01:00`, and the price increases to `12.99` at `12:00:00`,
- the product item is deleted from the table `products` at `18:00:00`.
+Given this set of changes, we track how the price of a scooter changes over
time.
+It is initially $11.11 at `00:01:00` when added to the catalog.
+The price then rises to $12.99 at `12:00:00` before being deleted from the
catalog at `18:00:00`.
-Given that we would like to output the version of `product_changelog` table of
the time `10:00:00`, the following table shows the result.
-{% highlight sql %}
-update_time product_id product_name price
-=========== ========== ============ =====
-00:01:00 p_001 scooter 11.11
-00:02:00 p_002 basketball 23.11
-{% endhighlight %}
+If we queried the table for various products' prices at different times, we
would retrieve different results. At `10:00:00` the table would show one set of
prices.
-Given that we would like to output the version of `product_changelog` table of
the time `13:00:00`, the following table shows the result.
{% highlight sql %}
update_time product_id product_name price
=========== ========== ============ =====
12:00:00 p_001 scooter 12.99
12:00:00 p_002 basketball 19.99
{% endhighlight %}
-In above example, the specific version of the table is tracked by
`update_time` and `product_id`, the `product_id` would be a primary key for
`product_changelog` table and `update_time` would be the event time.
-
-In Flink, this is represented by a [*versioned
table*](#defining-versioned-table).
-
-### Correlate with a regular table
-On the other hand, some use cases require to join a regular table which is an
external database table.
+While at `13:00:00,` we found find another.
-Let's assume that `LatestRates` is a table (e.g. stored in HBase) which is
materialized with the latest rates. The `LatestRates` always represents the
latest content of hbase table `rates`.
-
-Then the content of `LatestRates` table when we query at time `10:15:00` is:
{% highlight sql %}
-10:15:00 > SELECT * FROM LatestRates;
-
-currency rate
-========= ====
-US Dollar 102
-Euro 114
-Yen 1
-{% endhighlight %}
-
-Then the content of `LatestRates` table when we query at time `11:00:00` is:
-{% highlight sql %}
-11:00:00 > SELECT * FROM LatestRates;
-
-currency rate
-========= ====
-US Dollar 102
-Euro 116
-Yen 1
+update_time product_id product_name price
+=========== ========== ============ =====
+12:00:00 p_001 scooter 12.99
+12:00:00 p_002 basketball 19.99
{% endhighlight %}
-In Flink, this is represented by a [*regular Table*](#defining-regular-table).
-Temporal Table
---------------
-<span class="label label-danger">Attention</span> This is only supported in
Blink planner.
+## Versioned Table Sources
-Flink uses primary key constraint and event time to define both versioned
table and versioned view.
+Versioned tables are defined implicity for any table whose underlying source
or format directly define changelogs. Examples include the [upsert Kafka]({%
link dev/table/connectors/upsert-kafka.md %}) source as well as database
changelog formats such as [debezium]({% link
dev/table/connectors/formats/debezium.md %}) and [canal]({% link
dev/table/connectors/formats/canal.md %}). As discussed above, the only
additional requirement is the `CREATE` table statement must contain a primary
key and event-time attribute.
-### Defining Versioned Table
-The table is a versioned table in Flink only is the table contains primary key
constraint and event time.
{% highlight sql %}
--- Define a versioned table
-CREATE TABLE product_changelog (
- product_id STRING,
- product_name STRING,
- product_price DECIMAL(10, 4),
- update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
- PRIMARY KEY(product_id) NOT ENFORCED, -- (1) defines the primary key
constraint
- WATERMARK FOR update_time AS update_time -- (2) defines the event time by
watermark
-) WITH (
- 'connector' = 'kafka',
- 'topic' = 'products',
- 'scan.startup.mode' = 'earliest-offset',
- 'properties.bootstrap.servers' = 'localhost:9092',
- 'value.format' = 'debezium-json'
-);
+CREATE TABLE products (
+ product_id STRING,
+ product_name STRING,
+ price DECIMAL(32, 2),
+ update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp'
VIRTUAL,
+ PRIMARY KEY (product_id) NOT ENFORCED
+ WATERMARK FOR update_time AS update_time
+) WITH (...);
{% endhighlight %}
-Line `(1)` defines the primary key constraint for table `product_changelog`,
Line `(2)` defines the `update_time` as event time for table
`product_changelog`,
-thus table `product_changelog` is a versioned table.
+## Versioned Table Views
-**Note**: The grammar `METADATA FROM 'value.source.timestamp' VIRTUAL` means
extract the database
-operation execution time for every changelog, it's strongly recommended
defines the database operation execution time
-as event time rather than ingestion-time or time in the record, otherwise the
version extract from the changelog may
-mismatch with the version in database.
-
-### Defining Versioned View
+Flink also supports defining versioned views if the underlying query contains
a unique key constraint and event-time attribute. Image an append-only table of
currency rates.
-Flink also supports defining versioned view only if the view contains unique
key constraint and event time.
-
-Let’s assume that we have the following table `RatesHistory`:
{% highlight sql %}
--- Define an append-only table
-CREATE TABLE RatesHistory (
- currency_time TIMESTAMP(3),
- currency STRING,
- rate DECIMAL(38, 10),
- WATERMARK FOR currency_time AS currency_time -- defines the event time
+CREATE TABLE currency_rates (
+ currency STRING,
+ rate DECIMAL(32, 10)
+ update_time TIMESTAMP(3),
+ WATERMARK FOR update_time AS update_time
) WITH (
- 'connector' = 'kafka',
- 'topic' = 'rates',
- 'scan.startup.mode' = 'earliest-offset',
- 'properties.bootstrap.servers' = 'localhost:9092',
- 'format' = 'json' -- this is an append only
source
-)
+ 'connector' = 'kafka',
+ 'topic' = 'rates',
+ 'properties.bootstrap.servers' = 'localhost:9092',
+ 'format' = 'json'
+);
{% endhighlight %}
-Table `RatesHistory` represents an ever growing append-only table of currency
exchange rates with respect to
-Yen (which has a rate of 1). For example, the exchange rate for the period
from 09:00 to 10:45 of Euro to Yen was 114.
-From 10:45 to 11:15 it was 116.
-
-{% highlight sql %}
-SELECT * FROM RatesHistory;
-
-currency_time currency rate
-============= ========= ====
-09:00:00 US Dollar 102
-09:00:00 Euro 114
-09:00:00 Yen 1
-10:45:00 Euro 116
-11:15:00 Euro 119
-11:49:00 Pounds 108
-{% endhighlight %}
+The table `currency_rates` contains a row for each currency - with respect to
USD - and receives a new row each time the rate changes.
+The `JSON` format does not support native changelog semantics, and so Flink
can only read this table as append-only.
Review comment:
```suggestion
The `JSON` format does not support native changelog semantics, so Flink can
only read this table as append-only.
```
##########
File path: docs/dev/table/streaming/temporal_tables.md
##########
@@ -57,134 +56,68 @@ SELECT * FROM product_changelog;
-(DELETE) 18:00:00 p_001 scooter 12.99
{% endhighlight %}
-The table `product_changelog` represents an ever growing changelog of database
table `products`, for example, the initial price of product `scooter` is
`11.11` at `00:01:00`, and the price increases to `12.99` at `12:00:00`,
- the product item is deleted from the table `products` at `18:00:00`.
+Given this set of changes, we track how the price of a scooter changes over
time.
+It is initially $11.11 at `00:01:00` when added to the catalog.
+The price then rises to $12.99 at `12:00:00` before being deleted from the
catalog at `18:00:00`.
-Given that we would like to output the version of `product_changelog` table of
the time `10:00:00`, the following table shows the result.
-{% highlight sql %}
-update_time product_id product_name price
-=========== ========== ============ =====
-00:01:00 p_001 scooter 11.11
-00:02:00 p_002 basketball 23.11
-{% endhighlight %}
+If we queried the table for various products' prices at different times, we
would retrieve different results. At `10:00:00` the table would show one set of
prices.
-Given that we would like to output the version of `product_changelog` table of
the time `13:00:00`, the following table shows the result.
{% highlight sql %}
update_time product_id product_name price
=========== ========== ============ =====
12:00:00 p_001 scooter 12.99
12:00:00 p_002 basketball 19.99
{% endhighlight %}
-In above example, the specific version of the table is tracked by
`update_time` and `product_id`, the `product_id` would be a primary key for
`product_changelog` table and `update_time` would be the event time.
-
-In Flink, this is represented by a [*versioned
table*](#defining-versioned-table).
-
-### Correlate with a regular table
-On the other hand, some use cases require to join a regular table which is an
external database table.
+While at `13:00:00`, we would find another.
-Let's assume that `LatestRates` is a table (e.g. stored in HBase) which is
materialized with the latest rates. The `LatestRates` always represents the
latest content of hbase table `rates`.
-
-Then the content of `LatestRates` table when we query at time `10:15:00` is:
{% highlight sql %}
-10:15:00 > SELECT * FROM LatestRates;
-
-currency rate
-========= ====
-US Dollar 102
-Euro 114
-Yen 1
-{% endhighlight %}
-
-Then the content of `LatestRates` table when we query at time `11:00:00` is:
-{% highlight sql %}
-11:00:00 > SELECT * FROM LatestRates;
-
-currency rate
-========= ====
-US Dollar 102
-Euro 116
-Yen 1
+update_time product_id product_name price
+=========== ========== ============ =====
+12:00:00 p_001 scooter 12.99
+12:00:00 p_002 basketball 19.99
{% endhighlight %}
-In Flink, this is represented by a [*regular Table*](#defining-regular-table).
-Temporal Table
---------------
-<span class="label label-danger">Attention</span> This is only supported in
Blink planner.
+## Versioned Table Sources
-Flink uses primary key constraint and event time to define both versioned
table and versioned view.
+Versioned tables are defined implicitly for any table whose underlying source
or format directly defines changelogs. Examples include the [upsert Kafka]({%
link dev/table/connectors/upsert-kafka.md %}) source as well as database
changelog formats such as [debezium]({% link
dev/table/connectors/formats/debezium.md %}) and [canal]({% link
dev/table/connectors/formats/canal.md %}). As discussed above, the only
additional requirement is the `CREATE` table statement must contain a primary
key and event-time attribute.
-### Defining Versioned Table
-The table is a versioned table in Flink only is the table contains primary key
constraint and event time.
{% highlight sql %}
--- Define a versioned table
-CREATE TABLE product_changelog (
- product_id STRING,
- product_name STRING,
- product_price DECIMAL(10, 4),
- update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
- PRIMARY KEY(product_id) NOT ENFORCED, -- (1) defines the primary key
constraint
- WATERMARK FOR update_time AS update_time -- (2) defines the event time by
watermark
-) WITH (
- 'connector' = 'kafka',
- 'topic' = 'products',
- 'scan.startup.mode' = 'earliest-offset',
- 'properties.bootstrap.servers' = 'localhost:9092',
- 'value.format' = 'debezium-json'
-);
+CREATE TABLE products (
+ product_id STRING,
+ product_name STRING,
+ price DECIMAL(32, 2),
+ update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp'
VIRTUAL,
+ PRIMARY KEY (product_id) NOT ENFORCED,
+ WATERMARK FOR update_time AS update_time
+) WITH (...);
{% endhighlight %}
-Line `(1)` defines the primary key constraint for table `product_changelog`,
Line `(2)` defines the `update_time` as event time for table
`product_changelog`,
-thus table `product_changelog` is a versioned table.
+## Versioned Table Views
-**Note**: The grammar `METADATA FROM 'value.source.timestamp' VIRTUAL` means
extract the database
-operation execution time for every changelog, it's strongly recommended
defines the database operation execution time
-as event time rather than ingestion-time or time in the record, otherwise the
version extract from the changelog may
-mismatch with the version in database.
-
-### Defining Versioned View
+Flink also supports defining versioned views if the underlying query contains
a unique key constraint and event-time attribute. Imagine an append-only table of
currency rates.
-Flink also supports defining versioned view only if the view contains unique
key constraint and event time.
-
-Let’s assume that we have the following table `RatesHistory`:
{% highlight sql %}
--- Define an append-only table
-CREATE TABLE RatesHistory (
- currency_time TIMESTAMP(3),
- currency STRING,
- rate DECIMAL(38, 10),
- WATERMARK FOR currency_time AS currency_time -- defines the event time
+CREATE TABLE currency_rates (
+ currency STRING,
+ rate DECIMAL(32, 10),
+ update_time TIMESTAMP(3),
+ WATERMARK FOR update_time AS update_time
) WITH (
- 'connector' = 'kafka',
- 'topic' = 'rates',
- 'scan.startup.mode' = 'earliest-offset',
- 'properties.bootstrap.servers' = 'localhost:9092',
- 'format' = 'json' -- this is an append only
source
-)
+ 'connector' = 'kafka',
+ 'topic' = 'rates',
+ 'properties.bootstrap.servers' = 'localhost:9092',
+ 'format' = 'json'
+);
{% endhighlight %}
-Table `RatesHistory` represents an ever growing append-only table of currency
exchange rates with respect to
-Yen (which has a rate of 1). For example, the exchange rate for the period
from 09:00 to 10:45 of Euro to Yen was 114.
-From 10:45 to 11:15 it was 116.
-
-{% highlight sql %}
-SELECT * FROM RatesHistory;
-
-currency_time currency rate
-============= ========= ====
-09:00:00 US Dollar 102
-09:00:00 Euro 114
-09:00:00 Yen 1
-10:45:00 Euro 116
-11:15:00 Euro 119
-11:49:00 Pounds 108
-{% endhighlight %}
+The table `currency_rates` contains a row for each currency - with respect to
USD - and receives a new row each time the rate changes.
+The `JSON` format does not support native changelog semantics, and so Flink
can only read this table as append-only.
+However, it is clear to us (the query developer) that this table has all the
necessary information to define a versioned table.
-To define a versioned table on `RatesHistory`, Flink supports defining a
versioned view
-by [deduplication query]({% link dev/table/sql/queries.md %}#deduplication)
which produces an ordered changelog
-stream with an inferred primary key(`currency`) and event
time(`currency_time`).
+Flink can reinterpret this table as a versioned table by defining a
[deduplication query]({% link dev/table/sql/queries.md %}#deduplication) which
produces an ordered changelog stream with an inferred primary key (currency)
and event time (update_time).
Review comment:
In the view defined below, `currency_time` must become `update_time` in
both the SELECT and the comment. GitHub won't allow me to suggest changes
here, unfortunately.
Also, I'm confused by one thing. I see how `currency` can be the primary key
that a versioned table needs, but how does this actually work without our
explicitly declaring that `currency` is the PRIMARY KEY for this view? Why do
we specify a primary key when using CREATE TABLE, but not when using CREATE
VIEW?
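
For reference, a minimal sketch of what the view could look like with the rename applied, assuming it follows the deduplication pattern from the linked docs. The view name `versioned_rates` is hypothetical and not taken from this hunk:

```sql
-- Sketch only: the view name is an assumption; the table and columns come from the hunk above.
CREATE VIEW versioned_rates AS
SELECT currency, rate, update_time          -- update_time carries the event time
FROM (
    SELECT *,
           ROW_NUMBER() OVER (
               PARTITION BY currency        -- the unique key is inferred from this clause
               ORDER BY update_time DESC    -- ordering by the time attribute
           ) AS rownum
    FROM currency_rates
)
WHERE rownum = 1;
```

In this pattern no `PRIMARY KEY` is declared on the view. As the added text puts it, the key is inferred from the deduplication query, which here would be the `PARTITION BY currency` clause.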