dawidwys commented on a change in pull request #14152:
URL: https://github.com/apache/flink/pull/14152#discussion_r528911851



##########
File path: docs/dev/table/streaming/temporal_tables.md
##########
@@ -20,103 +20,255 @@ software distributed under the License is distributed on an
 KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
--->
+--> 
 
-Temporal Tables represent a concept of a (parameterized) view on a changing 
table that returns the content of a table at a specific point in time.
+Temporal table is a table that evolves over time as known as Flink [dynamic 
table](dynamic_tables.html), rows in temporal table are associated with one or 
more temporal periods, all Flink tables are temporal(dynamic) table.
 
-The changing table can either be a changing history table which tracks the 
changes (e.g. database changelogs) or a changing dimension table which 
materializes the changes (e.g. database tables).
+A temporal table contains one or more versioned table snapshots, it can be a 
changing history table which tracks the changes(e.g. database changelog, 
contains all snapshots) or a changing dimensioned table which materializes the 
changes(e.g. database table, contains the latest snapshot). 
 
-For the changing history table, Flink can keep track of the changes and allows 
for accessing the content of the table at a certain point in time within a 
query. In Flink, this kind of table is represented by a *Temporal Table 
Function*.
+**Version**: A temporal table can split into a set of versioned table 
snapshots, the version in table snapshots represents the valid life circle of 
rows, the start time and the end time of the valid period can be assigned by 
users. 
+Temporal table can split to `versioned table` and `regular table` according to 
the table can tracks its history version or not.
 
-For the changing dimension table, Flink allows for accessing the content of 
the table at processing time within a query. In Flink, this kind of table is 
represented by a *Temporal Table*.
+**Versioned table**: If the row in temporal table can track its history 
changes and visit its history versions, we call this kind of temporal table as 
versioned table, the table comes from database changelog can define as a 
versioned table.
+
+**Regular table**: If the row in temporal table can only track and visit its 
latest version,we call this kind of temporal table as regular table. The table 
comes from database or HBase can define as a regular table.
 
 * This will be replaced by the TOC
 {:toc}
 
 Motivation
 ----------
 
-### Correlate with a changing history table
+### Correlate with a versioned table
+Given a scenario the order stream correlates the dimension table product, the 
table `orders` comes from kafka which contains the real time orders, the table 
`product_changelog` comes from the changelog of the database table `products`,
+ the product price in table `products` is changing over time. 
 
-Let's assume that we have the following table `RatesHistory`.
+{% highlight sql %}
+SELECT * FROM product_changelog;
+
+(changelog kind)  update_time  product_id product_name price
+================= ===========  ========== ============ ===== 
++(INSERT)         00:01:00     p_001      scooter      11.11
++(INSERT)         00:02:00     p_002      basketball   23.11
+-(UPDATE_BEFORE)  12:00:00     p_001      scooter      11.11
++(UPDATE_AFTER)   12:00:00     p_001      scooter      12.99
+-(UPDATE_BEFORE)  12:00:00     p_002      basketball   23.11 
++(UPDATE_AFTER)   12:00:00     p_002      basketball   19.99
+-(DELETE)         18:00:00     p_001      scooter      12.99 
+{% endhighlight %}
 
+The table `product_changelog` represents an ever growing changelog of database 
table `products`,  for example, the initial price of product `scooter` is 
`11.11` at `00:01:00`, and the price increases to `12.99` at `12:00:00`,
+ the product item is deleted from the table `products` at `18:00:00`.
+
+Given that we would like to output the version of `product_changelog` table of 
the time `10:00:00`, the following table shows the result. 
 {% highlight sql %}
-SELECT * FROM RatesHistory;
+update_time  product_id product_name price
+===========  ========== ============ ===== 
+00:01:00     p_001      scooter      11.11
+00:02:00     p_002      basketball   23.11
+{% endhighlight %}
 
-rowtime currency   rate
-======= ======== ======
-09:00   US Dollar   102
-09:00   Euro        114
-09:00   Yen           1
-10:45   Euro        116
-11:15   Euro        119
-11:49   Pounds      108
+Given that we would like to output the version of `product_changelog` table of 
the time `13:00:00`, the following table shows the result. 
+{% highlight sql %}
+update_time  product_id product_name price
+===========  ========== ============ ===== 
+12:00:00     p_001      scooter      12.99
+12:00:00     p_002      basketball   19.99
 {% endhighlight %}
 
-`RatesHistory` represents an ever growing append-only table of currency 
exchange rates with respect to `Yen` (which has a rate of `1`).
-For example, the exchange rate for the period from `09:00` to `10:45` of 
`Euro` to `Yen` was `114`. From `10:45` to `11:15` it was `116`.
+In above example, the specific version of the table is tracked by 
`update_time` and `product_id`,  the `product_id` would be a primary key for 
`product_changelog` table and `update_time` would be the event time.
+
+In Flink, this is represented by a [*versioned 
table*](#defining-versioned-table).
+
+### Correlate with a regular table
 
-Given that we would like to output all current rates at the time `10:58`, we 
would need the following SQL query to compute a result table:
+On the other hand, some use cases require to join a regular table which is an 
external database table.
 
+Let's assume that `LatestRates` is a table (e.g. stored in HBase) which is 
materialized with the latest rates. The `LatestRates` always represents the 
latest content of hbase table `rates`.
+ 
+Then the content of `LatestRates` table when we query at time `10:15:00` is:
 {% highlight sql %}
-SELECT *
-FROM RatesHistory AS r
-WHERE r.rowtime = (
-  SELECT MAX(rowtime)
-  FROM RatesHistory AS r2
-  WHERE r2.currency = r.currency
-  AND r2.rowtime <= TIME '10:58');
+10:15:00 > SELECT * FROM LatestRates;
+
+currency  rate
+========= ====
+US Dollar 102
+Euro      114
+Yen       1
+{% endhighlight %}
+
+Then the content of `LatestRates` table when we query at time `11:00:00` is:
+{% highlight sql %}
+11:00:00 > SELECT * FROM LatestRates;
+
+currency  rate
+========= ====
+US Dollar 102
+Euro      116
+Yen       1
 {% endhighlight %}
 
-The correlated subquery determines the maximum time for the corresponding 
currency that is lower or equal than the desired time. The outer query lists 
the rates that have a maximum timestamp.
+In Flink, this is represented by a [*regular Table*](#defining-regular-table).
+
+Temporal Table
+--------------
+<span class="label label-danger">Attention</span> This is only supported in 
Blink planner.
 
-The following table shows the result of such a computation. In our example, 
the update to `Euro` at `10:45` is taken into account, however, the update to 
`Euro` at `11:15` and the new entry of `Pounds` are not considered in the 
table's version at time `10:58`.
+Flink uses primary key constraint and event time to define both versioned 
table and versioned view.
 
-{% highlight text %}
-rowtime currency   rate
-======= ======== ======
-09:00   US Dollar   102
-09:00   Yen           1
-10:45   Euro        116
+### Defining Versioned Table
+The table is a versioned table in Flink only is the table contains primary key 
constraint and event time.
+{% highlight sql %}
+-- Define a versioned table
+CREATE TABLE product_changelog (
+  product_id STRING,
+  product_name STRING,
+  product_price DECIMAL(10, 4),
+  update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
+  PRIMARY KEY(product_id) NOT ENFORCED,      -- (1) defines the primary key constraint
+  WATERMARK FOR update_time AS update_time   -- (2) defines the event time by watermark
+) WITH (
+  'connector' = 'kafka',
+  'topic' = 'products',
+  'scan.startup.mode' = 'earliest-offset',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'value.format' = 'debezium-json'
+);
 {% endhighlight %}
 
-The concept of *Temporal Tables* aims to simplify such queries, speed up their 
execution, and reduce Flink's state usage. A *Temporal Table* is a 
parameterized view on an append-only table that interprets the rows of the 
append-only table as the changelog of a table and provides the version of that 
table at a specific point in time. Interpreting the append-only table as a 
changelog requires the specification of a primary key attribute and a timestamp 
attribute. The primary key determines which rows are overwritten and the 
timestamp determines the time during which a row is valid.
+Line `(1)` defines the primary key constraint for table `product_changelog`, 
Line `(2)` defines the `update_time` as event time for table 
`product_changelog`,
+thus table `product_changelog` is a versioned table.
 
-In the above example `currency` would be a primary key for `RatesHistory` 
table and `rowtime` would be the timestamp attribute.
+**Note**: The grammar `METADATA FROM 'value.source.timestamp' VIRTUAL` means 
extract the database
+operation execution time for every changelog, it's strongly recommended 
defines the database operation execution time 
+as event time rather than ingestion-time or time in the record, otherwise the 
version extract from the changelog may
+mismatch with the version in database.
+ 
+### Defining Versioned View
 
-In Flink, this is represented by a [*Temporal Table 
Function*](#temporal-table-function).
+Flink also supports defining versioned view only if the view contains unique 
key constraint and event time.
+ 
+Let’s assume that we have the following table `RatesHistory`:
+{% highlight sql %}
+-- Define an append-only table
+CREATE TABLE RatesHistory (
+    currency_time TIMESTAMP(3),
+    currency STRING,
+    rate DECIMAL(38, 10),
+    WATERMARK FOR currency_time AS currency_time   -- defines the event time
+) WITH (
+  'connector' = 'kafka',
+  'topic' = 'rates',
+  'scan.startup.mode' = 'earliest-offset',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'format' = 'json'                                -- this is an append only source
+)
+{% endhighlight %}
 
-### Correlate with a changing dimension table
+Table `RatesHistory` represents an ever growing append-only table of currency 
exchange rates with respect to 
+Yen (which has a rate of 1). For example, the exchange rate for the period 
from 09:00 to 10:45 of Euro to Yen was 114.
+From 10:45 to 11:15 it was 116.
+
+{% highlight sql %}
+SELECT * FROM RatesHistory;
 
-On the other hand, some use cases require to join a changing dimension table 
which is an external database table.
+currency_time currency  rate
+============= ========= ====
+09:00:00      US Dollar 102
+09:00:00      Euro      114
+09:00:00      Yen       1
+10:45:00      Euro      116
+11:15:00      Euro      119
+11:49:00      Pounds    108
+{% endhighlight %}
 
-Let's assume that `LatestRates` is a table (e.g. stored in HBase) which is 
materialized with the latest rate. The `LatestRates` is the materialized 
history `RatesHistory`. Then the content of `LatestRates` table at time `10:58` 
will be:
+To define a versioned table on `RatesHistory`, Flink supports defining a 
versioned view 
+by [deduplication query]({{ site.baseurl 
}}/dev/table/sql/queries.html#deduplication) which produces an ordered changelog
+stream with an inferred primary key(`currency`) and event 
time(`currency_time`).
 
-{% highlight text %}
-10:58> SELECT * FROM LatestRates;
-currency   rate
-======== ======
-US Dollar   102
-Yen           1
-Euro        116
+{% highlight sql %}
+-- Define a versioned view
+CREATE VIEW versioned_rates AS              
+SELECT currency, rate, currency_time            -- (1) `currency_time` keeps the event time
+  FROM (
+      SELECT *,
+      ROW_NUMBER() OVER (PARTITION BY currency  -- (2) the inferred unique key `currency` can be a primary key
+         ORDER BY currency_time DESC) AS rowNum 
+      FROM rates )

Review comment:
       Make sure the examples are "runnable". We should use the `RatesHistory` table here instead of `rates`.
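
For instance, the view could read from the `RatesHistory` table defined earlier in this file, roughly as sketched below. This is only a sketch: the closing `WHERE rowNum = 1` filter is not visible in the quoted hunk and is assumed here from the usual deduplication pattern.

```sql
-- Sketch of the versioned view reading from the table defined above.
CREATE VIEW versioned_rates AS
SELECT currency, rate, currency_time            -- `currency_time` keeps the event time
  FROM (
      SELECT *,
      ROW_NUMBER() OVER (PARTITION BY currency  -- `currency` becomes the inferred unique key
         ORDER BY currency_time DESC) AS rowNum
      FROM RatesHistory )                       -- was `rates`, which is not defined in this doc
WHERE rowNum = 1;                               -- assumed: keep only the latest row per currency
```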




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

