alpinegizmo commented on a change in pull request #14292:
URL: https://github.com/apache/flink/pull/14292#discussion_r538223361



##########
File path: docs/dev/table/streaming/temporal_tables.md
##########
@@ -22,30 +22,29 @@ specific language governing permissions and limitations
 under the License.
 --> 
 
-A Temporal table is a table that evolves over time -  otherwise known in Flink 
as a [dynamic table]({% link dev/table/streaming/dynamic_tables.md %}). Rows in 
a temporal table are associated with one or more temporal periods and all Flink 
tables are temporal(dynamic).
+Flink SQL operates over dynamic tables that evolve, which may either be 
append-only or updating. 
+Versioned tables represent a special type of updating table that remembers the 
past values for each key.
 
-A temporal table contains one or more versioned table snapshots, it can be a 
changing history table which tracks the changes(e.g. database changelog, 
contains all snapshots) or a changing dimensioned table which materializes the 
changes(e.g. database table, contains the latest snapshot). 
+<span class="label label-danger">Attention</span> The Legacy planner does not 
support versioned tables.
 
-**Version**: A temporal table can split into a set of versioned table 
snapshots, the version in table snapshots represents the valid life circle of 
rows, the start time and the end time of the valid period can be assigned by 
users. 
-Temporal table can split to `versioned table` and `regular table` according to 
the table can tracks its history version or not.
+* This will be replaced by the TOC
+{:toc}
 
-**Versioned table**: If the rows a in temporal table can track its history 
changes and visit its history versions, we call this a versioned table. Tables 
that comes from a database changelog can be defined as a versioned table.
+## Concept
 
-**Regular table**: If the row in temporal table can only track and visit its 
latest version,we call this kind of temporal table as regular table. Tables 
that comes from a database or HBase can be defined as a regular table.
+Dynamic tables define relations over time. 
+Often, particularly when working with metadata, a key's old value does not 
become irrelevant when it changes. 
 
-* This will be replaced by the TOC
-{:toc}
+Flink SQL can define versioned tables over any dynamic table with a `PRIMARY 
KEY` constraint and time attribute. 
 
-Motivation
-----------
+A primary key constraint in Flink means that a column or set of columns of a 
table or view are unique and non-null.
+The primary key semantic on a upserting table means the materialized changes 
for a particular key (`INSERT`/`UPDATE`/`DELETE`) represent the changes to a 
single row over time. The time attribute on a upserting table defines when each 
change occurred.
 
-### Correlate with a versioned table
-Given a scenario the order stream correlates the dimension table product, the 
table `orders` comes from kafka which contains the real time orders, the table 
`product_changelog` comes from the changelog of the database table `products`,
- the product price in table `products` is changing over time. 
+Taken together, Flink can track the changes to a row over time and maintain 
the period for which each value was valid for that key.
 
-{% highlight sql %}
-SELECT * FROM product_changelog;
+Suppose a table that tracks the price of different products in a store. 

Review comment:
       ```suggestion
   Suppose a table tracks the prices of different products in a store. 
   ```

##########
File path: docs/dev/table/streaming/temporal_tables.md
##########
@@ -57,134 +56,68 @@ SELECT * FROM product_changelog;
 -(DELETE)         18:00:00     p_001      scooter      12.99 
 {% endhighlight %}
 
-The table `product_changelog` represents an ever growing changelog of database 
table `products`,  for example, the initial price of product `scooter` is 
`11.11` at `00:01:00`, and the price increases to `12.99` at `12:00:00`,
- the product item is deleted from the table `products` at `18:00:00`.
+Given this set of changes, we track how the price of a scooter changes over 
time.
+It is initially $11.11 at `00:01:00` when added to the catalog.
+The price then rises to $12.99 at `12:00:00` before being deleted from the 
catalog at `18:00:00`.
 
-Given that we would like to output the version of `product_changelog` table of 
the time `10:00:00`, the following table shows the result. 
-{% highlight sql %}
-update_time  product_id product_name price
-===========  ========== ============ ===== 
-00:01:00     p_001      scooter      11.11
-00:02:00     p_002      basketball   23.11
-{% endhighlight %}
+If we queried the table for various products' prices at different times, we 
would retrieve different results. At `10:00:00` the table would show one set of 
prices.
 
-Given that we would like to output the version of `product_changelog` table of 
the time `13:00:00`, the following table shows the result. 
 {% highlight sql %}
 update_time  product_id product_name price
 ===========  ========== ============ ===== 
 12:00:00     p_001      scooter      12.99
 12:00:00     p_002      basketball   19.99

Review comment:
       ```suggestion
   00:01:00     p_001      scooter      11.11
   00:02:00     p_002      basketball   23.11
   ```

##########
File path: docs/dev/table/streaming/temporal_tables.md
##########
@@ -57,134 +56,68 @@ SELECT * FROM product_changelog;
 -(DELETE)         18:00:00     p_001      scooter      12.99 
 {% endhighlight %}
 
-The table `product_changelog` represents an ever growing changelog of database 
table `products`,  for example, the initial price of product `scooter` is 
`11.11` at `00:01:00`, and the price increases to `12.99` at `12:00:00`,
- the product item is deleted from the table `products` at `18:00:00`.
+Given this set of changes, we track how the price of a scooter changes over 
time.
+It is initially $11.11 at `00:01:00` when added to the catalog.
+The price then rises to $12.99 at `12:00:00` before being deleted from the 
catalog at `18:00:00`.
 
-Given that we would like to output the version of `product_changelog` table of 
the time `10:00:00`, the following table shows the result. 
-{% highlight sql %}
-update_time  product_id product_name price
-===========  ========== ============ ===== 
-00:01:00     p_001      scooter      11.11
-00:02:00     p_002      basketball   23.11
-{% endhighlight %}
+If we queried the table for various products' prices at different times, we 
would retrieve different results. At `10:00:00` the table would show one set of 
prices.
 
-Given that we would like to output the version of `product_changelog` table of 
the time `13:00:00`, the following table shows the result. 
 {% highlight sql %}
 update_time  product_id product_name price
 ===========  ========== ============ ===== 
 12:00:00     p_001      scooter      12.99
 12:00:00     p_002      basketball   19.99
 {% endhighlight %}
 
-In above example, the specific version of the table is tracked by 
`update_time` and `product_id`,  the `product_id` would be a primary key for 
`product_changelog` table and `update_time` would be the event time.
-
-In Flink, this is represented by a [*versioned 
table*](#defining-versioned-table).
-
-### Correlate with a regular table
 
-On the other hand, some use cases require to join a regular table which is an 
external database table.
+While at `13:00:00,` we found find another. 
 
-Let's assume that `LatestRates` is a table (e.g. stored in HBase) which is 
materialized with the latest rates. The `LatestRates` always represents the 
latest content of hbase table `rates`.
- 
-Then the content of `LatestRates` table when we query at time `10:15:00` is:
 {% highlight sql %}
-10:15:00 > SELECT * FROM LatestRates;
-
-currency  rate
-========= ====
-US Dollar 102
-Euro      114
-Yen       1
-{% endhighlight %}
-
-Then the content of `LatestRates` table when we query at time `11:00:00` is:
-{% highlight sql %}
-11:00:00 > SELECT * FROM LatestRates;
-
-currency  rate
-========= ====
-US Dollar 102
-Euro      116
-Yen       1
+update_time  product_id product_name price
+===========  ========== ============ ===== 
+12:00:00     p_001      scooter      12.99
+12:00:00     p_002      basketball   19.99
 {% endhighlight %}
 
-In Flink, this is represented by a [*regular Table*](#defining-regular-table).
 
-Temporal Table
---------------
-<span class="label label-danger">Attention</span> This is only supported in 
Blink planner.
+## Versioned Table Sources
 
-Flink uses primary key constraint and event time to define both versioned 
table and versioned view.
+Versioned tables are defined implicity for any table whose underlying source 
or format directly define changelogs. Examples include the [upsert Kafka]({% 
link dev/table/connectors/upsert-kafka.md %}) source as well as database 
changelog formats such as [debezium]({% link 
dev/table/connectors/formats/debezium.md %}) and [canal]({% link 
dev/table/connectors/formats/canal.md %}). As discussed above, the only 
additional requirement is the `CREATE` table statement must contain a primary 
key and event-time attribute. 
 
-### Defining Versioned Table
-The table is a versioned table in Flink only is the table contains primary key 
constraint and event time.
 {% highlight sql %}
--- Define a versioned table
-CREATE TABLE product_changelog (
-  product_id STRING,
-  product_name STRING,
-  product_price DECIMAL(10, 4),
-  update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
-  PRIMARY KEY(product_id) NOT ENFORCED,      -- (1) defines the primary key 
constraint
-  WATERMARK FOR update_time AS update_time   -- (2) defines the event time by 
watermark                               
-) WITH (
-  'connector' = 'kafka',
-  'topic' = 'products',
-  'scan.startup.mode' = 'earliest-offset',
-  'properties.bootstrap.servers' = 'localhost:9092',
-  'value.format' = 'debezium-json'
-);
+CREATE TABLE products (
+       product_id    STRING,
+       product_name  STRING,
+       price         DECIMAL(32, 2),
+       update_time   TIMESTAMP(3) METADATA FROM 'value.source.timestamp' 
VIRTUAL,
+       PRIMARY KEY (product_id) NOT ENFORCED
+       WATERMARK FOR update_time AS update_time
+) WITH (...);
 {% endhighlight %}
 
-Line `(1)` defines the primary key constraint for table `product_changelog`, 
Line `(2)` defines the `update_time` as event time for table 
`product_changelog`,
-thus table `product_changelog` is a versioned table.
+## Versioned Table Views
 
-**Note**: The grammar `METADATA FROM 'value.source.timestamp' VIRTUAL` means 
extract the database
-operation execution time for every changelog, it's strongly recommended 
defines the database operation execution time 
-as event time rather than ingestion-time or time in the record, otherwise the 
version extract from the changelog may
-mismatch with the version in database.
- 
-### Defining Versioned View
+Flink also supports defining versioned views if the underlying query contains 
a unique key constraint and event-time attribute. Image an append-only table of 
currency rates. 

Review comment:
       ```suggestion
   Flink also supports defining versioned views if the underlying query 
contains a unique key constraint and event-time attribute. Imagine an 
append-only table of currency rates. 
   ```

##########
File path: docs/dev/table/streaming/temporal_tables.md
##########
@@ -57,134 +56,68 @@ SELECT * FROM product_changelog;
 -(DELETE)         18:00:00     p_001      scooter      12.99 
 {% endhighlight %}
 
-The table `product_changelog` represents an ever growing changelog of database 
table `products`,  for example, the initial price of product `scooter` is 
`11.11` at `00:01:00`, and the price increases to `12.99` at `12:00:00`,
- the product item is deleted from the table `products` at `18:00:00`.
+Given this set of changes, we track how the price of a scooter changes over 
time.
+It is initially $11.11 at `00:01:00` when added to the catalog.
+The price then rises to $12.99 at `12:00:00` before being deleted from the 
catalog at `18:00:00`.
 
-Given that we would like to output the version of `product_changelog` table of 
the time `10:00:00`, the following table shows the result. 
-{% highlight sql %}
-update_time  product_id product_name price
-===========  ========== ============ ===== 
-00:01:00     p_001      scooter      11.11
-00:02:00     p_002      basketball   23.11
-{% endhighlight %}
+If we queried the table for various products' prices at different times, we 
would retrieve different results. At `10:00:00` the table would show one set of 
prices.
 
-Given that we would like to output the version of `product_changelog` table of 
the time `13:00:00`, the following table shows the result. 
 {% highlight sql %}
 update_time  product_id product_name price
 ===========  ========== ============ ===== 
 12:00:00     p_001      scooter      12.99
 12:00:00     p_002      basketball   19.99
 {% endhighlight %}
 
-In above example, the specific version of the table is tracked by 
`update_time` and `product_id`,  the `product_id` would be a primary key for 
`product_changelog` table and `update_time` would be the event time.
-
-In Flink, this is represented by a [*versioned 
table*](#defining-versioned-table).
-
-### Correlate with a regular table
 
-On the other hand, some use cases require to join a regular table which is an 
external database table.
+While at `13:00:00,` we found find another. 
 
-Let's assume that `LatestRates` is a table (e.g. stored in HBase) which is 
materialized with the latest rates. The `LatestRates` always represents the 
latest content of hbase table `rates`.
- 
-Then the content of `LatestRates` table when we query at time `10:15:00` is:
 {% highlight sql %}
-10:15:00 > SELECT * FROM LatestRates;
-
-currency  rate
-========= ====
-US Dollar 102
-Euro      114
-Yen       1
-{% endhighlight %}
-
-Then the content of `LatestRates` table when we query at time `11:00:00` is:
-{% highlight sql %}
-11:00:00 > SELECT * FROM LatestRates;
-
-currency  rate
-========= ====
-US Dollar 102
-Euro      116
-Yen       1
+update_time  product_id product_name price
+===========  ========== ============ ===== 
+12:00:00     p_001      scooter      12.99
+12:00:00     p_002      basketball   19.99
 {% endhighlight %}
 
-In Flink, this is represented by a [*regular Table*](#defining-regular-table).
 
-Temporal Table
---------------
-<span class="label label-danger">Attention</span> This is only supported in 
Blink planner.
+## Versioned Table Sources
 
-Flink uses primary key constraint and event time to define both versioned 
table and versioned view.
+Versioned tables are defined implicity for any table whose underlying source 
or format directly define changelogs. Examples include the [upsert Kafka]({% 
link dev/table/connectors/upsert-kafka.md %}) source as well as database 
changelog formats such as [debezium]({% link 
dev/table/connectors/formats/debezium.md %}) and [canal]({% link 
dev/table/connectors/formats/canal.md %}). As discussed above, the only 
additional requirement is the `CREATE` table statement must contain a primary 
key and event-time attribute. 

Review comment:
       ```suggestion
   Versioned tables are defined implicitly for any tables whose underlying 
sources or formats directly define changelogs. Examples include the [upsert 
Kafka]({% link dev/table/connectors/upsert-kafka.md %}) source as well as 
database changelog formats such as [debezium]({% link 
dev/table/connectors/formats/debezium.md %}) and [canal]({% link 
dev/table/connectors/formats/canal.md %}). As discussed above, the only 
additional requirement is the `CREATE` table statement must contain a primary 
key and an event-time attribute. 
   ```

##########
File path: docs/dev/table/streaming/temporal_tables.md
##########
@@ -57,134 +56,68 @@ SELECT * FROM product_changelog;
 -(DELETE)         18:00:00     p_001      scooter      12.99 
 {% endhighlight %}
 
-The table `product_changelog` represents an ever growing changelog of database 
table `products`,  for example, the initial price of product `scooter` is 
`11.11` at `00:01:00`, and the price increases to `12.99` at `12:00:00`,
- the product item is deleted from the table `products` at `18:00:00`.
+Given this set of changes, we track how the price of a scooter changes over 
time.
+It is initially $11.11 at `00:01:00` when added to the catalog.
+The price then rises to $12.99 at `12:00:00` before being deleted from the 
catalog at `18:00:00`.
 
-Given that we would like to output the version of `product_changelog` table of 
the time `10:00:00`, the following table shows the result. 
-{% highlight sql %}
-update_time  product_id product_name price
-===========  ========== ============ ===== 
-00:01:00     p_001      scooter      11.11
-00:02:00     p_002      basketball   23.11
-{% endhighlight %}
+If we queried the table for various products' prices at different times, we 
would retrieve different results. At `10:00:00` the table would show one set of 
prices.
 
-Given that we would like to output the version of `product_changelog` table of 
the time `13:00:00`, the following table shows the result. 
 {% highlight sql %}
 update_time  product_id product_name price
 ===========  ========== ============ ===== 
 12:00:00     p_001      scooter      12.99
 12:00:00     p_002      basketball   19.99
 {% endhighlight %}
 
-In above example, the specific version of the table is tracked by 
`update_time` and `product_id`,  the `product_id` would be a primary key for 
`product_changelog` table and `update_time` would be the event time.
-
-In Flink, this is represented by a [*versioned 
table*](#defining-versioned-table).
-
-### Correlate with a regular table
 
-On the other hand, some use cases require to join a regular table which is an 
external database table.
+While at `13:00:00,` we found find another. 
 
-Let's assume that `LatestRates` is a table (e.g. stored in HBase) which is 
materialized with the latest rates. The `LatestRates` always represents the 
latest content of hbase table `rates`.
- 
-Then the content of `LatestRates` table when we query at time `10:15:00` is:
 {% highlight sql %}
-10:15:00 > SELECT * FROM LatestRates;
-
-currency  rate
-========= ====
-US Dollar 102
-Euro      114
-Yen       1
-{% endhighlight %}
-
-Then the content of `LatestRates` table when we query at time `11:00:00` is:
-{% highlight sql %}
-11:00:00 > SELECT * FROM LatestRates;
-
-currency  rate
-========= ====
-US Dollar 102
-Euro      116
-Yen       1
+update_time  product_id product_name price
+===========  ========== ============ ===== 
+12:00:00     p_001      scooter      12.99
+12:00:00     p_002      basketball   19.99
 {% endhighlight %}
 
-In Flink, this is represented by a [*regular Table*](#defining-regular-table).
 
-Temporal Table
---------------
-<span class="label label-danger">Attention</span> This is only supported in 
Blink planner.
+## Versioned Table Sources
 
-Flink uses primary key constraint and event time to define both versioned 
table and versioned view.
+Versioned tables are defined implicity for any table whose underlying source 
or format directly define changelogs. Examples include the [upsert Kafka]({% 
link dev/table/connectors/upsert-kafka.md %}) source as well as database 
changelog formats such as [debezium]({% link 
dev/table/connectors/formats/debezium.md %}) and [canal]({% link 
dev/table/connectors/formats/canal.md %}). As discussed above, the only 
additional requirement is the `CREATE` table statement must contain a primary 
key and event-time attribute. 
 
-### Defining Versioned Table
-The table is a versioned table in Flink only is the table contains primary key 
constraint and event time.
 {% highlight sql %}
--- Define a versioned table
-CREATE TABLE product_changelog (
-  product_id STRING,
-  product_name STRING,
-  product_price DECIMAL(10, 4),
-  update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
-  PRIMARY KEY(product_id) NOT ENFORCED,      -- (1) defines the primary key 
constraint
-  WATERMARK FOR update_time AS update_time   -- (2) defines the event time by 
watermark                               
-) WITH (
-  'connector' = 'kafka',
-  'topic' = 'products',
-  'scan.startup.mode' = 'earliest-offset',
-  'properties.bootstrap.servers' = 'localhost:9092',
-  'value.format' = 'debezium-json'
-);
+CREATE TABLE products (
+       product_id    STRING,
+       product_name  STRING,
+       price         DECIMAL(32, 2),
+       update_time   TIMESTAMP(3) METADATA FROM 'value.source.timestamp' 
VIRTUAL,
+       PRIMARY KEY (product_id) NOT ENFORCED
+       WATERMARK FOR update_time AS update_time
+) WITH (...);
 {% endhighlight %}
 
-Line `(1)` defines the primary key constraint for table `product_changelog`, 
Line `(2)` defines the `update_time` as event time for table 
`product_changelog`,
-thus table `product_changelog` is a versioned table.
+## Versioned Table Views
 
-**Note**: The grammar `METADATA FROM 'value.source.timestamp' VIRTUAL` means 
extract the database
-operation execution time for every changelog, it's strongly recommended 
defines the database operation execution time 
-as event time rather than ingestion-time or time in the record, otherwise the 
version extract from the changelog may
-mismatch with the version in database.
- 
-### Defining Versioned View
+Flink also supports defining versioned views if the underlying query contains 
a unique key constraint and event-time attribute. Image an append-only table of 
currency rates. 
 
-Flink also supports defining versioned view only if the view contains unique 
key constraint and event time.
- 
-Let’s assume that we have the following table `RatesHistory`:
 {% highlight sql %}
--- Define an append-only table
-CREATE TABLE RatesHistory (
-    currency_time TIMESTAMP(3),
-    currency STRING,
-    rate DECIMAL(38, 10),
-    WATERMARK FOR currency_time AS currency_time   -- defines the event time
+CREATE TABLE currency_rates (
+       currency      STRING,
+       rate          DECIMAL(32, 10)
+       update_time   TIMESTAMP(3),
+       WATERMARK FOR update_time AS update_time
 ) WITH (
-  'connector' = 'kafka',
-  'topic' = 'rates',
-  'scan.startup.mode' = 'earliest-offset',
-  'properties.bootstrap.servers' = 'localhost:9092',
-  'format' = 'json'                                -- this is an append only 
source
-)
+       'connector' = 'kafka',
+       'topic'     = 'rates',
+       'properties.bootstrap.servers' = 'localhost:9092',
+       'format'    = 'json'
+);
 {% endhighlight %}
 
-Table `RatesHistory` represents an ever growing append-only table of currency 
exchange rates with respect to 
-Yen (which has a rate of 1). For example, the exchange rate for the period 
from 09:00 to 10:45 of Euro to Yen was 114.
-From 10:45 to 11:15 it was 116.
-
-{% highlight sql %}
-SELECT * FROM RatesHistory;
-
-currency_time currency  rate
-============= ========= ====
-09:00:00      US Dollar 102
-09:00:00      Euro      114
-09:00:00      Yen       1
-10:45:00      Euro      116
-11:15:00      Euro      119
-11:49:00      Pounds    108
-{% endhighlight %}
+The table `currency_rates` contains a row for each currency - with respect to 
USD - and receives a new row each time the rate changes.

Review comment:
       ```suggestion
   The table `currency_rates` contains a row for each currency &mdash; with 
respect to USD &mdash; and receives a new row each time the rate changes.
   ```

##########
File path: docs/dev/table/streaming/temporal_tables.md
##########
@@ -194,10 +127,10 @@ SELECT currency, rate, currency_time            -- (1) 
`currency_time` keeps the
       SELECT *,
       ROW_NUMBER() OVER (PARTITION BY currency  -- (2) the inferred unique key 
`currency` can be a primary key
          ORDER BY currency_time DESC) AS rowNum 

Review comment:
       As mentioned above, `currency_time` should become `update_time` 
throughout this view definition.
   
   ```suggestion
            ORDER BY update_time DESC) AS rowNum 
   ```
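   
   For reference, a sketch of how the whole view might read with that rename applied. It assumes the view is created as `versioned_rates` (the name used in the changelog comment further down) via a `CREATE VIEW ... AS` statement, and that the rest of the query stays as it is in the diff:
   
   ```sql
   -- Assumed wrapper: the CREATE VIEW line is not visible in this hunk.
   CREATE VIEW versioned_rates AS
   SELECT currency, rate, update_time              -- (1) `update_time` keeps the event time
   FROM (
         SELECT *,
         ROW_NUMBER() OVER (PARTITION BY currency  -- (2) the inferred unique key
            ORDER BY update_time DESC) AS rowNum
         FROM currency_rates)
   WHERE rowNum = 1;
   ```
   
   The column names then match the `currency_rates` table definition introduced earlier in the page.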

##########
File path: docs/dev/table/streaming/temporal_tables.md
##########
@@ -57,134 +56,68 @@ SELECT * FROM product_changelog;
 -(DELETE)         18:00:00     p_001      scooter      12.99 
 {% endhighlight %}
 
-The table `product_changelog` represents an ever growing changelog of database 
table `products`,  for example, the initial price of product `scooter` is 
`11.11` at `00:01:00`, and the price increases to `12.99` at `12:00:00`,
- the product item is deleted from the table `products` at `18:00:00`.
+Given this set of changes, we track how the price of a scooter changes over 
time.
+It is initially $11.11 at `00:01:00` when added to the catalog.
+The price then rises to $12.99 at `12:00:00` before being deleted from the 
catalog at `18:00:00`.
 
-Given that we would like to output the version of `product_changelog` table of 
the time `10:00:00`, the following table shows the result. 
-{% highlight sql %}
-update_time  product_id product_name price
-===========  ========== ============ ===== 
-00:01:00     p_001      scooter      11.11
-00:02:00     p_002      basketball   23.11
-{% endhighlight %}
+If we queried the table for various products' prices at different times, we 
would retrieve different results. At `10:00:00` the table would show one set of 
prices.
 
-Given that we would like to output the version of `product_changelog` table of 
the time `13:00:00`, the following table shows the result. 
 {% highlight sql %}
 update_time  product_id product_name price
 ===========  ========== ============ ===== 
 12:00:00     p_001      scooter      12.99
 12:00:00     p_002      basketball   19.99
 {% endhighlight %}
 
-In above example, the specific version of the table is tracked by 
`update_time` and `product_id`,  the `product_id` would be a primary key for 
`product_changelog` table and `update_time` would be the event time.
-
-In Flink, this is represented by a [*versioned 
table*](#defining-versioned-table).
-
-### Correlate with a regular table
 
-On the other hand, some use cases require to join a regular table which is an 
external database table.
+While at `13:00:00,` we found find another. 

Review comment:
       ```suggestion
   While at `13:00:00`, we would find another set of prices.
   ```

##########
File path: docs/dev/table/streaming/temporal_tables.md
##########
@@ -194,10 +127,10 @@ SELECT currency, rate, currency_time            -- (1) 
`currency_time` keeps the
       SELECT *,
       ROW_NUMBER() OVER (PARTITION BY currency  -- (2) the inferred unique key 
`currency` can be a primary key
          ORDER BY currency_time DESC) AS rowNum 
-      FROM RatesHistory)
+      FROM currency_rates)
 WHERE rowNum = 1; 
 
--- the view `versioned_rates` will produce changelog as the following.
+-- the view `versioned_rates` will produce a changelog as the following.
 (changelog kind) currency_time currency   rate

Review comment:
       ```suggestion
   (changelog kind) update_time currency   rate
   ```

##########
File path: docs/dev/table/streaming/temporal_tables.md
##########
@@ -57,134 +56,68 @@ SELECT * FROM product_changelog;
 -(DELETE)         18:00:00     p_001      scooter      12.99 
 {% endhighlight %}
 
-The table `product_changelog` represents an ever growing changelog of database 
table `products`,  for example, the initial price of product `scooter` is 
`11.11` at `00:01:00`, and the price increases to `12.99` at `12:00:00`,
- the product item is deleted from the table `products` at `18:00:00`.
+Given this set of changes, we track how the price of a scooter changes over 
time.
+It is initially $11.11 at `00:01:00` when added to the catalog.
+The price then rises to $12.99 at `12:00:00` before being deleted from the 
catalog at `18:00:00`.
 
-Given that we would like to output the version of `product_changelog` table of 
the time `10:00:00`, the following table shows the result. 
-{% highlight sql %}
-update_time  product_id product_name price
-===========  ========== ============ ===== 
-00:01:00     p_001      scooter      11.11
-00:02:00     p_002      basketball   23.11
-{% endhighlight %}
+If we queried the table for various products' prices at different times, we 
would retrieve different results. At `10:00:00` the table would show one set of 
prices.
 
-Given that we would like to output the version of `product_changelog` table of 
the time `13:00:00`, the following table shows the result. 
 {% highlight sql %}
 update_time  product_id product_name price
 ===========  ========== ============ ===== 
 12:00:00     p_001      scooter      12.99
 12:00:00     p_002      basketball   19.99
 {% endhighlight %}
 
-In above example, the specific version of the table is tracked by 
`update_time` and `product_id`,  the `product_id` would be a primary key for 
`product_changelog` table and `update_time` would be the event time.
-
-In Flink, this is represented by a [*versioned 
table*](#defining-versioned-table).
-
-### Correlate with a regular table
 
-On the other hand, some use cases require to join a regular table which is an 
external database table.
+While at `13:00:00,` we found find another. 
 
-Let's assume that `LatestRates` is a table (e.g. stored in HBase) which is 
materialized with the latest rates. The `LatestRates` always represents the 
latest content of hbase table `rates`.
- 
-Then the content of `LatestRates` table when we query at time `10:15:00` is:
 {% highlight sql %}
-10:15:00 > SELECT * FROM LatestRates;
-
-currency  rate
-========= ====
-US Dollar 102
-Euro      114
-Yen       1
-{% endhighlight %}
-
-Then the content of `LatestRates` table when we query at time `11:00:00` is:
-{% highlight sql %}
-11:00:00 > SELECT * FROM LatestRates;
-
-currency  rate
-========= ====
-US Dollar 102
-Euro      116
-Yen       1
+update_time  product_id product_name price
+===========  ========== ============ ===== 
+12:00:00     p_001      scooter      12.99
+12:00:00     p_002      basketball   19.99
 {% endhighlight %}
 
-In Flink, this is represented by a [*regular Table*](#defining-regular-table).
 
-Temporal Table
---------------
-<span class="label label-danger">Attention</span> This is only supported in 
Blink planner.
+## Versioned Table Sources
 
-Flink uses primary key constraint and event time to define both versioned 
table and versioned view.
+Versioned tables are defined implicity for any table whose underlying source 
or format directly define changelogs. Examples include the [upsert Kafka]({% 
link dev/table/connectors/upsert-kafka.md %}) source as well as database 
changelog formats such as [debezium]({% link 
dev/table/connectors/formats/debezium.md %}) and [canal]({% link 
dev/table/connectors/formats/canal.md %}). As discussed above, the only 
additional requirement is the `CREATE` table statement must contain a primary 
key and event-time attribute. 
 
-### Defining Versioned Table
-The table is a versioned table in Flink only is the table contains primary key 
constraint and event time.
 {% highlight sql %}
--- Define a versioned table
-CREATE TABLE product_changelog (
-  product_id STRING,
-  product_name STRING,
-  product_price DECIMAL(10, 4),
-  update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
-  PRIMARY KEY(product_id) NOT ENFORCED,      -- (1) defines the primary key 
constraint
-  WATERMARK FOR update_time AS update_time   -- (2) defines the event time by 
watermark                               
-) WITH (
-  'connector' = 'kafka',
-  'topic' = 'products',
-  'scan.startup.mode' = 'earliest-offset',
-  'properties.bootstrap.servers' = 'localhost:9092',
-  'value.format' = 'debezium-json'
-);
+CREATE TABLE products (
+       product_id    STRING,
+       product_name  STRING,
+       price         DECIMAL(32, 2),
+       update_time   TIMESTAMP(3) METADATA FROM 'value.source.timestamp' 
VIRTUAL,
+       PRIMARY KEY (product_id) NOT ENFORCED
+       WATERMARK FOR update_time AS update_time
+) WITH (...);
 {% endhighlight %}
 
-Line `(1)` defines the primary key constraint for table `product_changelog`, 
Line `(2)` defines the `update_time` as event time for table 
`product_changelog`,
-thus table `product_changelog` is a versioned table.
+## Versioned Table Views
 
-**Note**: The grammar `METADATA FROM 'value.source.timestamp' VIRTUAL` means 
extract the database
-operation execution time for every changelog, it's strongly recommended 
defines the database operation execution time 
-as event time rather than ingestion-time or time in the record, otherwise the 
version extract from the changelog may
-mismatch with the version in database.
- 
-### Defining Versioned View
+Flink also supports defining versioned views if the underlying query contains 
a unique key constraint and event-time attribute. Image an append-only table of 
currency rates. 
 
-Flink also supports defining versioned view only if the view contains unique 
key constraint and event time.
- 
-Let’s assume that we have the following table `RatesHistory`:
 {% highlight sql %}
--- Define an append-only table
-CREATE TABLE RatesHistory (
-    currency_time TIMESTAMP(3),
-    currency STRING,
-    rate DECIMAL(38, 10),
-    WATERMARK FOR currency_time AS currency_time   -- defines the event time
+CREATE TABLE currency_rates (
+       currency      STRING,
+       rate          DECIMAL(32, 10)
+       update_time   TIMESTAMP(3),
+       WATERMARK FOR update_time AS update_time
 ) WITH (
-  'connector' = 'kafka',
-  'topic' = 'rates',
-  'scan.startup.mode' = 'earliest-offset',
-  'properties.bootstrap.servers' = 'localhost:9092',
-  'format' = 'json'                                -- this is an append only 
source
-)
+       'connector' = 'kafka',
+       'topic'     = 'rates',
+       'properties.bootstrap.servers' = 'localhost:9092',
+       'format'    = 'json'
+);
 {% endhighlight %}
 
-Table `RatesHistory` represents an ever growing append-only table of currency 
exchange rates with respect to 
-Yen (which has a rate of 1). For example, the exchange rate for the period 
from 09:00 to 10:45 of Euro to Yen was 114.
-From 10:45 to 11:15 it was 116.
-
-{% highlight sql %}
-SELECT * FROM RatesHistory;
-
-currency_time currency  rate
-============= ========= ====
-09:00:00      US Dollar 102
-09:00:00      Euro      114
-09:00:00      Yen       1
-10:45:00      Euro      116
-11:15:00      Euro      119
-11:49:00      Pounds    108
-{% endhighlight %}
+The table `currency_rates` contains a row for each currency - with respect to 
USD - and receives a new row each time the rate changes.
+The `JSON` format does not support native changelog semantics, and so Flink 
can only read this table as append-only.

Review comment:
       ```suggestion
   The `JSON` format does not support native changelog semantics, so Flink can 
only read this table as append-only.
   ```

##########
File path: docs/dev/table/streaming/temporal_tables.md
##########
@@ -57,134 +56,68 @@ SELECT * FROM product_changelog;
 -(DELETE)         18:00:00     p_001      scooter      12.99 
 {% endhighlight %}
 
-The table `product_changelog` represents an ever growing changelog of database 
table `products`,  for example, the initial price of product `scooter` is 
`11.11` at `00:01:00`, and the price increases to `12.99` at `12:00:00`,
- the product item is deleted from the table `products` at `18:00:00`.
+Given this set of changes, we track how the price of a scooter changes over 
time.
+It is initially $11.11 at `00:01:00` when added to the catalog.
+The price then rises to $12.99 at `12:00:00` before being deleted from the 
catalog at `18:00:00`.
 
-Given that we would like to output the version of `product_changelog` table of 
the time `10:00:00`, the following table shows the result. 
-{% highlight sql %}
-update_time  product_id product_name price
-===========  ========== ============ ===== 
-00:01:00     p_001      scooter      11.11
-00:02:00     p_002      basketball   23.11
-{% endhighlight %}
+If we queried the table for various products' prices at different times, we 
would retrieve different results. At `10:00:00` the table would show one set of 
prices.
 
-Given that we would like to output the version of `product_changelog` table of 
the time `13:00:00`, the following table shows the result. 
 {% highlight sql %}
 update_time  product_id product_name price
 ===========  ========== ============ ===== 
 12:00:00     p_001      scooter      12.99
 12:00:00     p_002      basketball   19.99
 {% endhighlight %}
 
-In above example, the specific version of the table is tracked by 
`update_time` and `product_id`,  the `product_id` would be a primary key for 
`product_changelog` table and `update_time` would be the event time.
-
-In Flink, this is represented by a [*versioned 
table*](#defining-versioned-table).
-
-### Correlate with a regular table
 
-On the other hand, some use cases require to join a regular table which is an 
external database table.
+While at `13:00:00,` we found find another. 
 
-Let's assume that `LatestRates` is a table (e.g. stored in HBase) which is 
materialized with the latest rates. The `LatestRates` always represents the 
latest content of hbase table `rates`.
- 
-Then the content of `LatestRates` table when we query at time `10:15:00` is:
 {% highlight sql %}
-10:15:00 > SELECT * FROM LatestRates;
-
-currency  rate
-========= ====
-US Dollar 102
-Euro      114
-Yen       1
-{% endhighlight %}
-
-Then the content of `LatestRates` table when we query at time `11:00:00` is:
-{% highlight sql %}
-11:00:00 > SELECT * FROM LatestRates;
-
-currency  rate
-========= ====
-US Dollar 102
-Euro      116
-Yen       1
+update_time  product_id product_name price
+===========  ========== ============ ===== 
+12:00:00     p_001      scooter      12.99
+12:00:00     p_002      basketball   19.99
 {% endhighlight %}
 
-In Flink, this is represented by a [*regular Table*](#defining-regular-table).
 
-Temporal Table
---------------
-<span class="label label-danger">Attention</span> This is only supported in 
Blink planner.
+## Versioned Table Sources
 
-Flink uses primary key constraint and event time to define both versioned 
table and versioned view.
+Versioned tables are defined implicity for any table whose underlying source 
or format directly define changelogs. Examples include the [upsert Kafka]({% 
link dev/table/connectors/upsert-kafka.md %}) source as well as database 
changelog formats such as [debezium]({% link 
dev/table/connectors/formats/debezium.md %}) and [canal]({% link 
dev/table/connectors/formats/canal.md %}). As discussed above, the only 
additional requirement is the `CREATE` table statement must contain a primary 
key and event-time attribute. 
 
-### Defining Versioned Table
-The table is a versioned table in Flink only is the table contains primary key 
constraint and event time.
 {% highlight sql %}
--- Define a versioned table
-CREATE TABLE product_changelog (
-  product_id STRING,
-  product_name STRING,
-  product_price DECIMAL(10, 4),
-  update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
-  PRIMARY KEY(product_id) NOT ENFORCED,      -- (1) defines the primary key 
constraint
-  WATERMARK FOR update_time AS update_time   -- (2) defines the event time by 
watermark                               
-) WITH (
-  'connector' = 'kafka',
-  'topic' = 'products',
-  'scan.startup.mode' = 'earliest-offset',
-  'properties.bootstrap.servers' = 'localhost:9092',
-  'value.format' = 'debezium-json'
-);
+CREATE TABLE products (
+       product_id    STRING,
+       product_name  STRING,
+       price         DECIMAL(32, 2),
+       update_time   TIMESTAMP(3) METADATA FROM 'value.source.timestamp' 
VIRTUAL,
+       PRIMARY KEY (product_id) NOT ENFORCED
+       WATERMARK FOR update_time AS update_time
+) WITH (...);
 {% endhighlight %}
 
-Line `(1)` defines the primary key constraint for table `product_changelog`, 
Line `(2)` defines the `update_time` as event time for table 
`product_changelog`,
-thus table `product_changelog` is a versioned table.
+## Versioned Table Views
 
-**Note**: The grammar `METADATA FROM 'value.source.timestamp' VIRTUAL` means 
extract the database
-operation execution time for every changelog, it's strongly recommended 
defines the database operation execution time 
-as event time rather than ingestion-time or time in the record, otherwise the 
version extract from the changelog may
-mismatch with the version in database.
- 
-### Defining Versioned View
+Flink also supports defining versioned views if the underlying query contains 
a unique key constraint and event-time attribute. Image an append-only table of 
currency rates. 
 
-Flink also supports defining versioned view only if the view contains unique 
key constraint and event time.
- 
-Let’s assume that we have the following table `RatesHistory`:
 {% highlight sql %}
--- Define an append-only table
-CREATE TABLE RatesHistory (
-    currency_time TIMESTAMP(3),
-    currency STRING,
-    rate DECIMAL(38, 10),
-    WATERMARK FOR currency_time AS currency_time   -- defines the event time
+CREATE TABLE currency_rates (
+       currency      STRING,
+       rate          DECIMAL(32, 10)
+       update_time   TIMESTAMP(3),
+       WATERMARK FOR update_time AS update_time
 ) WITH (
-  'connector' = 'kafka',
-  'topic' = 'rates',
-  'scan.startup.mode' = 'earliest-offset',
-  'properties.bootstrap.servers' = 'localhost:9092',
-  'format' = 'json'                                -- this is an append only 
source
-)
+       'connector' = 'kafka',
+       'topic'     = 'rates',
+       'properties.bootstrap.servers' = 'localhost:9092',
+       'format'    = 'json'
+);
 {% endhighlight %}
 
-Table `RatesHistory` represents an ever growing append-only table of currency 
exchange rates with respect to 
-Yen (which has a rate of 1). For example, the exchange rate for the period 
from 09:00 to 10:45 of Euro to Yen was 114.
-From 10:45 to 11:15 it was 116.
-
-{% highlight sql %}
-SELECT * FROM RatesHistory;
-
-currency_time currency  rate
-============= ========= ====
-09:00:00      US Dollar 102
-09:00:00      Euro      114
-09:00:00      Yen       1
-10:45:00      Euro      116
-11:15:00      Euro      119
-11:49:00      Pounds    108
-{% endhighlight %}
+The table `currency_rates` contains a row for each currency - with respect to 
USD - and receives a new row each time the rate changes.
+The `JSON` format does not support native changelog semantics, and so Flink 
can only read this table as append-only.
+However, it is clear to us (the query developer) that this table has all the 
necessary information to define a versioned table. 
 
-To define a versioned table on `RatesHistory`, Flink supports defining a 
versioned view 
-by [deduplication query]({% link dev/table/sql/queries.md %}#deduplication) 
which produces an ordered changelog
-stream with an inferred primary key(`currency`) and event 
time(`currency_time`).
+Flink can reinterpret this table as a versioned table by defining a 
[deduplication query]({% link dev/table/sql/queries.md %}#deduplication) which 
produces an ordered changelog stream with an inferred primary key (currency) 
and event time (update_time). 

Review comment:
       In the view defined below, `currency_time` must become `update_time` in 
both the SELECT and the comment. Unfortunately, GitHub won't let me suggest 
changes here.
   
   Also, one thing confuses me. I see how `currency` can serve as the primary 
key that a versioned table needs, but how does this work when we never 
explicitly declare `currency` as the PRIMARY KEY for this view? Why do we 
specify a primary key when using CREATE TABLE, but not when using CREATE VIEW?
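   
   To make the rename concrete, here is roughly what I would expect the view to 
look like once it uses `update_time`. This is only a sketch: the view name 
`versioned_rates` is my own placeholder, and the layout follows the standard 
deduplication pattern over the `currency_rates` table defined above rather than 
the exact text in the PR.

```sql
-- Sketch only: the view name `versioned_rates` is an assumption, not taken from the PR.
CREATE VIEW versioned_rates AS
SELECT currency, rate, update_time        -- update_time carries the event time
FROM (
    SELECT *,
           ROW_NUMBER() OVER (
               PARTITION BY currency      -- per the doc text, the key is inferred from this column
               ORDER BY update_time DESC  -- keep the most recent row per currency
           ) AS rownum
    FROM currency_rates
)
WHERE rownum = 1;
```

   If that is the intended shape, a sentence in the docs saying that the key 
comes from the `PARTITION BY` column would help readers with the question above.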





